apache-kafka - 卡夫卡休息的例子

标签 apache-kafka kafka-consumer-api kafka-producer-api confluent-platform

有没有生产者和消费者组在 Java 中使用 Kafka rest api 的好例子。我不是在寻找生产者和消费者的 simpleconsumer 或 kafka 客户端示例。任何帮助表示赞赏。

最佳答案

这是 Confluent 的示例 Rest API(Rest 代理)代码。
不幸的是,不是在 Java 中,而是在 Python 中。 :(
我必须输入它,所以它可能包含一些拼写错误。我希望这对你有一点帮助。

(使用 Python 编写的 REST API 的生产者)

import requests
import base64
import json

url = "http://restproxy:8082/topics/my_topic"
headers = {
    "Content-Type" : "application/vnd.kafka.binary.v1 + json",
}
# Create one or more messages
payload = {"records":
       [{
           "key":base64.b64encode('firstkey'),
           "value":base64.b64encode('firstvalue'),
       }],
}
# Send the message
r = requests.post(url, data=json.dumps(payload), headers=headers)
if r.status_code != 200:
   print("Status Code: " + str(r.status_code))
   print(r.text)

(使用 Python 编写的 Rest API 的消费者)
import requests
import base64
import json
import sys

#Base URL for interacting with REST server
baseurl = "http://restproxy:8082/consumers/group1"

#Create the Consumer instance
print("Creating consumer instance")
payload {
    "format": "binary",
}
headers = {
    "Content-Type" : "application/vnd.kafka.v1+json",
}
r = requests.post(baseurl, data=json.dumps(payload), headers=headers)

if r.status_code !=200:
    print("Status Code: " + str(r.status_code))
    print(r.text)
    sys.exit("Error thrown while creating consumer")

# Base URI is used to identify the consumer instance
base_uri = r.json()["base_uri"]

#Get the messages from the consumer
headers = {
    "Accept" : "application/vnd.kafka.binary.v1 + json",
}

# Request messages for the instance on the Topic
r = requests.get(base_uri + "/topics/my_topic", headers = headers, timeout =20)

if r.status_code != 200: 
    print("Status Code: " + str(r.status_code))
    print(r.text)
    sys.exit("Error thrown while getting message")

# Output all messages
for message in r.json():
    if message["key"] is not None:
        print("Message Key:" + base64.b64decode(message["key"]))
    print("Message Value:" + base64.b64decode(message["value"]))

# When we're done, delete the consumer
headers = {
    "Accept" : "application/vnd.kafka.v1+json",
}

r = requests.delete(base_uri, headers=headers)

if r.status_code != 204: 
    print("Status Code: " + str(r.status_code))
    print(r.text)

关于apache-kafka - 卡夫卡休息的例子,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38983915/

相关文章:

linux - 使用新的消费者API时如何删除属于主题T的消费者组C

Java在源码中设置Kafka保留时间

Golang segmentio/kafka-go 消费者不工作

c# - Kafka Confluent 库中 poll 和 consume 的区别

java - 如何在kafka中创建自定义序列化器?

java - 为什么元数据添加到这个 Kafka 连接器的输出中?

javascript - Node.js 中的 Kafka 消费者

java - 当主题中没有更多记录时,如何刷新Kafka Consumer中的数据批处理

apache-kafka - 向 Kafka 发送消息时是否需要 key ?

java - KafkaProducer不发送记录