java - Kafka Streams 应用程序在 kafka 服务器上打开太多文件

标签 java apache-kafka apache-kafka-streams

我一直在开发一个基于 java kafka-streams API 的应用程序,其目标是处理来自一个 kafka 主题的数据流,并将其生成到另一个主题中。

看起来,每当我开始使用 kafka-streams 应用程序生成消息时,文件句柄就会在我使用的 kafka 代理上不断打开,并且它们永远不会关闭,这意味着最终 kafka 服务器会打开太多文件,并且 kafka 和 Zookeeper 守护进程会崩溃。

我正在使用适用于 Java 的 kafka-streams-1.0.1 API jar,并在 JDK 11 上运行。kafka 集群的 Kafka 版本为 1.0.0。

我的应用程序的配置包括以下 kafka 生产者配置:

  • batch.size:设置为 100,000 条消息。
  • linger.ms:设置为 1,000 毫秒。
  • buffer.memory:设置为相当于 5 MB 的字节。

流处理本身非常简单,由以下部分组成:

stream.map((k,v) -> handle(k,v)).filter((k,v) -> v != null).to(outgoingTopic);

如果你们有任何建议,我将不胜感激。

最佳答案

使用 Java 8 或 Java 10 或更低版本,并且 使用最新的Kafka,https://kafka.apache.org/quickstart

请参阅此处有关错误提交的一些报告 https://issues.apache.org/jira/browse/KAFKA-6855

关于java - Kafka Streams 应用程序在 kafka 服务器上打开太多文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53237372/

相关文章:

java - 缩放线段 - Java

java - 如何在 JComboBox 中正确渲染我自己的类?

apache-kafka - 如何将单个 csv 文件缓存到 Kafka 中的 KTable 中?

apache-spark - Spark 结构化流恰好一次 - 未实现 - 重复事件

java - Kafka和Storm如何实现 Multi-Tenancy ?

apache-kafka-streams - 卡夫卡流: Dynamically Configure RocksDb

java - 通过互联网发送对象并调用它的方法

java - java中是否存在内部异常概念

apache-kafka - MockSchemaRegistryClient 未注册 avro 模式 : Cannot get schema from schema registry

java - Kafka Streams 分组和串联