java - 如何确保消息到达Kafka Broker?

标签 java apache-kafka

我的本​​地计算机上有一个消息生成器,在远程主机 (aws) 上有一个代理。

生产者发送消息后, 我等待并调用远程主机上的控制台消费者并 查看过多的日志。 没有来自生产者的值(value)。

生产者在调用send方法后刷新数据。 一切都配置正确。

如何检查代理是否收到了来自生产者的消息以及生产者是否收到了答案?

最佳答案

Send方法异步将消息发送到主题,并且 返回 FutureRecordMetadata .

java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously sends a record to a topic

flush之后称呼, 通过调用 isDone 检查 Future 是否已完成方法。 (例如 Future.isDone() == true )

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.

RecordMetadata包含offsetpartition

公共(public) int 分区()

The partition the record was sent to

公共(public)长偏移()

the offset of the record, or -1 if {hasOffset()} returns false.

或者您也可以使用Callback确保消息是否发送到主题的函数

Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

这里是文档中的清晰示例

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
           new Callback() {
               public void onCompletion(RecordMetadata metadata, Exception e) {
                   if(e != null) {
                      e.printStackTrace();
                   } else {
                      System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               }
           });

关于java - 如何确保消息到达Kafka Broker?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54595952/

相关文章:

java - Spring Cloud 溪流和动物园管理员

apache-zookeeper - Kafka 消费者启动错误 : Failed to add leader for partitions [calls, 0] - NotLeaderForPartitionException

java - 将运行时(元)数据传递给 CDI 中的生产者方法

node.js - Kafka Node - client.createTopic 失败代码 38

apache-spark - pyspark.sql.utils.AnalysisException : Failed to find data source: kafka

apache-kafka - kafka artifactIds kafka_2.10 和 kafka-clients 有什么区别?

java - Spring Boot - 自定义查询上的空指针

java - 带有 javaconfig PropertySource 的 SpringMVC 失败

java - 扫描仪 nextLine() 允许输入一个或两个单词

java - powermock 中意外的方法调用异常