java - Spring for Apache Kafka: EmbeddedKafkaBroker is not started

Tags: java apache-kafka spring-kafka

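I am writing Kafka broker and consumer code to capture the messages coming from an application. When I try to fetch messages from the consumer, the following error occurs: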

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)

On the application (producer) side, a connection error appears as well:

2020-03-25 12:29:33.689  WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.

My project has the following dependencies:

compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"

My Kafka broker code:

public class KafkaServer {

    private static final int BROKERPORT = 9092; // kafkaPorts(int...) takes int port numbers
    private static final String BROKERHOST = "localhost";
    public static final String TOPIC1 = "fss-fsstransdata";
    public static final String TOPIC2 = "fss-fsstransscores";
    public static final String TOPIC3 = "fss-fsstranstimings";
    public static final String TOPIC4 = "fss-fssdevicedata";
    @Getter
    private Consumer<String, String> consumer;

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    public void run() {

        String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};

        this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
                1,
                false,
                1,
                topics
        ).kafkaPorts(BROKERPORT);

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
        this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();

        this.consumer.subscribe(Arrays.asList(topics));
    } 
}

Please help me with this situation. I am not familiar with the Kafka architecture or with how to use it from Spring.

Best answer

EmbeddedKafkaBroker is intended to be used from a Spring application context, or with the JUnit 4 @Rule / @ClassRule, or with the JUnit 5 condition.

To use it outside of those environments, you must call afterPropertiesSet() to initialize it and destroy() to shut it down.
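A minimal sketch of that manual lifecycle, assuming spring-kafka-test 2.4.x (and its transitive Kafka dependencies) is on the classpath. The class name StandaloneEmbeddedKafka and its start(), stop(), bootstrapServers() and getConsumer() methods are made up for illustration and are not part of the library. The port is passed as an int; passing 0 should let the broker pick a random free port instead of the fixed 9092 from the question.

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Hypothetical wrapper that manages an EmbeddedKafkaBroker outside of
// a Spring application context and outside of JUnit.
public class StandaloneEmbeddedKafka {

    private final int port;
    private final String[] topics;

    private EmbeddedKafkaBroker broker;
    private Consumer<String, String> consumer;

    public StandaloneEmbeddedKafka(int port, String... topics) {
        this.port = port;
        this.topics = topics;
    }

    // Declared with "throws Exception" so the sketch compiles whether or not
    // the lifecycle methods declare checked exceptions in your version.
    public void start() throws Exception {
        // One broker, no controlled shutdown, one partition per topic.
        this.broker = new EmbeddedKafkaBroker(1, false, 1, this.topics)
                .kafkaPorts(this.port);

        // The missing step from the question: without this call nothing
        // is listening on the port, hence "Connection refused".
        this.broker.afterPropertiesSet();

        Map<String, Object> props =
                KafkaTestUtils.consumerProps("consumer", "false", this.broker);
        this.consumer = new DefaultKafkaConsumerFactory<String, String>(
                props, new StringDeserializer(), new StringDeserializer())
                .createConsumer();
        this.consumer.subscribe(Arrays.asList(this.topics));
    }

    public void stop() throws Exception {
        if (this.consumer != null) {
            this.consumer.close();
        }
        if (this.broker != null) {
            // Shuts down the embedded broker started in start().
            this.broker.destroy();
        }
    }

    // Address list that producers and consumers can use as bootstrap.servers.
    public String bootstrapServers() {
        return this.broker.getBrokersAsString();
    }

    public Consumer<String, String> getConsumer() {
        return this.consumer;
    }
}
```

Call start() before the application under test creates its producer, and call stop() in a finally block or shutdown hook so the port is released. The producer must be configured with the same host and port that the embedded broker actually listens on.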

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/60850710/
