我正在使用嵌入式kafka编写junit测试用例。我们有一个管道,其中 Producer > topic > Consumer > do work() > Produce 。 我正在使用第三方模式注册表(通过提供虚假网址来模拟此注册表)以及与其绑定(bind)的特定 serdes。在 kafka 用户组上讨论此问题后,执行此操作的方法是使用 一个模拟注册表,用于手动序列化数据并在生产者中传递 byte[] 本身而不是 avro record 。在这种情况下,我的消费者将如何失败,因为它期望特定的记录有效负载。关于如何解决此问题有什么想法吗?
//Listener method
*/
@KafkaListener(topics = test1,id="tesId1")
public void onMessage(@Payload Log log,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) Long offset) throws Exception
{
}
// test class
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "test1" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ConsumerTests {
}
最佳答案
只需使用原始KafkaTemplate
(无泛型)和字节数组序列化器。
例如,使用 JSON 和 StringSerializer
:
@SpringBootApplication
public class So53179695Application {
public static void main(String[] args) {
SpringApplication.run(So53179695Application.class, args);
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@KafkaListener(id = "foo", topics = "foo")
public void listen(Foo in) {
System.out.println(in);
}
public static class Foo {
private String bar;
public Foo() {
super();
}
Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
和
@RunWith(SpringRunner.class)
@SpringBootTest
public class So53179695ApplicationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka =
new EmbeddedKafkaRule(1, false, "foo");
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers",
embeddedKafka.getEmbeddedKafka().getBrokersAsString());
}
@Autowired
public KafkaTemplate<String, Foo> template;
@SuppressWarnings("rawtypes")
@Autowired
public KafkaTemplate rawTemplate;
@SuppressWarnings("unchecked")
@Test
public void test() throws Exception {
// template.send("foo", new Foo("bar"));
rawTemplate.send("foo", "{\"bar\":\"baz\"}");
Thread.sleep(10_000);
}
}
和
Foo [bar=baz]
请注意,两个模板都指向同一个物理对象 - 由于 java 的类型删除,这在运行时并不重要。
这假设您仍在消费者端使用 Avro 反序列化器(或本例中的 JSON)。
或者您可以在消费者端使用模拟反序列化器来创建日志
。
关于spring-kafka - Spring kafka嵌入式测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53179695/