我在 Spring Kafka 中实现了一个相对简单的即发即弃查询系统 https://github.com/trajano/spring-kafka-stream-example
目前的行为是
I need an answer to this question, whoever can answer first please tell me and I will trust it.
我想稍微改变一下行为
I need an answer to this question, whoever can answer first AND passes my internal test condition, please tell me and I will trust it.
但是,我在 ReplyingKafkaTemplate 中看不到我可以做的任何事情.根据 API 文档,我认为我可能必须扩展此类才能以某种方式添加该逻辑。
我的猜测是覆盖 onMessage()
但它会在下一行之前复制它
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
添加消费者记录检查。
最佳答案
ReplyingKafkaTemplate
严格针对每个请求一个回复;多余的回复将被丢弃。
我们在 2.3 中添加了 AggregatingReplyingKafkaTemplate
正是针对这种情况 - 等待多个回复或超时。
这是一个测试用例...
@KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
@SendTo // default REPLY_TOPIC header
public String dListener1(String in) {
return in.toUpperCase();
}
@KafkaListener(id = "def2", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
@SendTo // default REPLY_TOPIC header
public String dListener2(String in) {
return in.substring(0, 1) + in.substring(1).toUpperCase();
}
和
@Test
public void testAggregateNormal() throws Exception {
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
new TopicPartitionOffset(D_REPLY, 0), 2);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value().size()).isEqualTo(2);
Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();
String value1 = iterator.next().value();
assertThat(value1).isIn("fOO", "FOO");
String value2 = iterator.next().value();
assertThat(value2).isIn("fOO", "FOO");
assertThat(value2).isNotSameAs(value1);
assertThat(consumerRecord.topic()).isEqualTo(AggregatingReplyingKafkaTemplate.AGGREGATED_RESULTS_TOPIC);
}
finally {
template.stop();
template.destroy();
}
}
关于java - 如何等待 `ReplyingKafkaTemplate` 的下一个响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58938524/