python - 使用 Python 读取特定的 Kafka 主题

标签 python apache-kafka kafka-python

我的主题有 3 个分区,我尝试使用以下代码从每个特定分区读取内容

from kafka import KafkaConsumer, TopicPartition

brokers = 'localhost:9092'
topic = 'b3'

m = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
par = TopicPartition(topic=topic, partition=1)
m.assign(par)

但我收到此错误:

    raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.

有人可以帮我解决这个问题吗?

最佳答案

您可以从 KafkaConsumer() 中删除主题参数并重试吗?

示例:

# manually assign the partition list for the consumer
from kafka import TopicPartition, KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)

引用:http://kafka-python.readthedocs.io/en/master/

关于python - 使用 Python 读取特定的 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46856937/

相关文章:

python - Kafka生产者不选择新分区

python - Python 中的记忆化、类和多处理

java - LIUM 演讲者分类如何运作?

python - Numpy 转置用法

python - 如何在 Pandas 中获得 "group by"单元格值?

java - 动态消费来自kafka主题的消息

apache-spark - Kafka 结构化流 java.lang.NoClassDefFoundError

apache-kafka - Kafka 连接与多个消息队列的集成

python - python kafka 库的编码/格式问题

python - 如果Kafka消费者关闭,如何读取最后一条消费消息后的消息?