I have an application that uses Kafka 1.0 as a queue. The Kafka topic has 80 partitions, with 80 consumers running (kafka-python consumers).
By running the command:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
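I see that one of the partitions is stuck at an offset, and its lag keeps growing as new records are added to it.
The output of the command above looks like this: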
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                             HOST
mytopic  37         1924            2782            858  kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx  /localhost
mytopic  38         2741            2742            1    kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx  /localhost
mytopic  39         2713            2713            0    kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx  /localhost
mytopic  40         2687            2688            1    kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx  /localhost
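The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET (for partition 37: 2782 - 1924 = 858), so a stuck partition stands out as the one row whose lag is far above the rest. As a minimal sketch, assuming the whitespace-separated layout shown above, the describe output can be parsed in Python to flag such partitions. The names `PartitionLag`, `parse_describe` and `lagging_partitions`, the threshold, and the consumer IDs in the sample are all hypothetical.

```python
from typing import List, NamedTuple


class PartitionLag(NamedTuple):
    topic: str
    partition: int
    current_offset: int  # offset committed by the group (next record it will read)
    log_end_offset: int  # end of the partition's log

    @property
    def lag(self) -> int:
        # Records the group still has to consume on this partition.
        return self.log_end_offset - self.current_offset


def parse_describe(output: str) -> List[PartitionLag]:
    """Parse the per-partition rows of `kafka-consumer-groups.sh --describe`."""
    rows = []
    for line in output.splitlines():
        fields = line.split()
        # Skip the header, blank lines, and rows whose offsets are not plain
        # integers (e.g. a placeholder when nothing has been committed yet).
        if len(fields) < 4 or not all(f.isdigit() for f in fields[1:4]):
            continue
        topic, partition, current, log_end = fields[:4]
        rows.append(PartitionLag(topic, int(partition), int(current), int(log_end)))
    return rows


def lagging_partitions(rows: List[PartitionLag], threshold: int) -> List[PartitionLag]:
    """Return the partitions whose lag exceeds `threshold` records."""
    return [r for r in rows if r.lag > threshold]


SAMPLE_OUTPUT = """\
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST
mytopic  37         1924            2782            858  consumer-a   /localhost
mytopic  38         2741            2742            1    consumer-b   /localhost
mytopic  39         2713            2713            0    consumer-c   /localhost
mytopic  40         2687            2688            1    consumer-d   /localhost
"""

if __name__ == "__main__":
    for row in lagging_partitions(parse_describe(SAMPLE_OUTPUT), threshold=100):
        print(row.topic, row.partition, row.lag)  # prints: mytopic 37 858
```

Because the parser only looks at the first four columns, it should also tolerate extra trailing columns in other versions of the tool.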
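What could be causing this? Also, resetting the offset with the reset-offsets command is not a good option, because this server may not be monitored manually on a regular basis.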
The clients run in the background as parallel processes on a Linux machine:
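import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
                         session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
                         auto_commit_interval_ms=100000, request_timeout_ms=350000,
                         max_partition_fetch_bytes=3*1024*1024,
                         # value_deserializer already decodes each record into a Python object
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
    msg = message.value  # already deserialized, so decoding it again with json.loads is a bug
    process_message(msg)  # the application's own handler (not shown)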
Best answer
If the consumer offset is not moving after some time, the consumer has likely stopped. If the consumer offset is moving but the consumer lag (the difference between the end of the log and the consumer offset) is increasing, the consumer is slower than the producer. If the consumer is slow, the typical solution is to increase the degree of parallelism in the consumer, which may require increasing the number of partitions of the topic.
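The two cases above can be told apart by sampling a partition's offsets twice, some interval apart, which also gives an automated check instead of manual monitoring. A minimal sketch in Python: `Sample` and `diagnose` are hypothetical names, and the samples are assumed to come from repeated runs of the describe command (or any other source of committed and log-end offsets). One refinement over the wording above: an offset that is not moving is only treated as "stopped" when there are records waiting, since an idle partition with zero lag is healthy.

```python
from typing import NamedTuple


class Sample(NamedTuple):
    current_offset: int  # group's committed offset for the partition
    log_end_offset: int  # end of the partition's log

    @property
    def lag(self) -> int:
        return self.log_end_offset - self.current_offset


def diagnose(earlier: Sample, later: Sample) -> str:
    """Classify a partition from two samples taken some time apart."""
    if later.current_offset == earlier.current_offset and later.lag > 0:
        # Offset not moving while records are waiting: consumer has likely stopped.
        return "stopped"
    if later.lag > earlier.lag:
        # Offset moves, but the log end moves faster: consumer slower than producer.
        return "slow"
    return "ok"


if __name__ == "__main__":
    # Hypothetical follow-up sample for partition 37: offset unchanged, log grew.
    print(diagnose(Sample(1924, 2782), Sample(1924, 2900)))  # prints: stopped
```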
Read more in the Kafka docs.
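Simply put: you are producing more than you are consuming. To reduce the lag you need to increase the consumption rate, which means adding more consumers. If you are just testing, then your consumers are slow.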
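The arithmetic behind "producing more than you consume" can be made concrete: lag grows by the producer rate minus the consumer rate, and the number of consumers needed is the producer rate divided by one consumer's throughput, rounded up. A rough sketch in Python, assuming every consumer processes records at the same steady rate and records are spread evenly over partitions (both simplifications); the function names and rates are hypothetical.

```python
import math
from typing import Tuple


def lag_growth_per_second(produce_rate: float, consume_rate: float) -> float:
    """Records/s by which lag grows (negative means the group is catching up)."""
    return produce_rate - consume_rate


def consumers_needed(produce_rate: float, per_consumer_rate: float,
                     partitions: int) -> Tuple[int, int]:
    """Return (consumers needed, extra partitions needed) to keep up with producers.

    Assumes a steady, equal per-consumer throughput and evenly loaded partitions.
    Within one group each partition is read by at most one consumer, so consumers
    beyond the partition count would sit idle; hence the extra partitions.
    """
    if per_consumer_rate <= 0:
        raise ValueError("per_consumer_rate must be positive")
    needed = math.ceil(produce_rate / per_consumer_rate)
    extra_partitions = max(0, needed - partitions)
    return needed, extra_partitions
```

With the question's 80 partitions, for instance, a hypothetical load of 1000 records/s against 10 records/s per consumer gives `consumers_needed(1000, 10, 80) == (100, 20)`: 100 consumers, which only helps if the topic also gets 20 more partitions.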
Regarding "python - Kafka partition lag increasing", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47305359/