c# - Kafka 消费者不消费来自现有主题的消息

标签 c# apache-kafka confluent-kafka-dotnet

我已经安装了在 docker 上运行的融合 kafka。在主题中,我有 10 个分区。问题是我无法使用来自该主题的消息,但我可以在该主题中生成消息。我正在尝试使用 C# confluent.kafka 驱动程序 1.5.1(最新)和 librd.kafka 1.5.0(最新)从主题中消费。
我启动 kafka 的 docker-compose 文件如下

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    networks:
      - bridge_network
    ports:
      - "3001:3001"    
    environment:
      ZOOKEEPER_CLIENT_PORT: 3001
      ZOOKEEPER_TICK_TIME: 3000

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
 
  kafka_manager:
    image: sheepkiller/kafka-manager
    hostname: kafka_manager
    depends_on:
      - zookeeper
    ports:
      - '9000:9000'
    networks:
      - bridge_network
    environment:
      ZK_HOSTS: 'zookeeper:3001'
networks:
  bridge_network:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"
我在 C# 中的消费者配置如下:
            var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
            {
                { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
                { "group.id", "some-test-group" },
                { "auto.offset.reset", "latest"},
                { "compression.codec", "gzip" },
                { "enable.auto.commit", "false" }
            }).Build();

            consumer.Subscribe("some-test-topic");

            while (true)
            {
                var cr = consumer.Consume(30_000);
                if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
                {
                    System.Console.WriteLine("that's it");
                    break;
                }

                System.Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
            }
我确定主题的分区中有消息,因为我可以使用 kafka 工具 2.0 检查主题
enter image description here
我用于 kafka 工具的配置是 enter image description here enter image description here
我很确定我在配置文件中遗漏了一些东西,但是在阅读了 2 天的文档并将我的头撞在墙上后,我仍然找不到问题。那么有人可以帮忙吗?

最佳答案

问题在于代理和主题复制因子。我使用您的 docker-compose 文件部署了 kafka,我连接以查看日志并且有消息:

ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
为了解决这个问题,我不得不为代理配置添加`KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1'。所以我的代理服务配置如下所示:
broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
重新启动代理后,我能够生成/使用消息。

关于c# - Kafka 消费者不消费来自现有主题的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63545786/

相关文章:

c# - 如何选择特定图像并从特定网址保存? (内有说明)

c# - 在按钮图像上设置数字

c# - 在不打开 Word 的情况下使用 Interop DLL 打开 Word 文档?

javascript - 卡夫卡 : Reading off __consumer_offsets with node-rdkafka

c# - 如何添加 'Microsoft.VisualStudio.TeamSystem.Data.UnitTesting' 引用

apache-kafka - 消费者组可以跨越多个服务器吗?

java - 通过与 kafka-streams 的连接批量处理数据导致 `Skipping record for expired segment`

c# confluence.kafka 无法使用 Protobuf-net 反序列化 protobuf 消息

apache-kafka - 为什么消费者重启后会读取Kafka主题的所有消息?

c# - Confluence Kafka Consumer 类中 key 的反序列化意味着什么?