我正在处理 xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了 kafka 生产者,这里的问题是 kafka 生产者的发送方法是异步的,因此,有时当我关闭它发送的生产者 java.lang.IllegalStateException: 生产者关闭后无法发送。
我在某处读到可以让生产者保持打开状态。我的问题是:这意味着什么,或者是否有更好的解决方案。
---编辑---
<list>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
...
</list>
想象一下以下场景:
- 我们读取标签并创建 kafka 生产者
- 我们读取每个元素的属性,生成一个 json 对象并使用 send 方法将其发送到 kafka。 -当我们读取元素时,我们调用生产者中的关闭方法
问题元素的数量可以是 80k 因此,有时当我们调用 disconnect 方法时,它会继续以异步方式发送消息。所以我们需要先调用flush方法但是会影响性能
最佳答案
您应该在调用 Producer.close()
之前调用 Producer.flush()
。这是一个阻塞调用,不会在所有记录发送之前返回。
如果您不调用 close()
,根据实现/语言的不同,您可能会导致资源/内存泄漏。
关于java - 如果我不关闭 kafka 生产者会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43057042/