我正在尝试轮询来自特定主题的数据,例如 kafka 正在接收 100 条记录/秒 但大多数时候它不会获取所有记录。 我使用超时为 5000 毫秒,并且每隔 100 毫秒调用此方法 注意:我也订阅了特定主题
@Scheduled(fixedDelayString = "100")
public void pollRecords() {
ConsumerRecords<String, String> records =
leadConsumer.poll("5000");
如何从kafka获取所有数据?
最佳答案
从 poll() 返回的最大记录数由 max.poll.records 消费者配置参数指定。 (默认为 500)此外,还有另一个消费者配置参数限制从服务器返回的最大数据量。 (fetch.max.bytes
和 max.partition.fetch.bytes
)
另一方面,在代理端还有另一个大小限制,称为 message.max.bytes
。
因此您应该正确设置这些参数以获取更多消息。
来自 Kafka 文档 (link):
max.poll.records: The maximum number of records returned in a single call to poll(). (default: 500)
fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. (default:52428800)
message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that the they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.This can be set per topic with the topic level max.message.bytes config. (default: 1000012)
max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. (default: 1048576)
关于java - 无法从 kafka 主题轮询/获取所有记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60666407/