apache-kafka - 如何强制消费者读取kafka中的特定分区

标签 apache-kafka kafka-python

我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者。但是,网页下载的超时时间为 60 秒。
当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。

我已经尝试了中提到的所有内容

Kafka consumer configuration / performance issues



https://github.com/spring-projects/spring-kafka/issues/202

但我每次都会收到不同的错误。

是否可以将特定消费者与 kafka 中的分区联系起来?
我正在为我的应用程序使用 kafka-python

最佳答案

我错过了 Kafka-python 的文档。我们可以使用 TopicPartition 类为一个特定的消费者分配一个分区。

http://kafka-python.readthedocs.io/en/master/

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)

关于apache-kafka - 如何强制消费者读取kafka中的特定分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45940171/

相关文章:

apache-kafka - Kafka 重新平衡和监听器陷阱

apache-kafka - Kafka - 主题 & 分区 & 消费者

python - NoBrokersAvailable : NoBrokersAvailable-Kafka Error

hadoop - Apache Kafka 是否将消息内部存储在 HDFS 或其他文件系统中

apache-kafka - Spark Streaming - 是否可以使用 Kafka 主题的特定分区?

apache-spark - Spark Dataframe 到 Kafka

apache-kafka - Kafka Consumer 在处理消息时重试次数有限

python - 使用 ssl 访问 kafka 代理时出错

Python - 无模式 Apache Avro 数据序列化

python - 无法使用kafka-python从另一个容器向Kafka容器发出请求