java - 如何等待 `ReplyingKafkaTemplate` 的下一个响应?

标签 java spring apache-kafka messaging spring-kafka

我在 Spring Kafka 中实现了一个相对简单的即发即弃查询系统 https://github.com/trajano/spring-kafka-stream-example

目前的行为是

I need an answer to this question, whoever can answer first please tell me and I will trust it.

我想稍微改变一下行为

I need an answer to this question, whoever can answer first AND passes my internal test condition, please tell me and I will trust it.

但是,我在 ReplyingKafkaTemplate 中看不到我可以做的任何事情.根据 API 文档,我认为我可能必须扩展此类才能以某种方式添加该逻辑。

我的猜测是覆盖 onMessage() 但它会在下一行之前复制它

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);

添加消费者记录检查。

最佳答案

ReplyingKafkaTemplate 严格针对每个请求一个回复;多余的回复将被丢弃。

我们在 2.3 中添加了 AggregatingReplyingKafkaTemplate 正是针对这种情况 - 等待多个回复或超时。

这是一个测试用例...

    @KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
    @SendTo  // default REPLY_TOPIC header
    public String dListener1(String in) {
        return in.toUpperCase();
    }

    @KafkaListener(id = "def2", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
    @SendTo  // default REPLY_TOPIC header
    public String dListener2(String in) {
        return in.substring(0, 1) + in.substring(1).toUpperCase();
    }

@Test
public void testAggregateNormal() throws Exception {
    AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
            new TopicPartitionOffset(D_REPLY, 0), 2);
    try {
        template.setDefaultReplyTimeout(Duration.ofSeconds(30));
        ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
        RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
                template.sendAndReceive(record);
        future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
        ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
                future.get(30, TimeUnit.SECONDS);
        assertThat(consumerRecord.value().size()).isEqualTo(2);
        Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();
        String value1 = iterator.next().value();
        assertThat(value1).isIn("fOO", "FOO");
        String value2 = iterator.next().value();
        assertThat(value2).isIn("fOO", "FOO");
        assertThat(value2).isNotSameAs(value1);
        assertThat(consumerRecord.topic()).isEqualTo(AggregatingReplyingKafkaTemplate.AGGREGATED_RESULTS_TOPIC);
    }
    finally {
        template.stop();
        template.destroy();
    }
}

关于java - 如何等待 `ReplyingKafkaTemplate` 的下一个响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58938524/

相关文章:

当 Hibernate 应用程序加载数据以供只读使用时 Oracle 死锁

apache-kafka - Kafka 消费者(组)触发再平衡的条件

java - 从同一 JVM 运行 kafka 消费者和生产者时生产者速度缓慢

java - 扫描仪在空格字符后不读取

Java if 语句不满足

Java - 如果单词包含另一个单词的所有字母,则获取正则表达式

java - Apache Kafka 获取特定主题的消费者列表

java - 在服务器中找不到 Spring Boot 资源文件,但在 Eclipse 中可以使用

java - 设置在另一个 XML 文件中定义的 bean 的属性

java - Spring @PreDestroy : No logging because Logback stops too soon