我正在尝试部署 2 个 Spring boot 应用程序(kafka 生产者和消费者)。当我将 Producer 部署到 docker 时一切正常,但是当我部署 Consumer 时却无法工作,因为没有与 kafka 容器的连接。
日志显示了这个错误
2019-11-17 05:32:22.644 WARN 1 --- [main] o.a.k.c.NetworkClient: [Consumer clientId=consumer-1, groupId=exampleGroup] Connection to node -1 could not be established. Broker may not be available.
我的 docker-compose.yml 是
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic1:1:1"
在我的 KafkaConfig 类上:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaConstants.ENABLE_AUTO_COMMIT_CONFIG);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
// config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, KafkaConstants.SESSION_TIMEOUT_MS);
return new DefaultKafkaConsumerFactory<>(config);
}
以及常量类
public class KafkaConstants {
public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT=1000;
public static String TOPIC_NAME="demo";
public static String GROUP_ID_CONFIG="exampleGroup";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;
public static String OFFSET_RESET_LATEST="latest";
public static String OFFSET_RESET_EARLIER="earliest";
public static Integer MAX_POLL_RECORDS=1;
public static Integer SESSION_TIMEOUT_MS = 180000;
public static Integer REQUEST_TIMEOUT_MS_CONFIG = 181000;
public static String ENABLE_AUTO_COMMIT_CONFIG = "false";
public static Integer AUTO_COMMIT_INTERVAL_MS_CONFIG = 8000;
}
当我在计算机上安装zookepper和kafka并使用intellij运行这2个Spring Boot应用程序时效果很好。问题是当我部署到本地 Docker 时。
你能帮我吗?
更新
更新我的 docker-compose:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "ACC_GROUP_CREATE:1:1"
consumer:
image: micro1
container_name: micro1
depends_on:
- kafka
restart: always
ports:
- 8088:8088
depends_on:
- kafka
links:
- kafka:kafka
producer:
image: micro2
container_name: micro2
depends_on:
- kafka
restart: always
ports:
- 8087:8087
depends_on:
- kafka
links:
- kafka:kafka
工作正常!基于@hqt的响应,但我不知道为什么我需要添加消费者/生产者的这些行
最佳答案
由于 KAFKA_ADVERTISED_HOST_NAME
属性导致的问题。这是documentation这解释了为什么 Kafka 需要公布的地址。
The key thing is that when you run a client, the broker you pass to it is just where it’s going to go and get the metadata about brokers in the cluster from. The actual host & IP that it will connect to for reading/writing data is based on the data that the broker passes back in that initial connection—even if it’s just a single node and the broker returned is the same as the one connected to.
当您将 KAFKA_ADVERTISED_HOST_NAME
设置为本地主机时:
- 您的应用程序在“Intellij”上运行,这意味着在主机环境上运行。该主机创建 Kafka 的容器,因此来自 localhost:9092 的访问将指向 Kafka 的容器。
- 当您的应用程序在容器内运行时,localhost:9092 表示容器本身。所以这是没有意义的。 (这个容器甚至没有任何监听端口9092的进程)
在容器环境中运行 Web 应用时,将 KAFKA_ADVERTISED_HOST_NAME
属性更新为 kafka
即可。请注意,您的 Web 应用程序和 kafka 容器必须位于同一 docker 网络上。
这里是建议的 docker-compose,用于使用 Wurstmeister 的镜像运行 Kafka 集群。
version: "2"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "topic1:1:1"
web_app:
# your definition of the web_app goes here
然后您可以连接到容器环境内地址 kafka:9092
上的 Kafka 代理。
关于java - Sprint启动kafka Consumer无法连接到kafka容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58897876/