python - 如何在程序中停止 Python Kafka Consumer?

标签 python apache-kafka kafka-consumer-api kafka-python

我正在做 Python Kafka 消费者(尝试在 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html 中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer)。当我运行下面这段代码时,它会一直运行,即使所有的消息都被消耗掉了。 希望消费者消费完所有的消息就停下来。怎么做?我也不知道如何使用 stop() 函数(在基类 kafka.consumer.base.Consumer 中)。

更新

我使用信号处理程序来调用 consumer.stop()。一些错误消息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过 client.close()。但结果相同。

我需要一些方法来优雅地停止 for 循环。

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

欢迎任何帮助。谢谢。

最佳答案

我们可以先查看主题中最后一条消息的偏移量。 然后在我们达到该偏移量时停止循环。

    client = "localhost:9092"
    consumer = KafkaConsumer(client)
    topic = 'test'
    tp = TopicPartition(topic,0)
    #register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)

    consumer.seek_to_beginning(tp)        

    for message in consumer:
        print "Offset:", message.offset
        print "Value:", message.message.value
        if message.offset == lastOffset - 1:
            break

关于python - 如何在程序中停止 Python Kafka Consumer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31841191/

相关文章:

spring-boot - kafka什么时候会重试处理尚未确认的消息?

python - 使用 os.walk() 时如何排除目录?其他方法没有效果

python - Groupby 中的项目计数

java - pom.xml 中的依赖项在 flink kafka 连接器示例中不起作用

message-queue - 消费者平衡如何在卡夫卡发挥作用?

java - 如何在同一线程中成功对第二个 Kafka 消费者执行轮询?

apache-kafka - Kafka 流与 Kafka 消费者如何决定使用什么

python - 为什么 pynput 不检测数字键盘按下?

python - 将一个数据框与另一个数据框分离

hadoop - Gobblin Kafka 到 HDFS pull 作业报错