spring-kafka - 如何模拟 KafkaTemplate 的结果

标签 spring-kafka

我有一种发送 kafka 消息的方法,如下所示:

@Async
public void sendMessage(String topicName, Message message) {
    ListenableFuture<SendResult<String, Message >> future = kafkaTemplate.send(topicName, message);

    future.addCallback(new ListenableFutureCallback<>() {

        @Override
        public void onSuccess(SendResult<String, Message > result) {
            //do nothing
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("something wrong happened"!);
        }
    });
}

现在我正在为它编写单元测试。我还想测试两个回调方法 onSuccessonFailure方法,所以我的想法是模拟 KafkaTemplate,例如:
KafkaTemplate kafkaTemplate = Mockito.mock(KafkaTemplate.class);

但是现在我陷入了这两种情况的模拟结果上:
when(kafkaTemplate.send(anyString(), any(Message.class))).thenReturn(????);

我应该在 thenReturn 中放什么案例成功和案例失败的方法?请问有人有什么想法吗?非常感谢!

最佳答案

您可以模拟模板,但最好模拟界面。

    Sender sender = new Sender();
    KafkaOperations template = mock(KafkaOperations.class);
    SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
    when(template.send(anyString(), any(Message.class))).thenReturn(future);
    sender.setTemplate(template);
    sender.send(...);

    future.set(new SendResult<>(...));

    ...or...

    future.setException(...

关于spring-kafka - 如何模拟 KafkaTemplate 的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57475464/

相关文章:

java - Spring Cloud Stream嵌入 header 格式(Kafka)

spring-kafka - 寻找在 Spring Kafka 2.1.0 中使用自定义 ConsumerAwareRebalanceListener 的工作示例

java - AWS Kafka (MSK) - 如何生成 keystore 和信任库并在我的 Spring Cloud Stream 应用程序中使用它们?

java - 使用 SpringBoot 使用 Batch 从 Kafka 读取数据无法正常工作

spring-boot - 如何使用 Spring Cloud Stream Supplier 向 Kafka 发送键控消息

java - ReplyingKafkaTemplate 是否阻塞?

java - 具有多个 JsonSerializer 映射的 DefaultKafkaProducerFactory

java - Spring Kafka - 为任何主题的分区消耗最后 N 条消息

spring - 如果消息失败并由 AfterRollbackProcessor 处理,如何在 Spring Kafka 中提交偏移量

spring - 即使 Kafka 监听器 (spring-kafka) 未初始化,如何启动 spring 应用程序