java - Kafka AdminClientConfig 忽略提供的配置

标签 java apache-kafka spring-kafka

我有一个基于微服务的 Java (Spring Boot) 应用程序,我正在其中集成 Kafka 以进行事件驱动的内部服务通信。服务在同一桥接网络下的 docker-compose 内运行。我已在同一网络下再次将 cp-kafka 添加到该 docker-compose 中。

我的问题是,一旦我启动 docker-compose,生产者和消费者都不会连接到代理。发生的情况是 AdminClientConfig 使用 localhost:9092 而不是我在 Broker 配置中定义为广告监听器的 kafka:9092

这是我在生产者处得到的输出:

2023-02-14 13:09:17.563  INFO [article-service,,] 1 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-14T13:09:17.563482000Z  bootstrap.servers = [localhost:9092]
2023-02-14T13:09:17.563518700Z  client.dns.lookup = use_all_dns_ips
2023-02-14T13:09:17.563524100Z  client.id = 
2023-02-14T13:09:17.563528200Z  connections.max.idle.ms = 300000
...

消费者将使用我提供的 ConsumerConfig 进行短暂连接:

2023-02-14 13:10:13.358  INFO 1 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-saveArticle-1
        client.rack = 
        connections.max.idle.ms = 540000
...

但是,之后它会重试,这次使用 AdminClientConfig 代替

2023-02-14 13:10:42.365  INFO 1 --- [   scheduling-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = use_all_dns_ips
        client.id = 
        connections.max.idle.ms = 300000

docker-compose.yml的相关部分

...
networks:
  backend:
    name: backend
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: dev.infra.zookeeper
    networks:
      - backend
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    networks:
      - backend
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      AUTO_CREATE_TOPICS_ENABLE: 'false'
...

生产者 application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: kafka:9092
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    topic:
      name: saveArticle

消费者应用程序.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: kafka:9092
      auto-offset-reset: earliest
      group-id: stock
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    topic:
      name: saveArticle

我正在使用的 Kafka 依赖项:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.0.2</version>
            <type>pom</type>
        </dependency>

有什么线索可以说明它从哪里获取localhost:9092以及为什么它忽略我在代理配置中提供的明确指定的kafka:9092主机吗?我该如何解决我的问题?

最佳答案

一个应用程序只需要一个 yaml 文件。

该错误是因为您没有设置 spring.kafka.bootstrap-servers=kafka:9092,仅设置 生产者消费者因此,单独的客户端与代理所宣传的内容无关,而是 spring-kafka 默认值。

您可以添加 spring.kafka.admin 部分,但最好不要重复不必要的配置

https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka

但是,如果您尝试在主机上运行此代码,您将需要宣传 localhost:9092,否则,您最终将得到 UnknownHostException: kafka

关于java - Kafka AdminClientConfig 忽略提供的配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75448681/

相关文章:

java - 使用 Spring Embedded Kafka 测试 @KafkaListener

spring-boot - KafkaListener Spring Boot中组ID、客户端ID和ID的区别

Java:如何在抽象类中引用子类的静态变量?

java - 如何动态更改jtable单元格背景

java - 我们可以在java中直接将对象读入字节缓冲区吗?

java - 通过与 kafka-streams 的连接批量处理数据导致 `Skipping record for expired segment`

hadoop - 卡夫卡的消费者无法开始

java - hibernate 。如何为字段组合添加唯一索引?

java - Kafka 读取字段时出错 'correlation_id' : java. nio.BufferUnderflowException

java - 如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交