java - Spring Kafka 生产者抛出 TimeoutExceptions

标签 java spring-boot apache-kafka kubernetes

问题

我在 Kubernetes 中有一个 Kafka 设置和三个代理,根据 https://github.com/Yolean/kubernetes-kafka 上的指南设置.从 Java 客户端生成消息时出现以下错误消息。

2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='[...redacted...]':
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicname-0: 30001 ms has passed since last append

详细设置

监听器设置为允许来自外部世界的 SSL 生产者/消费者:

advertised.host.name = null
advertised.listeners = OUTSIDE://kafka-0.mydomain.com:32400,PLAINTEXT://:9092
advertised.port = null
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:SSL
listeners = OUTSIDE://:9094,PLAINTEXT://:9092
inter.broker.listener.name = PLAINTEXT
host.name =
port.name = 9092

外部监听器正在监听 kafka-0.mydomain.com、kafka-1.mydomain.com 等。纯文本监听器正在监听任何 IP,因为它们是 Kubernetes 的本地集群。

生产者设置:

kafka:
  bootstrap-servers: kafka.mydomain.com:9092
  properties:
    security.protocol: SSL
   producer:
    batch-size: 16384
    buffer-memory: 1048576 # 1MB
    retries: 1
    ssl:
      key-password: redacted
      keystore-location: file:/var/private/ssl/kafka.client.keystore.jks
      keystore-password: redacted
      truststore-location: file:/var/private/ssl/kafka.client.truststore.jks
      truststore-password: redacted

另外我在代码中设置linger.ms为100,强制消息在100ms以内发送。延迟时间有意设置得较低,因为用例需要最小的延迟。

分析

  • 当代理被移动到 SSL 时,错误开始出现。
  • 在服务器端,一切都按预期运行,日志中没有错误,我可以使用 Kafka 客户端工具手动连接到代理。
  • 间歇性地出现错误:有时它每秒发送 30 多条消息,有时它根本不发送任何消息。它可能像魅力一样工作数小时,然后只是垃圾邮件超时一段时间。
  • 客户端和服务器的时钟同步 (UTC)。
  • 生产端和服务器端的 CPU 始终保持在 20% 左右。

它会是什么?

最佳答案

此问题通常发生在生产者比代理快时,您的设置发生这种情况的原因似乎是 SSL 需要额外的 CPU,这可能会降低代理的速度。但无论如何请检查以下内容:

  • 检查您是否以相同的速度生成消息,根据您所说的,您似乎有尖峰。
  • 另一种可能性是集群中的其他 kafka 客户端(生产者或消费者)不一定使用相同的主题,导致这种情况发生,因为代理过载(检查代理 cpu/网络)。

要尽量减少导致此保留的原因,您应该将 buffer-memory 增加到 32MB 以上,认为 32MB 是默认值,您将其设置得更低。你拥有的越低,缓冲区就越容易变满,如果发生这种情况,它最多会阻塞 max.block.ms,并且请求将在 request.timeout.ms< 后超时.

您应该增加的另一个参数是batch-size,这个参数以字节为单位,而不是消息的数量。还应增加 linger.ms,以防此生产者消息是在用户请求时间内创建的,不要增加太多,一个不错的选择可能是 1-4 毫秒。

batch.size 已满或需要比 linger.ms 更长的时间来拥有比 batch.size 更多的数据时,将发送消息>。大批量在正常情况下会增加吞吐量,但如果延迟太低则无济于事,因为您会在有足够数据获得 batch.size 之前发送。

同时重新检查生产者日志是否正确加载了属性。

关于java - Spring Kafka 生产者抛出 TimeoutExceptions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50725643/

相关文章:

java - 弹性4j - 请求超时

使用 Spring Boot 后端通过 Google Sign-In 进行 Android 登录

python - 在 Python 中指示 group_id 时,Kafka 未收到消息

java - 根据 String[] 创建 int[](特殊计数器)

java - 如何销毁现有框架窗口的内存空间,同时在命令按钮中调用另一个框架

java - Spring 外部化属性和 logback

spring-boot - tomcat 中的 Spring boot WAR - 外部化 application.properties

spring-integration - spring kafka 集成动态创建消费者

apache-kafka - Zookeeper session 不断过期...没有心跳?

java - 在Java中压缩现有文件的简单方法?