java - 如果我不关闭 kafka 生产者会发生什么

标签 java apache-kafka kafka-producer-api

我正在处理 xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了 kafka 生产者,这里的问题是 kafka 生产者的发送方法是异步的,因此,有时当我关闭它发送的生产者 java.lang.IllegalStateException: 生产者关闭后无法发送。 我在某处读到可以让生产者保持打开状态。我的问题是:这意味着什么,或者是否有更好的解决方案。

---编辑---

<list>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
...
</list>

想象一下以下场景:

  • 我们读取标签并创建 kafka 生产者
  • 我们读取每个元素的属性,生成一个 json 对象并使用 send 方法将其发送到 kafka。 -当我们读取元素时,我们调用生产者中的关闭方法

问题元素的数量可以是 80k 因此,有时当我们调用 disconnect 方法时,它会继续以异步方式发送消息。所以我们需要先调用flush方法但是会影响性能

最佳答案

您应该在调用 Producer.close() 之前调用 Producer.flush()。这是一个阻塞调用,不会在所有记录发送之前返回。

如果您不调用 close(),根据实现/语言的不同,您可能会导致资源/内存泄漏。

关于java - 如果我不关闭 kafka 生产者会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43057042/

相关文章:

python - 如何以编程方式更新 Confluent Schema Registry 中的主题架构和兼容性

java - Spring Kafka 生产者抛出 TimeoutExceptions

java - Kafka无法更新元数据

java - Android 可绘制范例!

gradle - 使用 gradle 构建 kafka 源时出错

java - 此正则表达式行超出了我的理解 "(?=(?:\d{3})++(?!\d))"

serialization - Flink流: Unexpected charaters in serialized String messages

java - 如何将流式 json 数据作为键值对发送到 kafka 消费者中

java - Helper 和 Utility 类之间有什么区别?

java - 如何检索对应的xpath