java - Spring Cloud Stream (Hoxton) Kafka 生产者/消费者无法与 EmbeddedKafka 进行集成测试

标签 java apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka

我有一个工作应用程序,它使用 Hoxton 附带的 Producers 的最新更新。现在我尝试添加一些集成测试,断言生产者实际上正在按预期生成消息。问题是,我在测试中使用的消费者从未从主题中读取任何内容。

为了使这个问题可重现,我重新使用了 Spring Cloud Stream 示例中的一个项目( spring-cloud-stream-samples/source-samples/dynamic-destination-source-kafka ),并对其进行了如下调整:

DynamicDestinationSourceApplication(EmitterProcessor 现在是一个 bean)


@SpringBootApplication
@RestController
public class DynamicDestinationSourceApplication {

    @Autowired
    private ObjectMapper jsonMapper;

    @Autowired
    private EmitterProcessor<Message<?>> processor;

    public static void main(String[] args) {
        SpringApplication.run(DynamicDestinationSourceApplication.class, args);
    }

    @SuppressWarnings("unchecked")
    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destinationName = payload.get("id");
        Message<?> message = MessageBuilder.withPayload(payload)
                .setHeader("spring.cloud.stream.sendto.destination", destinationName).build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<?>>> supplier() {
        return () -> processor;
    }

    @Bean
    public EmitterProcessor<Message<?>> processor(){
        return EmitterProcessor.create();
    }

    //Following sink is used as test consumer. It logs the data received through the consumer.
    static class TestSink {

        private final Log logger = LogFactory.getLog(getClass());

        @Bean
        public Consumer<String> receive1() {
            return data -> logger.info("Data received from customer-1..." + data);
        }

        @Bean
        public Consumer<String> receive2() {
            return data -> logger.info("Data received from customer-2..." + data);
        }
    }
}

模块应用测试

@EmbeddedKafka
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DynamicDestinationSourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

    private static String TOPIC = "someTopic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private EmitterProcessor<Message<?>> processor;

    @Test
    public void shouldProduceAndConsume() {

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(Collections.singleton(TOPIC));
        consumer.poll(0);

        Message<?> message = MessageBuilder.withPayload(new HashMap<String,String>(){{put("somekey", "somevalue");}})
                .setHeader("spring.cloud.stream.sendto.destination", TOPIC).build();
        processor.onNext(message);

        ConsumerRecord<String, String> someRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
        System.out.println(someRecord);

    }

}

No records found for topic 结尾。为什么这在测试期间不起作用?

更新:

我的实际项目与上面的项目并不完全一样,我看到emitterProcessor.onNext()最终不会调用AbstractMessageHandler.onNext()

调试到emitterProcessor.onNext()我看到它调用 drain()并在 FluxPublish.PubSubInner<T>[] a = subscribers;订阅者是一个空数组,而在正常的应用程序执行中它包含一个 EmitterProcessor。

最佳答案

我错误地添加了 testImplementation("org.springframework.cloud:spring-cloud-stream-test-support") 作为依赖项。这使用了不适合与集成测试一起使用的测试绑定(bind)程序。

关于java - Spring Cloud Stream (Hoxton) Kafka 生产者/消费者无法与 EmbeddedKafka 进行集成测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60030609/

相关文章:

java - JPA 中的表联接 - 创建包含联接的实体

java - 如何在 spring 3.2 mvc 中接收复杂对象?

java - 使用 avro 序列化将整个 Json 发送到 kafka?

scala - 如何使用 Samza 在 Kafka 主题上创建分区?

apache-kafka - 如何保证Kafka分区中的顺序

spring-boot - 多个 @EnableBinding 与 Kafka Spring Cloud Stream

java - 无法将 int 字段设置为空值

java - Spring Kinesis 消费者太慢

spring-cloud-dataflow:流部署在 OSX 上失败

java - 如何将变量从 HTML 传递到 Java?