spring-kafka - Spring kafka嵌入式测试

标签 spring-kafka

我正在使用嵌入式kafka编写junit测试用例。我们有一个管道,其中 Producer > topic > Consumer > do work() > Produce 。 我正在使用第三方模式注册表(通过提供虚假网址来模拟此注册表)以及与其绑定(bind)的特定 serdes。在 kafka 用户组上讨论此问题后,执行此操作的方法是使用 一个模拟注册表,用于手动序列化数据并在生产者中传递 byte[] 本身而不是 avro record 。在这种情况下,我的消费者将如何失败,因为它期望特定的记录有效负载。关于如何解决此问题有什么想法吗?

//Listener method


    */
        @KafkaListener(topics = test1,id="tesId1")
        public void onMessage(@Payload Log log, 
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) Long offset) throws Exception
                 {

               }


    // test class
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(topics = { "test1" })
    @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
    public class ConsumerTests {

    }

最佳答案

只需使用原始KafkaTemplate(无泛型)和字节数组序列化器。

例如,使用 JSON 和 StringSerializer:

@SpringBootApplication
public class So53179695Application {

    public static void main(String[] args) {
        SpringApplication.run(So53179695Application.class, args);
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(Foo in) {
        System.out.println(in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

@RunWith(SpringRunner.class)
@SpringBootTest
public class So53179695ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = 
        new EmbeddedKafkaRule(1, false, "foo");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
            embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    public KafkaTemplate<String, Foo> template;

    @SuppressWarnings("rawtypes")
    @Autowired
    public KafkaTemplate rawTemplate;

    @SuppressWarnings("unchecked")
    @Test
    public void test() throws Exception {
//      template.send("foo", new Foo("bar"));
        rawTemplate.send("foo", "{\"bar\":\"baz\"}");
        Thread.sleep(10_000);
    }

}

Foo [bar=baz]

请注意,两个模板都指向同一个物理对象 - 由于 java 的类型删除,这在运行时并不重要。

这假设您仍在消费者端使用 Avro 反序列化器(或本例中的 JSON)。

或者您可以在消费者端使用模拟反序列化器来创建日志

关于spring-kafka - Spring kafka嵌入式测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53179695/

相关文章:

java - @KafkaListener不接收记录

java - Spring-Kafka消费者收不到消息

java - Kafka无法更新元数据

docker - 卡夫卡生产者抛出 'TimeoutException: Batch Expired'异常

apache-kafka - spring-kafka AckMode中MANUAL和MANUAL_IMMEDIATE有什么区别

spring-boot - Spring kafka,ConsumerSeekAware接口(interface),再平衡时是否调用onPartitionsAssigned方法

java - 如何从 spring ListenerContainerIdleEvent 执行手动偏移确认

java - SpringBoot、Kafka : java. lang.NoSuchMethodError : org. apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V

apache-kafka - 如何在 kafkalistener 中设置偏移量

java - 如何编写 Kafka 消费者——单线程 vs 多线程