python - kafka python 不传递消息

标签 python apache-kafka kafka-python

我正在尝试使用 kafka-python 设置一个简单的 Kafka 应用程序.我一直在尝试让我在网上找到的一些示例起作用,但似乎无法做到。我有一个在 docker 容器中运行的 kafka 实例。我测试了 shell 工具,实例确实可以正常工作。我能够发送和接收消息。我怀疑生产者消息超时。以下是具有基本相同行为的代码的两个版本:

import time
from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'test'
producer.send_messages(topic, b'this is a message')

第二个版本:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], api_version=(0,10))
topic = "test"

producer.send(topic, b'test message')

最佳答案

改变行: producer.send(topic, b'test message')

收件人: producer.send(topic, b'test message').get(timeout=30)(或您认为合适的任何值)

问题是生产者在发送消息之前被杀死,因为这个方法是异步的。 如果您添加:

import logging
logging.basicConfig(level=logging.INFO)

并看到超时为 0。

关于python - kafka python 不传递消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44996929/

相关文章:

python - 如何在Windows中安装mysql-python?

python - kafka-python ssl 支持 python < v2.7.9(无属性 'SSLContext')

message-queue - kafka多经纪人的优点

python - 如何在 Kafka 中实现请求-回复(同步)消息传递范式?

Python-Kafka : Keep polling topic infinitely

python - 是否有更 pythonic 的方式来组合列表中的重复对象?

python - 尝试 .read() 时,Mechanze 表单提交会导致响应 'Assertion Error'

Python - 从主文件运行多个Python脚本

java - 如何在流查询(Java)中使用 JSON 数组作为 Kafka 记录?