java - Spring Kafka Embedded - 测试之间已经存在主题

标签 java spring-kafka junit5

我创建了一组带有嵌入式kafka(spring-kafka-test)的测试(JUnit 5),当我有时(并非总是)运行它们时,我得到“主题'some_name'已经存在 “在单次运行中进行一项或多项测试。

所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试类具有 DirtiesContext 注释(AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么以及如何解决。

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
    private final static String SERVER_ADDRES = "127.0.0.1:9092";

    private Consumer<String, String> prepareConsumer() {
        Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(singleton("some_name"));
        return consumer;
    }

    @Test
    public void someMethodWithKafka1() {
        // some logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }

    @Test
    public void someMethodWithKafka2() {
        // some other logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }
}

最佳答案

您有两名经纪人;您自己创建的一个:

private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);

还有一个由 Spring 管理:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)

当您在 Spring 测试上下文中使用 @EmbeddedKafka 时;代理已添加到上下文中。

更改为

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

并且不要添加其他 bean。

通常,为每个测试使用不同的主题会更容易(也更快);避免为每个测试创建一个代理。

编辑

ports = 9092

使用随机端口(省略此配置)并使用

configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());

关于java - Spring Kafka Embedded - 测试之间已经存在主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59877538/

相关文章:

android - 如何在 JUnit 5 扩展中存储值并在参数化测试中注入(inject)

java - 什么导致junit警告org.junit.platform.launcher.core.EngineDiscoveryOrchestrator lambda$logTestDescriptorExclusionReasons$7

docker - Kafka Docker、docker-maven-plugin、Spring Boot

java - JUnit 5 无法模拟从被测函数调用的函数

java - 每次迭代后删除二维数组的列和行

java - Android 6.0 和 View 背景颜色的奇怪问题

spring-kafka - 如果没有消息,则不调用使用@KafkaListener 注释的方法

java - Java处理Kafka超时异常

java - 使用递归重复相同的数字

java - map 在 OpenStreetMap (osm) 中不可见