我现在正在学习 Kafka,并努力正确设置我的 docker-compose 配置。我想做的是基于 wurstmeister/kafka
运行一个经纪人图像,然后是另一个运行简单 python 脚本的容器 kafka-python
.
我一直在关注this主要是教程,但我怀疑我对端口的处理有点困惑。这是我的 docker-compose.yml:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_CREATE_TOPICS: "client-pusher:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
app-python:
build: .
ports:
- "9093:9093"
expose:
- "9093"
- "9092"
depends_on:
- "kafka"
说实话,当谈到 Docker 中的端口时,我有一半时间都不知道自己在做什么。
使用这个Dockerfile
FROM python:3.8
ENV PYTHONUNBUFFERED=1
# set the working directory in the container
WORKDIR /code
# copy the dependencies file to the working directory
COPY requirements.txt .
# install dependencies
RUN pip install -r requirements.txt
# copy the content of the local src directory to the working directory
COPY . .
# command to run on container start
CMD ["python","/code/consumer.py"]
我可以让这个脚本输出一些日志:
# consumer.py
import json
from datetime import date
from typing import Optional
import time
import logging
from kafka import KafkaConsumer
from pydantic import BaseModel
class Client(BaseModel):
first_name: str
email: str
group_id: Optional[int] = None
date: date
# consumer = KafkaConsumer(
# 'client-pusher',
# bootstrap_servers=['kafka:9093'],
# auto_offset_reset='earliest',
# enable_auto_commit=True,
# group_id='my-group-id',
# value_deserializer=lambda x: json.loads(x.decode('utf-8'))
# )
count = 0
while True:
# msg_pack = consumer.poll(timeout_ms=500)
logging.warning(f"Hi there {count}")
time.sleep(2)
count += 1
# for tp, messages in msg_pack.items():
# for message in messages:
# client = Client(**message.value)
# print(client)
但是当注释掉的代码取消注释时,连接失败。
bootstrap_servers=['kafka:9093'],
行结果
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
我觉得在 docker-compose 文件中正确公开或配置端口并在 python 脚本中正确使用它们,和/或正确配置服务名称有一些神奇的组合。但我迷路了。有人可以帮忙吗?
最佳答案
TLDR;删除全部expose
并将 app-python 的端口调整为尚未引用的端口。在您的代码中,而不是 kafka:9093
,使用localhost:9092
有两件事:
我。对于 app-python
,您将机器端口 9093 (localhost:9093) 暴露给容器端口 9093 (app-python:9093)。两个容器不能公开相同的机器端口,因此我建议让您的 kafka 容器端口配置与您的应用程序端口保持适当的距离(对于 kafka 可能是 9092/9093,对于您的应用程序可能是 8080)
localhost:9092
在你的Python脚本中。即您的代码通过其外部端口映射( OUTSIDE
)连接到 kafka
如果您像现在一样运行它,两个容器都在同一个 docker 网络中运行)我建议将环境变量(或您可以在代码中传递和引用的属性)传递给 app-python
使用引导服务器<Container name>:<INSIDE PORT>
---kafka:9093
这是我使用 Java 的示例,我可以在 docker-compose 内部或外部运行应用程序(其余)。如果在外面,我引用了localhost:9092
但如果在里面,我是这样引用的:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka_broker_1
image: wurstmeister/kafka
links:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_HOSTNAME: kafka
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock%
rest:
image: rest:latest
container_name: rest
build:
context: rest
dockerfile: Dockerfile
links:
- kafka
environment:
- SPRING_KAFKA_BOOTSTRAP-SERVERS=kafka:29092
- SERVER_PORT=8080
ports:
- "8080:8080"
depends_on:
- kafka
关于python - 设置 docker-compose 端口和主机名以使用 kafka-python 容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69245497/