使用 kafka 版本:2.0.1 和 kafka-streams-scala 版本 2.0.1
记录调试消息,例如:
DEBUG 2019-05-08 09:57:53,322 [he.kafka.clients.NetworkClient] [
] [ ]: [Consumer clientId=XXX-bd6b071d-a44f-4253-a3a5-539d60a72dd3-StreamThread-1-consumer, groupId=XXX] Disconnecting from node YYY due to request timeout."
引导我增加 request.timeout.ms 值:
private val config: Properties = new Properties
config.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, "240000")
...
private val streams: KafkaStreams = new KafkaStreams(topology, config)
但是,这会将 AdminClientConfig
和 ConsumerConfig
的新值设置为 240000ms(AdminClientConfig
的默认 request.timeout.ms 值> 和 ConsumerConfig
实际上是不同的——分别为 120000ms 和 40000ms)。
有没有办法为 AdminClientConfig
或 ConsumerConfig
设置 Kafka Streams 配置值,而不覆盖两者的值?
最佳答案
您可以在任何配置前加上 consumer.
或 admin.
前缀,以将其仅应用于一个客户端。
还有main.consumer.
、restore.consumer.
和global.consumer.
来进一步区分不同的消费者。使用 consumer.
作为前缀,该配置将应用于所有消费者。
最后,还有 生产者。
前缀(只是为了完整性而提及它)。
比较文档:https://docs.confluent.io/current/streams/developer-guide/config-streams.html#naming
关于scala - 卡夫卡流: configuring `AdminClientConfig` or `ConsumerConfig` without overriding the values both,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56036742/