问题
我在 Kubernetes 中有一个 Kafka 设置和三个代理,根据 https://github.com/Yolean/kubernetes-kafka 上的指南设置.从 Java 客户端生成消息时出现以下错误消息。
2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='[...redacted...]':
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicname-0: 30001 ms has passed since last append
详细设置
监听器设置为允许来自外部世界的 SSL 生产者/消费者:
advertised.host.name = null
advertised.listeners = OUTSIDE://kafka-0.mydomain.com:32400,PLAINTEXT://:9092
advertised.port = null
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:SSL
listeners = OUTSIDE://:9094,PLAINTEXT://:9092
inter.broker.listener.name = PLAINTEXT
host.name =
port.name = 9092
外部监听器正在监听 kafka-0.mydomain.com、kafka-1.mydomain.com 等。纯文本监听器正在监听任何 IP,因为它们是 Kubernetes 的本地集群。
生产者设置:
kafka:
bootstrap-servers: kafka.mydomain.com:9092
properties:
security.protocol: SSL
producer:
batch-size: 16384
buffer-memory: 1048576 # 1MB
retries: 1
ssl:
key-password: redacted
keystore-location: file:/var/private/ssl/kafka.client.keystore.jks
keystore-password: redacted
truststore-location: file:/var/private/ssl/kafka.client.truststore.jks
truststore-password: redacted
另外我在代码中设置linger.ms
为100,强制消息在100ms以内发送。延迟时间有意设置得较低,因为用例需要最小的延迟。
分析
- 当代理被移动到 SSL 时,错误开始出现。
- 在服务器端,一切都按预期运行,日志中没有错误,我可以使用 Kafka 客户端工具手动连接到代理。
- 间歇性地出现错误:有时它每秒发送 30 多条消息,有时它根本不发送任何消息。它可能像魅力一样工作数小时,然后只是垃圾邮件超时一段时间。
- 客户端和服务器的时钟同步 (UTC)。
- 生产端和服务器端的 CPU 始终保持在 20% 左右。
它会是什么?
最佳答案
此问题通常发生在生产者比代理快时,您的设置发生这种情况的原因似乎是 SSL 需要额外的 CPU,这可能会降低代理的速度。但无论如何请检查以下内容:
- 检查您是否以相同的速度生成消息,根据您所说的,您似乎有尖峰。
- 另一种可能性是集群中的其他 kafka 客户端(生产者或消费者)不一定使用相同的主题,导致这种情况发生,因为代理过载(检查代理 cpu/网络)。
要尽量减少导致此保留的原因,您应该将 buffer-memory
增加到 32MB 以上,认为 32MB 是默认值,您将其设置得更低。你拥有的越低,缓冲区就越容易变满,如果发生这种情况,它最多会阻塞 max.block.ms
,并且请求将在 request.timeout.ms< 后超时
.
您应该增加的另一个参数是batch-size
,这个参数以字节为单位,而不是消息的数量。还应增加 linger.ms,以防此生产者消息是在用户请求时间内创建的,不要增加太多,一个不错的选择可能是 1-4 毫秒。
当 batch.size
已满或需要比 linger.ms
更长的时间来拥有比 batch.size
更多的数据时,将发送消息>。大批量在正常情况下会增加吞吐量,但如果延迟太低则无济于事,因为您会在有足够数据获得 batch.size
之前发送。
同时重新检查生产者日志是否正确加载了属性。
关于java - Spring Kafka 生产者抛出 TimeoutExceptions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50725643/