python - 如何在 Pykafka simpleconsumer 中选择起始偏移量?

标签 python apache-kafka consumer pykafka

在我的kafka集群单分区主题中,我有一个简单的消费者处理所有传入消息,如果处理的数据出现错误,我想以相同的顺序重新处理来自某个偏移量(不是开头)的所有消息,以修复不一致并保持来自 kafka 的原始有序消息序列。

有没有办法用 Pykafka 来做到这一点?我不明白

最佳答案

您需要调用 reset_offsets() 。例如:

consumer = topic.get_simple_consumer(consumer_group="example")
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.itervalues()]
# because we passed in a consumer_group the new offsets will be saved in Kafka
consumer.reset_offsets(partition_offsets=partition_offset_pairs)

(其中 get_offset_for_partition() 是您定义的函数)。或者对于单分区主题:

# read from offset 123456
consumer = topic.get_simple_consumer()
partition = topic.partitions[0]
consumer.reset_offsets([(partition, 123456)])

相同的 reset_offsets() 方法也可用于 BalancedConsumerManagedBalanceConsumer 类。

请注意,作为 Kafka 设计的一部分,仅保证每个主题分区独立地按顺序发送消息。

关于python - 如何在 Pykafka simpleconsumer 中选择起始偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47659137/

相关文章:

kotlin - Spring Cloud Stream Kotlin消费者问题

java - Kafka Consumer架构设计 : java plugin or external client

python - 如何访问 distutils 版本号?

python - SWIG:使用自定义代码包装对 C++ 方法的调用?

spring-boot - Camel 上下文在应用程序启动后立即启动关闭,没有明显的错误

java - Kafka Java Consumer 已关闭

python - 使用多个键从字典中获取元素

python - 从图像中堆叠星 PSF;对齐子像素中心

json - 与 AVRO 相比,在 kafka 流中使用 JSON 序列化程序会导致性能下降吗?

c - 无法获取互斥锁以防止线程访问函数