C# 无法在 Kafka 主题上使用消息?

标签 c# apache-kafka confluent-platform

我一直在查看 Confluent.Kafka 客户端 ( https://github.com/confluentinc/confluent-kafka-dotnet/ ) 的几个示例,虽然我可以成功地让生产者将消息推送到 Kafka,但我无法将任何消息拉回给消费者.

通过 UI 我可以看到主题已创建并且消息正在进入该主题(目前有 10 个分区和 3 条消息),但是我的消费者总是报告“分区结束”,没有任何消息的消费( 3 保持在主题上并且“OnMessage”永远不会触发)。

但是消费者确实在访问主题,并且可以在其中一个分区上看到 3 条消息:

分区结束:dotnet-test-topic [6] @3

它只是不消费消息并触发 OnMessage()。有什么想法吗?

var conf = new Dictionary<string, object> 
{ 
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", "mykafkacluster:9094" },
    { "sasl.mechanisms", "SCRAM-SHA-256" },
    { "security.protocol", "SASL_SSL" },
    { "sasl.username", "myuser" },
    { "sasl.password", "mypass" }
};

using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
    producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
            .ContinueWith(result => 
            {
                var msg = result.Result;
                if (msg.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
                }
                else 
                {
                    Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
                }
            });

    producer.Flush(TimeSpan.FromSeconds(10));
}

using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{ 
    consumer.Subscribe("dotnet-test-topic");

    consumer.OnConsumeError += (_, err)
        => Console.WriteLine($"consume error: {err.Error.Reason}");

    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"consumed: {msg.Value}");

    consumer.OnPartitionEOF += (_, tpo)
        => Console.WriteLine($"end of partition: {tpo}");

    while (true)
    {
        consumer.Poll(TimeSpan.FromMilliseconds(100));
    }  
}

最佳答案

如果没有提供以下配置,看起来 OnMessage 事件不会触发:

{ "auto.offset.reset", "smallest" }

添加这个后,我就可以阅读有关该主题的消息了。

关于C# 无法在 Kafka 主题上使用消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51673641/

相关文章:

apache-kafka - 重试Deadly Queue for Kafka中的消息的最佳实践是什么

apache-kafka - 使用 Kafka Avro Console Consumer 时如何为特定的 Schema 注册表传递参数?

apache-kafka - 从主题的两个日期时间之间获取 Kafka 消息?

c# - 跨多个相关表进行批量插入?

c# - REQ/REP 模式中的 ZeroMQ FiniteStateMachineException

amazon-web-services - 使用 IAM 的 AWS MSK Spring Boot 应用程序示例

java - Apache Kafka - 关于主题/分区的 KafkaStream

c# - 'Emgu.CV.CvInvoke' 的类型初始值设定项抛出异常

javascript - 在 javascript 确认的所有选项卡中重置计时器

apache-kafka - Kafka Connect Confluence S3 Sink 连接器 : Class io. confluence.connect.avro.AvroConverter 找不到