java - 使用 SpringBoot 使用 Batch 从 Kafka 读取数据无法正常工作

标签 java spring-boot apache-kafka spring-kafka

我使用 SpringBoot 并希望使用批处理从 Kafka 读取数据。我的 application.yml 如下所示:

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
    consumer:
      auto-offset-reset: earliest
      max-poll-records: 50000
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      group-id: "batch"
      properties:
        fetch.min.bytes: 1000000
        fetch.max.wait.ms: 20000
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    listener:
      type: batch

我的听众:

@KafkaListener(id = "bar2", topics = "TestTopic")
public void listen(List<ConsumerRecord<String, GenericRecord>> records) {
   log.info("start of batch receive. Size::{}", records.size());
}

在日志中我看到:

2019-10-04 11:08:19.693  INFO 2123 --- [     bar2-0-C-1] kafka.batch.demo.DemoApplication         : start of batch receive. Size::33279
2019-10-04 11:08:19.746  INFO 2123 --- [     bar2-0-C-1] kafka.batch.demo.DemoApplication         : start of batch receive. Size::33353
2019-10-04 11:08:19.784  INFO 2123 --- [     bar2-0-C-1] kafka.batch.demo.DemoApplication         : start of batch receive. Size::33400
2019-10-04 11:08:19.821  INFO 2123 --- [     bar2-0-C-1] kafka.batch.demo.DemoApplication         : start of batch receive. Size::33556
2019-10-04 11:08:39.859  INFO 2123 --- [     bar2-0-C-1] kafka.batch.demo.DemoApplication         : start of batch receive. Size::16412

我设置了必需的设置:fetch.min.bytesfetch.max.wait.ms,但它们没有产生任何效果。

在日志中,我发现在任何设置下,一个包的大小都不超过 33000 个。我崩溃了,我不明白为什么会发生这种事?

最佳答案

max.poll.records 只是一个最大值。

还有其他属性会影响您获取的记录数量

  • fetch.min.bytes - 服务器应为获取请求返回的最小数据量。如果可用数据不足,请求将等待积累足够多的数据,然后再答复请求。默认设置为 1 字节,意味着只要有一个字节的数据可用,或者获取请求在等待数据到达时超时,就会立即应答获取请求。将其设置为大于 1 的值将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会带来一些额外的延迟。
  • fetch.max.wait.ms - 如果没有足够的数据来立即满足 fetch.min 给出的要求,则服务器在响应提取请求之前将阻塞的最长时间。字节。

参见the documentation .

没有办法精确控制最小记录数(除非它们的长度都相同)。

关于java - 使用 SpringBoot 使用 Batch 从 Kafka 读取数据无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58113679/

相关文章:

python - 由于 KafkaTimeoutError,无法使用 kafka-python 从 django 应用程序向 kafka 发送消息

java - application.properties 中的 Kafka ErrorHandlingDeserializer2 配置值

java - 我如何剪切用于 Selenium 测试的java堆栈跟踪?

Java - 计算 2D boolean 数组中剩余的 false

java - 无法 hibernate 以连接到mysql数据库 - Spring

spring-boot - 401未经授权的页面会 Swagger 吗?

mysql - 如何在 Spring Boot 配置中设置正确的 MySQL JDBC 时区

apache-kafka - 我可以设置Kafka Stream消费者group.id吗?

java - GridBagLayout 中的图像大小不正确

java - 如何使 Autowired 在 Configuration 类中工作?