java - Cannot poll/fetch all records from a Kafka topic

Tags: java apache-kafka kafka-consumer-api spring-kafka

I am trying to poll data from a specific topic. Kafka is receiving about 100 records/second, but most of the time the poll does not fetch all of the records. I use a timeout of 5000 ms and call this method every 100 ms. Note: I have also subscribed to the specific topic.

@Scheduled(fixedDelayString = "100")
public void pollRecords() {
    // poll() takes a Duration, not a String
    ConsumerRecords<String, String> records =
            leadConsumer.poll(Duration.ofMillis(5000));
    // process records ...
}

How can I fetch all of the data from Kafka?

Best Answer

The maximum number of records returned by a single call to poll() is specified by the max.poll.records consumer config parameter (default: 500). In addition, there are other consumer config parameters that limit the maximum amount of data returned from the server: fetch.max.bytes and max.partition.fetch.bytes.

On the broker side, there is another size limit called message.max.bytes.

So you should set these parameters appropriately to fetch more messages per poll.
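As a minimal sketch, the consumer-side parameters above can be raised when building the consumer's Properties. The config keys are the standard Kafka consumer names; the values here are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

public class ConsumerTuning {

    // Build consumer properties that raise the per-poll limits.
    // These Properties would be passed to new KafkaConsumer<>(props)
    // alongside the usual bootstrap.servers / deserializer settings.
    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        props.setProperty("max.poll.records", "2000");             // default: 500
        props.setProperty("fetch.max.bytes", "104857600");         // default: 52428800
        props.setProperty("max.partition.fetch.bytes", "2097152"); // default: 1048576
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps();
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
    }
}
```

Note that raising max.poll.records also lengthens the time one poll() may take to process, which interacts with max.poll.interval.ms.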

From the Kafka documentation (link):

max.poll.records: The maximum number of records returned in a single call to poll(). (default: 500)

fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. (default: 52428800)

message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config. (default: 1000012)

max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. (default: 1048576)
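If individual record batches are larger than the defaults, the broker/topic-side limits quoted above also need to be raised to match. A minimal sketch of the corresponding settings (the values are illustrative assumptions):

```properties
# Broker config (server.properties): largest record batch accepted by the broker
message.max.bytes=2097152

# Or per topic, via the topic-level override
max.message.bytes=2097152
```

The consumer-side fetch.max.bytes / max.partition.fetch.bytes should be at least as large as the broker/topic batch limit, otherwise large batches may stall progress on older consumers.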

A similar question about "java - Cannot poll/fetch all records from a Kafka topic" can be found on Stack Overflow: https://stackoverflow.com/questions/60666407/
