python - Kafka分区延迟增加

标签 python apache-kafka kafka-consumer-api kafka-python

我有一个使用 Kafka 1.0 作为队列的应用程序。 Kafka 主题有 80 个分区和 80 个消费者在运行。 (Kafka-python 消费者)。

通过运行命令:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup  --describe 

我看到其中一个分区卡在偏移量处,并且随着向其添加新记录,延迟不断增加。

上述命令的输出看起来像这样:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST

118 mytopic                       37         1924            2782            858        kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic                       38         2741            2742            1          kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic                       39         2713            2713            0          kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic                       40         2687            2688            1          kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost

这是什么原因造成的?此外,使用 reset-offsets 命令重置偏移量也是不可取的,因为可能不会定期手动监控此服务器。

客户端在 Linux m/c 中作为并行进程在后台运行:

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
                     session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
                     auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
                     value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    msg = json.loads(message.value)
    process_message(msg)

最佳答案

If consumer offset is not moving after some time, then consumer is likely to have stopped. If consumer offset is moving, but consumer lag (difference between the end of the log and the consumer offset) is increasing, the consumer is slower than the producer. If the consumer is slow, the typical solution is to increase the degree of parallelism in the consumer. This may require increasing the number of partitions of a topic.

在 Kafka docs 阅读更多内容.

简单来说;你生产的比你消费的多。您需要提高消耗率以减少滞后。您需要添加更多消费者。如果您只是在测试,那么您的消费者速度很慢。

关于python - Kafka分区延迟增加,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47305359/

相关文章:

python - 如何有效地从多个列表中获取一组唯一值 (Python)

apache-kafka - Kafka - 添加自定义 header 以记录元数据

apache-kafka - 破折号在 CURRENT-OFFSET 中代表什么

spring-boot - 使用 Spring Boot 创建 Kafka 主题

apache-kafka - Kafka自动提交间隔最佳实践

apache-kafka - Kafka : The message when serialized is larger than the maximum request size you have configured with the max. request.size 配置

python - 突出显示数据框中的某些单词 Pandas HTML

python - Pandas Groupby 对特定列进行聚合函数,显示结果中的所有列

python - numpy中分数的线性系统解决方案

docker - 使用相同的kafka主题时如何复制微服务?