我使用 SpringBoot
并希望使用批处理从 Kafka
读取数据。我的 application.yml
如下所示:
spring:
kafka:
bootstrap-servers:
- localhost:9092
properties:
schema.registry.url: http://localhost:8081
consumer:
auto-offset-reset: earliest
max-poll-records: 50000
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: "batch"
properties:
fetch.min.bytes: 1000000
fetch.max.wait.ms: 20000
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
listener:
type: batch
我的听众:
@KafkaListener(id = "bar2", topics = "TestTopic")
public void listen(List<ConsumerRecord<String, GenericRecord>> records) {
log.info("start of batch receive. Size::{}", records.size());
}
在日志中我看到:
2019-10-04 11:08:19.693 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33279
2019-10-04 11:08:19.746 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33353
2019-10-04 11:08:19.784 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33400
2019-10-04 11:08:19.821 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33556
2019-10-04 11:08:39.859 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::16412
我设置了必需的设置:fetch.min.bytes
和fetch.max.wait.ms
,但它们没有产生任何效果。
在日志中,我发现在任何设置下,一个包的大小都不超过 33000 个。我崩溃了,我不明白为什么会发生这种事?
最佳答案
max.poll.records
只是一个最大值。
还有其他属性会影响您获取的记录数量
fetch.min.bytes
- 服务器应为获取请求返回的最小数据量。如果可用数据不足,请求将等待积累足够多的数据,然后再答复请求。默认设置为 1 字节,意味着只要有一个字节的数据可用,或者获取请求在等待数据到达时超时,就会立即应答获取请求。将其设置为大于 1 的值将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会带来一些额外的延迟。fetch.max.wait.ms
- 如果没有足够的数据来立即满足 fetch.min 给出的要求,则服务器在响应提取请求之前将阻塞的最长时间。字节。
没有办法精确控制最小记录数(除非它们的长度都相同)。
关于java - 使用 SpringBoot 使用 Batch 从 Kafka 读取数据无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58113679/