java - Testing Kafka Consumer/Producer in Spring Kafka

Tags: java spring apache-kafka spring-test spring-kafka

I am currently working on a Kafka module where I am using the spring-kafka abstraction for Kafka communication. I was able to integrate the producer and consumer from an actual implementation standpoint; however, I am not sure how to test (specifically integration test) the business logic around the consumer with @KafkaListener. I tried following the spring-kafka documentation and various blogs on the topic, but none of them answered my intended question.

Spring Boot Test Class

// imports omitted for brevity

@RunWith(SpringRunner.class)
@SpringBootTest(classes = PaymentAccountUpdaterApplication.class,
                webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class CardUpdaterMessagingIntegrationTest {

    private final static String cardUpdateTopic = "TP.PRF.CARDEVENTS";

    @Autowired
    private ObjectMapper objectMapper;

    @ClassRule
    public static KafkaEmbedded kafkaEmbedded =
            new KafkaEmbedded(1, false, cardUpdateTopic);

    @Test
    public void sampleTest() throws Exception {
        Map<String, Object> consumerConfig =
                KafkaTestUtils.consumerProps("test", "false", kafkaEmbedded);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerConfig);
        ContainerProperties containerProperties = new ContainerProperties(cardUpdateTopic);
        containerProperties.setMessageListener(new SafeStringJsonMessageConverter());
        KafkaMessageListenerContainer<String, String>
                container = new KafkaMessageListenerContainer<>(cf, containerProperties);

        BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) data -> {
            System.out.println("Added to Queue: "+ data);
            records.add(data);
        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container, kafkaEmbedded.getPartitionsPerTopic());


        Map<String, Object> producerConfig = KafkaTestUtils.senderProps(kafkaEmbedded.getBrokersAsString());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        ProducerFactory<String, Object> pf =
                new DefaultKafkaProducerFactory<>(producerConfig);
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(pf);

        String payload = objectMapper.writeValueAsString(accountWrapper());
        kafkaTemplate.send(cardUpdateTopic, 0, payload);
        ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);

        assertThat(received).has(partition(0));
    }


    @After
    public void after() {
        kafkaEmbedded.after();
    }

    private AccountWrapper accountWrapper() {
        return AccountWrapper.builder()
                .eventSource("PROFILE")
                .eventName("INITIAL_LOAD_CARD")
                .eventTime(LocalDateTime.now().toString())
                .eventID("8730c547-02bd-45c0-857b-d90f859e886c")
                .details(AccountDetail.builder()
                        .customerId("idArZ_K2IgE86DcPhv-uZw")
                        .vaultId("912A60928AD04F69F3877D5B422327EE")
                        .expiryDate("122019")
                        .build())
                .build();
    }
}

Listener Class

@Service
public class ConsumerMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessageListener.class);

    private ConsumerMessageProcessorService consumerMessageProcessorService;

    public ConsumerMessageListener(ConsumerMessageProcessorService consumerMessageProcessorService) {
        this.consumerMessageProcessorService = consumerMessageProcessorService;
    }


    @KafkaListener(id = "cardUpdateEventListener",
            topics = "${kafka.consumer.cardupdates.topic}",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void processIncomingMessage(Payload<AccountWrapper,Object> payloadContainer,
                                       Acknowledgment acknowledgment,
                                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                                       @Header(KafkaHeaders.OFFSET) String offset) {

        try {
            // business logic to process the message
            consumerMessageProcessorService.processIncomingMessage(payloadContainer);
        } catch (Exception e) {
            LOGGER.error("Unhandled exception in card event message consumer. Discarding offset commit." +
                    "message:: {}, details:: {}", e.getMessage(), messageMetadataInfo);
            throw e;
        }
        acknowledgment.acknowledge();
    }
}

My question is: in the test class I am asserting on the partition, payload, etc. polled from the BlockingQueue; however, my question is how to verify that the business logic in my @KafkaListener-annotated class is getting executed properly and routing the messages to different topics based on error handling and other business scenarios. In some examples I saw a CountDownLatch being asserted on, which I do not want to put into my business logic to assert in production-grade code. Also, the message processor is async, so I am not sure how to assert the execution.

Any help is appreciated.

Best Answer

is getting executed properly and routing the messages to different topic based on error handling and other business scenarios.

Your integration test can consume from that "different" topic to assert that the listener processed it as expected.
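The shape of that assertion mirrors the BlockingQueue pattern already used in the question's test: consume from the topic the listener routes to, poll with a timeout, and assert on what arrives. Here is a minimal plain-Java sketch of that idea, with the topics simulated by in-memory queues (the routing rule and names are hypothetical stand-ins; in a real test the queue would be fed by a KafkaTestUtils-configured consumer on the error topic):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RouteAssertSketch {

    public static void main(String[] args) throws Exception {
        // Simulated "error topic": in a real test this would be a Kafka topic
        // consumed by a listener container that feeds a BlockingQueue.
        BlockingQueue<String> errorTopic = new LinkedBlockingQueue<>();

        // Simulated listener business logic: route malformed payloads
        // to the error topic (hypothetical rule, for illustration only).
        ExecutorService listener = Executors.newSingleThreadExecutor();
        listener.submit(() -> {
            String payload = "{malformed";
            if (!payload.startsWith("{\"")) {
                errorTopic.add(payload);
            }
        });

        // The test consumes from the "different" topic with a timeout
        // and asserts that the message was routed there.
        String routed = errorTopic.poll(10, TimeUnit.SECONDS);
        System.out.println("routed: " + routed);
        listener.shutdown();
    }
}
```

Polling with a timeout (rather than sleeping) keeps the test fast on success and bounded on failure.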

You can also add a BeanPostProcessor to your test case and wrap the ConsumerMessageListener bean in a proxy to verify that the input arguments were as expected.
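Stripped of the Spring machinery, the wrapping idea is just a test-side decorator: the test wraps the production collaborator and counts down a latch after delegating, so no latch ever leaks into production code. A minimal plain-Java sketch (the MessageProcessor interface and payload here are hypothetical stand-ins, not types from the question):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchDecoratorSketch {

    // Hypothetical stand-in for the production collaborator.
    interface MessageProcessor {
        void process(String payload);
    }

    public static void main(String[] args) throws Exception {
        // "Production" processor: knows nothing about the test.
        MessageProcessor real = payload -> System.out.println("processed: " + payload);

        // Test-side decorator: delegates, then counts down the latch.
        CountDownLatch latch = new CountDownLatch(1);
        MessageProcessor wrapped = payload -> {
            real.process(payload);
            latch.countDown();
        };

        // Simulate the async listener invoking the processor on another thread.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> wrapped.process("card-event"));

        // The test awaits the latch with a timeout instead of sleeping.
        boolean completed = latch.await(5, TimeUnit.SECONDS);
        System.out.println("completed: " + completed);
        executor.shutdown();
    }
}
```

This is the same trick the ListenerWrapper in the answer's EDIT plays with the Acknowledgment argument, just without the proxying infrastructure.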

EDIT

Here is an example of wrapping the listener in a proxy...

@SpringBootApplication
public class So53678801Application {

    public static void main(String[] args) {
        SpringApplication.run(So53678801Application.class, args);
    }

    @Bean
    public MessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

@Component
class Listener {

    @KafkaListener(id = "so53678801", topics = "so53678801")
    public void processIncomingMessage(Foo payload,
            Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
            @Header(KafkaHeaders.OFFSET) String offset) {

        System.out.println(payload);
        // ...
        acknowledgment.acknowledge();
    }

}

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So53678801Application.class,
        So53678801ApplicationTests.TestConfig.class})
public class So53678801ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embededKafka = new EmbeddedKafkaRule(1, false, "so53678801");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
                embededKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Autowired
    private ListenerWrapper wrapper;

    @Test
    public void test() throws Exception {
        this.template.send("so53678801", "{\"bar\":\"baz\"}");
        assertThat(this.wrapper.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.wrapper.argsReceived[0]).isInstanceOf(Foo.class);
        assertThat(((Foo) this.wrapper.argsReceived[0]).getBar()).isEqualTo("baz");
        assertThat(this.wrapper.ackCalled).isTrue();
    }

    @Configuration
    public static class TestConfig {

        @Bean
        public static ListenerWrapper bpp() { // BPPs have to be static
            return new ListenerWrapper();
        }

    }

    public static class ListenerWrapper implements BeanPostProcessor, Ordered {

        private final CountDownLatch latch = new CountDownLatch(1);

        private Object[] argsReceived;

        private boolean ackCalled;

        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof Listener) {
                ProxyFactory pf = new ProxyFactory(bean);
                pf.setProxyTargetClass(true); // unless the listener is on an interface
                pf.addAdvice(interceptor());
                return pf.getProxy();
            }
            return bean;
        }

        private MethodInterceptor interceptor() {
            return invocation -> {
                if (invocation.getMethod().getName().equals("processIncomingMessage")) {
                    Object[] args = invocation.getArguments();
                    this.argsReceived = Arrays.copyOf(args, args.length);
                    Acknowledgment ack = (Acknowledgment) args[1];
                    args[1] = (Acknowledgment) () -> {
                        this.ackCalled = true;
                        ack.acknowledge();
                    };
                    try {
                        return invocation.proceed();
                    }
                    finally {
                        this.latch.countDown();
                    }
                }
                else {
                    return invocation.proceed();
                }
            };
        }

    }

}

Regarding java - Testing Kafka Consumer/Producer in Spring Kafka, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53678801/
