spring - EmbeddedKafka如何在单元测试中检查收到的消息

标签 spring junit apache-kafka spring-kafka

我创建了一个 Spring Boot 应用程序,用于将消息发送到 Kafka 主题。我正在使用 Spring spring-integration-kafka : 一个KafkaProducerMessageHandler<String,String>订阅 channel ( SubscribableChannel ) 并将收到的所有消息推送到一个主题。 该应用程序运行良好。我看到消息通过控制台消费者(本地 kafka)到达 Kafka。

我还创建了一个使用 KafkaEmbedded 的集成测试。我正在通过订阅测试中的 channel 来检查预期的消息 - 一切都很好。

但我希望测试也检查放入 kafka 的消息。遗憾的是 Kafka 的 JavaDoc 并不是最好的。到目前为止我尝试过的是:

@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {

    mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
    kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );

}
//...

@Test
public void endToEnd() throws Exception {
//  ...

    ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );

    StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );


}

问题是我没有看到任何记录。我不确定我的 KafkaEmbedded 设置是否正确。 但消息是由 channel 接收的。

最佳答案

这对我有用。尝试一下

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaEmbeddedTest {

    private static String SENDER_TOPIC = "testTopic";

    @ClassRule
    // By default it creates two partitions.
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC); 

    @Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        //If you wish to send it to partitions other than 0 and 1, 
        //then you need to specify number of paritions in the declaration

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();


        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
        // Make sure you set the offset as earliest, because by the 
        // time consumer starts, producer might have sent all messages
        consumerProps.put("auto.offset.reset", "earliest");

        final List<String> receivedMessages = Lists.newArrayList();
        final CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
            kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
            try {
                while (true) {
                    ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
                    records.iterator().forEachRemaining(record -> {
                        receivedMessages.add(record.value());
                        latch.countDown();
                    });
                }
            } finally {
                kafkaConsumer.close();
            }
        });

    latch.await(10, TimeUnit.SECONDS);
    assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
    }
}

我正在使用倒计时锁存器,因为 Producer.Send(..) 是一个异步操作。所以我在这里所做的就是在无限循环中等待,每 100 毫秒轮询一次 kafka,如果有新记录,如果有,则将其添加到列表中以供将来断言,然后减少倒计时。为了确定,我总共会等待 10 秒。
您也可以使用一个简单的循环,然后在几分钟后退出。(如果您不想使用 CountdownLatch 和 ExecutorService 之类的东西)

关于spring - EmbeddedKafka如何在单元测试中检查收到的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48682745/

相关文章:

java - 如果类没有 setter,如何将对象 Autowiring 到 bean

java - Karaf Startup 的 JUnitTest

apache-kafka - 无法连接和描述 Kafka 集群。检查工作人员的代理连接和安全属性

java - Apache Kafka 的流连接示例?

spring - Swagger codegen - 更改 api 和 Controller 类名称

java - 为什么表单提交会打开新窗口/标签?

spring - 如何在Spring表单内的 map 内填充 map ?

groovy - 如何让多个 MockFor 在 Groovy 中工作?

spring - 如何激活配置文件作为环境变量

java - Kafka能否用作分布式工作队列