docker - Cannot connect to Kafka from a Spring Boot application

Tags: docker apache-kafka

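I am trying to connect from a Spring Boot Java application running on my PC to Kafka running in Docker on a remote host. The application tries to create a Kafka consumer. As far as I can see from the logs (below), the application correctly identifies the Kafka bootstrap server and attempts to connect. But for some reason it then starts looking for the Kafka broker on localhost, which obviously fails. Any idea why this happens? The application works fine when it runs on the same host as Kafka.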

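Log walkthrough:

Line 1: Printed from my Java code just before setting the bootstrap server. It shows that the configuration was read correctly from the config file.

Line 2: The consumer config shows that it will use the correct Kafka host: bootstrap.servers = [MY_KAFKA_HOST:9092]

Lines 3-11: The application attempts to connect to the correct Kafka host.

Lines 14-20: For some reason the application attempts to connect to localhost, and I do not know why.

Line 21: As expected, the connection to localhost fails.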

1  2020-03-13 14:24:34.135  INFO 9732 --- [           main] c.p.controller.KafkaConfiguration  : kafkaBootstrapServer=MY_KAFKA_HOST:9092

2  2020-03-13 14:24:36.956  INFO 9732 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [MY_KAFKA_HOST:9092]
    check.crcs = true
    client.dns.lookup = default

3  2020-03-13 14:24:37.299 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Found least loaded node MY_KAFKA_HOST:9092 (id: -1 rack: null) with no active connection

4  [mongod output] 2020-03-13 14:24:37.842 DEBUG 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Initiating connection to node MY_KAFKA_HOST:9092 (id: -1 rack: null) using address /MY_KAFKA_HOST
5  2020-03-13 14:24:37.865 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Found least loaded connecting node MY_KAFKA_HOST:9092 (id: -1 rack: null)
6  2020-03-13 14:24:37.868 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Found least loaded connecting node MY_KAFKA_HOST:9092 (id: -1 rack: null)
7  2020-03-13 14:24:37.869 DEBUG 9732 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-sent
8  2020-03-13 14:24:37.870 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics  : Registered metric named MetricName [name=outgoing-byte-total, group=consumer-node-metrics, description=The total number of outgoing bytes, tags={client-id=consumer-1, node-id=node--1}]
9  2020-03-13 14:24:37.870 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics  : Registered metric named MetricName [name=outgoing-byte-rate, group=consumer-node-metrics, description=The number of outgoing bytes per second, tags={client-id=consumer-1, node-id=node--1}]
10 2020-03-13 14:24:37.925 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Found least loaded node MY_KAFKA_HOST:9092 (id: -1 rack: null) connected with no in-flight requests
11 2020-03-13 14:24:37.932 DEBUG 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='my_test_topic_name')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node MY_KAFKA_HOST:9092 (id: -1 rack: null)
12 2020-03-13 14:24:37.935 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Sending METADATA {topics=[{name=my_test_topic_name}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false} with correlation id 2 to node -1
13 2020-03-13 14:24:37.938 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Sending FIND_COORDINATOR {key=my_project_group_id,key_type=0} with correlation id 0 to node -1
14 2020-03-13 14:24:37.949 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Completed receive from node -1 for METADATA with correlation id 2, received {throttle_time_ms=0,brokers=[{node_id=1,host=localhost,port=9092,rack=null}],cluster_id=kjU7kDUlSKq9UodK7-T6Wg,controller_id=1,topics=[{error_code=0,name=my_test_topic_name,is_internal=false,partitions=[{error_code=0,partition_index=0,leader_id=1,leader_epoch=0,replica_nodes=[1],isr_nodes=[1],offline_replicas=[]}],topic_authorized_operations=0}],cluster_authorized_operations=0}
15 2020-03-13 14:24:37.984 DEBUG 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-1, groupId=my_project_group_id] Updated cluster metadata updateVersion 2 to MetadataCache{cluster=Cluster(id = kjU7kDUlSKq9UodK7-T6Wg, nodes = [localhost:9092 (id: 1 rack: null)], partitions = [Partition(topic = my_test_topic_name, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = [])], controller = localhost:9092 (id: 1 rack: null))}
16 2020-03-13 14:24:37.985 TRACE 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=NONE,node_id=1,host=localhost,port=9092}
17 2020-03-13 14:24:37.991 DEBUG 9732 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=my_project_group_id] Received FindCoordinator response ClientResponse(receivedTimeMs=1584102277985, latencyMs=163, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=0), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='localhost', port=9092))
18 2020-03-13 14:24:37.992  INFO 9732 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=my_project_group_id] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
19 2020-03-13 14:24:37.993 DEBUG 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Initiating connection to node localhost:9092 (id: 2147483646 rack: null) using address localhost/127.0.0.1
20 2020-03-13 14:24:38.002  INFO 9732 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=my_project_group_id] Revoking previously assigned partitions []
21 2020-03-13 14:24:42.167  WARN 9732 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=my_project_group_id] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Here is my Kafka docker-compose configuration:
 kafka:
    image: my_artifactory:5555/wurstmeister/kafka:latest
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_LISTENERS: INTERNAL://kafka:9093,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9093,EXTERNAL://my_kafka_host_ip:9092
      KAFKA_ADVERTISED_HOST_NAME: my_kafka_host_ip
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

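Best answer

Welcome to StackOverflow :-)

You have not configured your Kafka broker correctly. When a client connects to a broker, it requests details of all available brokers and their listeners. What comes back is the advertised listener (so called because it is the address the broker "advertises" for that listener). The client uses that advertised address, not the bootstrap address, for all subsequent connections.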
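The two-step behaviour described above can be sketched as a tiny model. This is only an illustration, not Kafka client code: the Broker class and the addresses_used_after_bootstrap function are hypothetical names, and the values are copied from the log in the question.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Broker:
    """One broker entry as returned in a metadata response (hypothetical model)."""
    node_id: int
    host: str
    port: int


def addresses_used_after_bootstrap(bootstrap, advertised):
    """Return (first_connection, later_connections) for a client.

    The bootstrap address is used only for the initial metadata request.
    Every later connection targets whatever the broker advertised.
    """
    first = bootstrap
    later = [f"{b.host}:{b.port}" for b in advertised]
    return first, later


# Values taken from the log in the question.
first, later = addresses_used_after_bootstrap(
    "MY_KAFKA_HOST:9092",
    [Broker(node_id=1, host="localhost", port=9092)],
)
print("bootstrap connection:", first)
print("subsequent connections:", later)
```

With these inputs the first connection goes to MY_KAFKA_HOST:9092 and every later one goes to localhost:9092, which matches the switch seen between log lines 11 and 19.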

You can see the broker advertising localhost in your client log here:

Completed receive from node -1 for METADATA with correlation id 2
received … node_id=1,host=localhost,port=9092…
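To spot this quickly in a long TRACE log, a small script along these lines could pull the advertised broker addresses out of a METADATA response line. It is a rough sketch: the regular expression assumes the key=value layout shown in the log above, the function name advertised_brokers is made up for illustration, and the sample line is an abbreviated copy of log line 14.

```python
import re

# Matches the "node_id=1,host=localhost,port=9092" fragments that appear in
# the METADATA and FIND_COORDINATOR responses of the client TRACE log.
BROKER_PATTERN = re.compile(r"node_id=(\d+),host=([^,]+),port=(\d+)")


def advertised_brokers(log_line):
    """Return a list of (node_id, host, port) tuples found in one log line."""
    return [
        (int(node_id), host, int(port))
        for node_id, host, port in BROKER_PATTERN.findall(log_line)
    ]


# Abbreviated copy of log line 14 from the question.
line = (
    "Completed receive from node -1 for METADATA with correlation id 2, "
    "received {throttle_time_ms=0,brokers=[{node_id=1,host=localhost,"
    "port=9092,rack=null}],cluster_id=kjU7kDUlSKq9UodK7-T6Wg,controller_id=1}"
)
print(advertised_brokers(line))  # prints [(1, 'localhost', 9092)]
```

Any host in the result that the client machine cannot resolve or reach (here, localhost) points at the broker's advertised listener configuration rather than at the client.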

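You need to set advertised.listeners on the broker to a host and port at which your client can reach it. This also means the Docker container must publish that port to the host, and the host must not have a firewall blocking it.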
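As a rough pre-flight check, the advertised listener string can be parsed and tested for addresses that only work on the broker's own machine. The sketch below assumes the NAME://host:port,NAME://host:port format used by KAFKA_ADVERTISED_LISTENERS in the compose file from the question. All function names are hypothetical, and can_connect is a plain TCP probe, not a Kafka protocol check.

```python
import ipaddress
import socket


def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host2:port2' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def is_loopback_host(host):
    """True for 'localhost' and loopback IP literals; other hostnames are not resolved."""
    if host.lower() == "localhost":
        return True
    try:
        return ipaddress.ip_address(host.strip("[]")).is_loopback
    except ValueError:
        return False  # a hostname; not obviously loopback


def loopback_listeners(value):
    """Names of listeners whose advertised host is reachable only locally."""
    return [
        name
        for name, (host, _port) in parse_listeners(value).items()
        if is_loopback_host(host)
    ]


def can_connect(host, port, timeout=3.0):
    """Plain TCP reachability probe, meant to be run from the client machine."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# The value from the compose file in the question.
compose_value = "INTERNAL://kafka:9093,EXTERNAL://my_kafka_host_ip:9092"
print(parse_listeners(compose_value))
print(loopback_listeners(compose_value))
```

For the compose value in the question this reports no loopback listeners, while the client log shows the broker returning localhost. One possible reading, which the source does not confirm, is that the running broker was not started with this configuration. Calling can_connect with the EXTERNAL host and port from the client machine would additionally show whether the published port is reachable through any firewall.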

For more details, see https://rmoff.net/2018/08/02/kafka-listeners-explained/

A similar question about "docker - Cannot connect to Kafka from a Spring Boot application" can be found on Stack Overflow: https://stackoverflow.com/questions/60671982/
