scala - 卡夫卡流: configuring `AdminClientConfig` or `ConsumerConfig` without overriding the values both

标签 scala apache-kafka kafka-consumer-api apache-kafka-streams

使用 kafka 版本:2.0.1 和 kafka-streams-scala 版本 2.0.1

记录调试消息,例如:

DEBUG 2019-05-08 09:57:53,322 [he.kafka.clients.NetworkClient] [
] [ ]: [Consumer clientId=XXX-bd6b071d-a44f-4253-a3a5-539d60a72dd3-StreamThread-1-consumer, groupId=XXX] Disconnecting from node YYY due to request timeout."

引导我增加 request.timeout.ms 值:

  private val config: Properties = new Properties
  config.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, "240000")
...
  private val streams: KafkaStreams = new KafkaStreams(topology, config)

但是,这会将 AdminClientConfigConsumerConfig 的新值设置为 240000ms(AdminClientConfig 的默认 request.timeout.ms 值> 和 ConsumerConfig 实际上是不同的——分别为 120000ms 和 40000ms)。

有没有办法为 AdminClientConfigConsumerConfig 设置 Kafka Streams 配置值,而不覆盖两者的值?

最佳答案

您可以在任何配置前加上 consumer.admin. 前缀,以将其仅应用于一个客户端。

还有main.consumer.restore.consumer.global.consumer.来进一步区分不同的消费者。使用 consumer. 作为前缀,该配置将应用于所有消费者。

最后,还有 生产者。 前缀(只是为了完整性而提及它)。

比较文档:https://docs.confluent.io/current/streams/developer-guide/config-streams.html#naming

关于scala - 卡夫卡流: configuring `AdminClientConfig` or `ConsumerConfig` without overriding the values both,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56036742/

相关文章:

java - Kafka Consumer 获取特定主题的分配分区

java - kafka_2.11-0.8.2.1 消费者再平衡

scala - 我如何关闭 Finagle Thrift 客户端?

apache-kafka - 来自kafka的元数据信息

scala - 如何使用键将值映射到Spark DataFrame中的列

docker - Docker设置-多个容器之间的联网

go - 我如何知道在 Kafka 中使用哪个分区?

go - Golang Consumer连接Kafka后延迟接收Kafka消息

scala - Scala中的惰性迭代器?

scala - 为什么结构类型的编译时生成技术会阻止单独编译?