java - Kafka KStreams - 如何添加线程/使用 StreamsConfig.NUM_STREAM_THREADS_CONFIG

标签 java apache-kafka apache-kafka-streams

我正在弄乱这个参数并遇到了一些奇怪的事情。没有它我的应用程序运行正常,但是当我将此行添加到配置时:

 config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");

CPU 使用率不会攀升至零以上。应用似乎没有做任何事情。没有错误。

是否有一些建议的方法来增加 KStreams 应用程序的线程使用率?或者只是“相信原力”,让它一起运行?


编辑:

  1. 我有两个分区
  2. 已通过 kafka-consumer-groups 检查消费者延迟 - 大量可用记录
  3. 即使只有 1 个分区 - 为什么多个线程什么都不做? 0% CPU。

最佳答案

你有多少分区?如果您只有一个分区,那么增加线程数不会有任何影响,因为分区数定义了最大并行度。所以如果你有 1 个分区和 3 个线程,你将只有 1 个繁忙线程。

检查是否有可用于输入主题的数据。确保将 StreamsConfig.AUTO_OFFSET_RESET_CONFIG 设置为 latest。 如果您之前使用相同的 applicationId 运行它,那么 Kafka Streams 可能已经消耗了所有数据,因此无需执行任何操作。在这种情况下,您可以使用不同的 applicationId 或者您可以使用 Kafka Streams Reset Tool重置主题。

此设置用于某些测试,即 KStreamRepartitionJoinTest并且似乎工作正常。

关于java - Kafka KStreams - 如何添加线程/使用 StreamsConfig.NUM_STREAM_THREADS_CONFIG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39358883/

相关文章:

spring-boot - StreamsException : Unable to initialize state, 如果 Kafka Streams 的多个实例在同一状态目录中运行,则可能发生这种情况

apache-kafka - 滑动窗口中Kafka KStream相关消息事件

java - 启用 spring aop 回避依赖注入(inject)

apache-kafka - Kafka 分区和吞吐量

java - 如何在 Hazelcast 中获取本地 map 条目

apache-kafka - pulsar和kafka在消费方面有什么区别?

apache-kafka - 我在哪里/如何为 kafka 设置 ack

spring-boot - 如何在 spring boot 中配置两个 Kafka StreamsBuilderFactoryBean 实例

java - 顶点抛出 IllegalStateException : Response has already been written

java - 获取数组的正确索引值