spring-boot - 测试容器中的 Kafka 监听器,避开端口 9092

标签 spring-boot apache-kafka testcontainers

我编写了一个集成测试来显示发送到 Kafka 的消息将通过监听器到达。当且仅当我使用 KAFKA_PORT=9092 时它才会通过。该常量是开发人员机器(或 CI 机器)上使用的端口。

最终我想在动态分配的端口上执行此操作(即使用 GenericContainer 而不是 FixedHostPortGenericContainer),但目前我只是希望能够使用不同的端口。

如果我在下面的代码中设置 KAFKA_PORT=59092 ,则测试失败,并且我看到控制台输出,例如 无法建立到节点 -1 (localhost/127.0.0.1:9092) 的连接 例如:

2020-06-08 12:16:22.374  WARN 1371 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

我假设我需要做一些额外的配置,这样就不会 尝试使用端口 9092,但我无法做到这一点。

下面是重新创建的、剥离的回测,以及相关的 gradle.build

KafaSpikeFixedPort.java

package com.example.kafkaspike;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@Testcontainers
public class KafaSpikeFixedPort {

    // Test works only if this port is 9092 (matching the Docker container port)
    final static int KAFKA_PORT = 9092;

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.producer.bootstrap-servers",
                     () -> "kubernetes.docker.internal:"+ KAFKA_PORT);
        registry.add("spring.kafka.consumer.group-id",
                     () -> "test-consumer-group");
    }

    @Container
    private GenericContainer kafkaContainer =
            new FixedHostPortGenericContainer("obsidiandynamics/kafka:2.3.0-11")
                    .withFixedExposedPort(KAFKA_PORT, 9092)
                    .withExtraHost("kubernetes.docker.internal", "127.0.0.1")
                    .withEnv("KAFKA_LISTENERS",
                             "INTERNAL://:29092," +
                             "EXTERNAL://:"+KAFKA_PORT)
                    .withEnv("KAFKA_ADVERTISED_LISTENERS",
                             "INTERNAL://kubernetes.docker.internal:29092," +
                             "EXTERNAL://kubernetes.docker.internal:"+KAFKA_PORT)
                    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                             "INTERNAL:PLAINTEXT," +
                             "EXTERNAL:PLAINTEXT")
                    .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME",
                             "INTERNAL")
                    .waitingFor(Wait.forLogMessage(
                            ".*INFO\\s+\\[KafkaServer\\s+id=\\d+\\]" +
                            "\\s+started\\s+\\(kafka.server.KafkaServer\\).*",
                            1));

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    private List<String> payloadsReceived = new ArrayList<>();

    @KafkaListener(autoStartup = "false", topics = "topic1")
    public void onMessage(@Payload String payload) {
        payloadsReceived.add(payload);
    }

    @BeforeEach
    public void beforeEach() {
        payloadsReceived.clear();
        for(MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            listenerContainer.start();
        }
        sleep(2_000); // Just for the spike. (Eliminates code checking the listener container states.)
    }

    @AfterEach
    public void afterEach() {
        for(MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            listenerContainer.stop();
        }
        sleep(2_000); // Just for the spike. (Eliminates code checking the listener container states.)
    }

    @Test
    @Timeout(value = 3, unit = TimeUnit.SECONDS)
    public void test() {
        kafkaTemplate.send("topic1", "Hello World!");
        sleep(1_000); // Just for the spike. (Wait for message in production test, with test timeout.)
        assertAll(
                () -> assertEquals(1, payloadsReceived.size()),
                () -> assertEquals("Hello World!", payloadsReceived.get(0))
                 );
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch(InterruptedException e) {
        }
    }
}

gradle.build

plugins {
    id 'org.springframework.boot' version '2.2.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    // for TestContainers
    testCompile group: 'org.testcontainers', name: 'testcontainers', version: "1.14.1"
    testCompile group: 'org.testcontainers', name: 'junit-jupiter', version: "1.14.1"
}

test {
    useJUnitPlatform()
}

最佳答案

有原因 Testcontainers' Kafka module存在:)

它通过推迟运行 Kafka 进程来处理端口设置,以便它可以提供实际分配的随机端口 ADVERTISED_HOST环境变量。

尝试一下,或者查看一些灵感的来源:
https://github.com/testcontainers/testcontainers-java/tree/master/modules/kafka

关于spring-boot - 测试容器中的 Kafka 监听器,避开端口 9092,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62261265/

相关文章:

java - Tomcat - 进一步发生的 HTTP header 解析错误将在 DEBUG 级别记录

spring - 如何以编程方式为 Spring 中的集成测试填充测试数据?

apache-kafka - kafka-python:生产者无法连接

java - 无法为 Kafka 流打开存储,因为状态无效

apache-kafka - 使用 Kafka 流进行事件溯源

spring-boot - 在 Spring Boot 集成测试中使用 TestContainers 填充数据库

java - 如何检查电子邮件是否存在,除了请求的用户之外?

java - 如何将 Testcontainers 与 @DataJpaTest 结合起来避免代码重复?

java - 如何从java代码修改现有的docker-compose文件

java - 使用 JUnit 5 的并发 Spring Boot 测试正在启动多个 JVM