我的本地计算机上有一个消息生成器,在远程主机 (aws) 上有一个代理。
生产者发送消息后, 我等待并调用远程主机上的控制台消费者并 查看过多的日志。 没有来自生产者的值(value)。
生产者在调用send方法后刷新数据。 一切都配置正确。
如何检查代理是否收到了来自生产者的消息以及生产者是否收到了答案?
最佳答案
Send
方法异步将消息发送到主题,并且
返回 Future
的RecordMetadata
.
java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously sends a record to a topic
在flush
之后称呼,
通过调用 isDone
检查 Future 是否已完成方法。
(例如 Future.isDone() == true
)
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.
RecordMetadata
包含offset
和 partition
公共(public) int 分区()
The partition the record was sent to
公共(public)长偏移()
the offset of the record, or -1 if {hasOffset()} returns false.
或者您也可以使用Callback确保消息是否发送到主题的函数
Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.
这里是文档中的清晰示例
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
关于java - 如何确保消息到达Kafka Broker?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54595952/