java - Sprint启动kafka Consumer无法连接到kafka容器

标签 java spring-boot docker apache-kafka kafka-consumer-api

我正在尝试部署 2 个 Spring boot 应用程序(kafka 生产者和消费者)。当我将 Producer 部署到 docker 时一切正常,但是当我部署 Consumer 时却无法工作,因为没有与 kafka 容器的连接。

日志显示了这个错误

2019-11-17 05:32:22.644  WARN 1 --- [main] o.a.k.c.NetworkClient: [Consumer clientId=consumer-1, groupId=exampleGroup] Connection to node -1 could not be established. Broker may not be available.

我的 docker-compose.yml 是

version: '3'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic1:1:1"

在我的 KafkaConfig 类上:

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      //  config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaConstants.ENABLE_AUTO_COMMIT_CONFIG);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
       // config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, KafkaConstants.SESSION_TIMEOUT_MS);

        return new DefaultKafkaConsumerFactory<>(config);
    }

以及常量类

public class KafkaConstants {

    public static String KAFKA_BROKERS = "localhost:9092";
    public static Integer MESSAGE_COUNT=1000;
    public static String TOPIC_NAME="demo";
    public static String GROUP_ID_CONFIG="exampleGroup";
    public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;
    public static String OFFSET_RESET_LATEST="latest";
    public static String OFFSET_RESET_EARLIER="earliest";
    public static Integer MAX_POLL_RECORDS=1;
    public static Integer SESSION_TIMEOUT_MS = 180000;
    public static Integer REQUEST_TIMEOUT_MS_CONFIG = 181000;
    public static String ENABLE_AUTO_COMMIT_CONFIG = "false";
    public static Integer AUTO_COMMIT_INTERVAL_MS_CONFIG = 8000;
}

当我在计算机上安装zookepper和kafka并使用intellij运行这2个Spring Boot应用程序时效果很好。问题是当我部署到本地 Docker 时。

你能帮我吗?

更新

更新我的 docker-compose:

version: '3'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "ACC_GROUP_CREATE:1:1"

  consumer:
    image: micro1
    container_name: micro1
    depends_on:
      - kafka
    restart: always
    ports:
      - 8088:8088
    depends_on:
      - kafka
    links:
      - kafka:kafka

  producer:
    image: micro2
    container_name: micro2
    depends_on:
      - kafka
    restart: always
    ports:
      - 8087:8087
    depends_on:
      - kafka
    links:
      - kafka:kafka

工作正常!基于@hqt的响应,但我不知道为什么我需要添加消费者/生产者的这些行

最佳答案

由于 KAFKA_ADVERTISED_HOST_NAME 属性导致的问题。这是documentation这解释了为什么 Kafka 需要公布的地址。

The key thing is that when you run a client, the broker you pass to it is just where it’s going to go and get the metadata about brokers in the cluster from. The actual host & IP that it will connect to for reading/writing data is based on the data that the broker passes back in that initial connection—even if it’s just a single node and the broker returned is the same as the one connected to.

当您将 KAFKA_ADVERTISED_HOST_NAME 设置为本地主机时:

  • 您的应用程序在“Intellij”上运行,这意味着在主机环境上运行。该主机创建 Kafka 的容器,因此来自 localhost:9092 的访问将指向 Kafka 的容器。
  • 当您的应用程序在容器内运行时,localhost:9092 表示容器本身。所以这是没有意义的。 (这个容器甚至没有任何监听端口9092的进程)

在容器环境中运行 Web 应用时,将 KAFKA_ADVERTISED_HOST_NAME 属性更新为 kafka 即可。请注意,您的 Web 应用程序和 kafka 容器必须位于同一 docker 网络上。

这里是建议的 docker-compose,用于使用 Wurstmeister 的镜像运行 Kafka 集群。

version: "2"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "topic1:1:1"

   web_app:
    # your definition of the web_app goes  here

然后您可以连接到容器环境内地址 kafka:9092 上的 Kafka 代理。

关于java - Sprint启动kafka Consumer无法连接到kafka容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58897876/

相关文章:

java - 在实例创建或使用 new 关键字创建对象期间何时分配内存?

java - 使用 elasticSearch 和 Spring Boot 创建 bean 时出错

java - 在 Spring Boot 应用程序中添加条件外部 PropertySource

java - ElasticSearch Rest High Level Client 重新映射错误

apache-spark - 本地同一计算机的Spark集群有哪些优势?

java - 在循环中重用 StringBuilder 会更好吗?

java - 使用参数指定命名查询中的表达式

spring - 使用 Spring oauth2 使用受 OAuth 保护的 REST Web 服务

docker - 如何将 Virtual Box 中的 Docker 镜像推送到 Google 存储库?

docker - CURL在Docker镜像中不起作用[无法访问Docker镜像中的主机]