java - 如何使用带有回调的 send() 方法返回元数据?

标签 java kafka-producer-api confluent-platform

我想使用 Kafka Producer send() 的回调方法,如下所示:

RecordMetadata recordmetadata = kafkaProducer.send(new ProducerRecord<>(topic,
                null, timestamp, key, message), this::onCompletion);

private RecordMetadata onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
        return metadata;
    } else {
        return null;
    }
}

但是我有 onCompletion 方法返回的元数据或 null 由 send() 方法返回(因为调用 send() 的方法依赖于它)。

如何与回调方法结合使用send()返回元数据或null

最佳答案

文档指出发送是异步的,一旦记录已存储在等待发送的记录缓冲区中,该方法将立即返回。为了让它按照您想要的方式工作,您必须为其实现自己的解决方案,例如使用全局标志:

private RecordMetadata recordMetadata;
private boolean onCompletetionExecuted = false;

kafkaProducer.send(new ProducerRecord<>(topic,
                null, timestamp, key, auditorMessage), this::onCompletion);
while (!onCompletetionExecuted) {
    // waiting, would be good to have a fixed timeout
}
// after this point the value of recordMetadata is the one returned by onCompletion

private void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
        recordMetadata = metadata;
    } else {
        recordMetadata = null;
    }
    onCompletetionExecuted = true;
}

这远非优雅,但它可以完成工作。

关于java - 如何使用带有回调的 send() 方法返回元数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49546116/

相关文章:

java - Java Card 中的 Exp 文件和小程序依赖项

C++ with Kafka - 消费者刚刚收到一些生产者消息

java - 如果一个分区受到限制,如何对 kafka 中的剩余分区应用循环法

apache-kafka - 是否可以从 kafka 消息中获取消息 key 的最新值

mysql - 使用 Helm 安装的 Kafka/Confluent 的连接器

java - 如何在我的应用程序中使用Android默认墙纸裁剪选项来设置墙纸

java - 在 Cassandra 中实现 FIFO 读取

java - 让类(class)相互交换信息的最佳方式?

apache-kafka - 发布和使用不同类型消息的最佳方式是什么?

go - 对于主题的某些分区,kafka 偏移量和滞后是未知的