我想使用 Kafka Producer send() 的回调方法,如下所示:
RecordMetadata recordmetadata = kafkaProducer.send(new ProducerRecord<>(topic,
null, timestamp, key, message), this::onCompletion);
private RecordMetadata onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
return metadata;
} else {
return null;
}
}
但是我有 onCompletion
方法返回的元数据或 null
由 send() 方法返回(因为调用 send()
的方法依赖于它)。
如何与回调方法结合使用send()
返回元数据或null
?
最佳答案
文档指出发送是异步的,一旦记录已存储在等待发送的记录缓冲区中,该方法将立即返回。为了让它按照您想要的方式工作,您必须为其实现自己的解决方案,例如使用全局标志:
private RecordMetadata recordMetadata;
private boolean onCompletetionExecuted = false;
kafkaProducer.send(new ProducerRecord<>(topic,
null, timestamp, key, auditorMessage), this::onCompletion);
while (!onCompletetionExecuted) {
// waiting, would be good to have a fixed timeout
}
// after this point the value of recordMetadata is the one returned by onCompletion
private void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
recordMetadata = metadata;
} else {
recordMetadata = null;
}
onCompletetionExecuted = true;
}
这远非优雅,但它可以完成工作。
关于java - 如何使用带有回调的 send() 方法返回元数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49546116/