python 卡夫卡: Is there a way to block a consumer on a kafka topic till a new message is posted?

标签 python python-3.x apache-kafka kafka-python

我有一个消费者订阅了一个生产者线程定期发布的测试主题。我希望能够阻塞消费者线程,直到出现新消息 - 然后处理它并再次开始等待。我最接近的是:

consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))

是否有更惯用的方式(或者这种方式是否存在我看不到的任何严重问题)?

kafka 的新手,因此非常欢迎对此设计提出一般性建议。完整(运行)示例:

import time
from threading import Thread

import kafka
from kafka import KafkaProducer, KafkaConsumer

print('python-kafka:', kafka.__version__)

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)

localhost_ = 'localhost:9092'

def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    j = 0
    while True:
        publish_message(_producer, topic_name, value=j, key=j)
        j += 1
        time.sleep(5)

if __name__ == '__main__':
    print('Running Producer..')
    topic_name = 'test'
    prod_thread = Thread(target=kafka_producer)
    prod_thread.start()
    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    # consumer.subscribe([topic_name])
    while True:
        print(consumer.poll(timeout_ms=5000))

python-kafka: 1.3.5

最佳答案

无限循环中的轮询是 Kafka: The Definitive Guide 中的建议以及。这是来自 Chapter 4. Kafka Consumers: Reading Data from Kafka 的 Java 摘录使用相同的想法:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        ...
    }
}

这很好地解释了如何在 Python 中推荐使用这些库。

kafka-python(请参阅 A Tale of Two Kafka Clients 中的完整示例)

from kafka import KafkaConsumer
...
kafka_consumer = Consumer(
...
)
consumer.subscribe([topic])

running = True
while running:
    message = kafka_consumer.poll()
...

confluent-kafka-python(请参阅 Introduction to Apache Kafka for Python Programmers 中的完整示例)

from confluent_kafka import Consumer, KafkaError
...
c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
...

可能会出现的另一个密切相关的问题是您如何处理消息。

您的这部分代码可能依赖于外部依赖项(数据库、远程服务、网络文件系统等),这可能会导致处理尝试失败。

因此,实现重试逻辑可能是个好主意,您可以在博文 Retrying consumer architecture in the Apache Kafka 中找到关于重试逻辑的详细描述。 .

关于 python 卡夫卡: Is there a way to block a consumer on a kafka topic till a new message is posted?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52262615/

相关文章:

python - PySide2 - 绑定(bind)模型以查看

python - Django 鹡鸰 'expected string or bytes-like object' 错误

python - 从 Excel 中提取日期并使用 python 将其附加到列表中

apache-kafka - Kafka Broker vs Topic

python - 子进程调用 ls 时出错

python - tf.layers.dense 如何创建inputs.kernel 权重矩阵?

python - 从复制和粘贴的 doctest 中删除 '>>> '

python - 如何从 beautifulsoup4 中的标签获取命名空间信息?

amazon-ec2 - 为什么我无法从外部连接到 Kafka?

java - 当组中的一个订阅者被严格分配到特定分区时,Kafka 重新平衡