我有一个多线程应用程序,它使用生产者类来生成消息,之前我使用下面的代码为每个请求创建生产者。KafkaProducer 是用每个请求新建的,如下所示:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
然后我阅读了关于生产者的 Kafka 文档,了解到我们应该使用单个生产者实例以获得良好的性能。
然后我在单例类中创建了 KafkaProducer 的单个实例。
现在我们应该何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它不会找到生产者重新发送消息,因此抛出:
java.lang.IllegalStateException: Cannot send after the producer is closed.
或者我们如何在关闭后重新连接到生产者。 问题是如果程序崩溃或有异常呢?
最佳答案
通常,在 KafkaProducer
上调用 close()
足以确保所有飞行记录已完成:
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
* <p>
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
如果您的生产者在您的应用程序的整个生命周期中都在使用,请不要关闭它,直到您收到终止信号,然后调用 close()
。如文档中所述,生产者在多线程环境中使用是安全的,因此您应该重复使用同一个实例。
如果您在多个线程中共享您的KafkaProducer
,您有两个选择:
- 在主执行线程中通过
Runtime.getRuntime().addShutdownHook
注册关闭回调时调用close()
- 让您的多线程方法竞争关闭,只允许一个方法获胜。
2 的粗略草图可能如下所示:
object KafkaOwner {
private var producer: KafkaProducer = ???
@volatile private var isClosed = false
def close(): Unit = {
if (!isClosed) {
kafkaProducer.close()
isClosed = true
}
}
def instance: KafkaProducer = {
this.synchronized {
if (!isClosed) producer
else {
producer = new KafkaProducer()
isClosed = false
}
}
}
}
关于java - 关闭后如何重新连接kafka生产者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39917046/