.net-core - 当消费者运行时,在 Confluence.Kafka 中使用主题消息

标签 .net-core apache-kafka message-queue apache-zookeeper

我在 .netCore 项目中使用 Confluence.Kafka(1.4.4) 作为消息代理。在项目启动时,我仅将“bootstrapservers”设置为 appSetting.json 文件中的特定服务器,并在必要时使用相关类中的以下代码在 API 中生成消息:

public async Task WriteMessage<T>(string topicName, T message)
    {
        using (var p = new ProducerBuilder<Null, string>(_producerConfig).Build())
        {
            try
            {
                var serializedMessage= JsonConvert.SerializeObject(message);
                var dr = await p.ProduceAsync(topicName, new Message<Null, string> { Value = serializedMessage });
                logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                logger.LogInformation($"Delivery failed: {e.Error.Reason}");
            }
        }
    }

我还在消费者解决方案中添加了以下代码:

public async Task Run()
    {
        using (var consumerBuilder = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build())
        {
            consumerBuilder.Subscribe(new List<string>() { "ActiveMemberCardForPanClubEvent", "CreatePanClubEvent", "RemovePanClubEvent"
            });

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var consumer = consumerBuilder.Consume(cts.Token);
                        if (consumer.Message != null)
                        {
                            using (LogContext.PushProperty("RequestId", Guid.NewGuid()))
                            {
                                //Do something
                                logger.LogInformation($"Consumed message '{consumer.Message.Value}' at: '{consumer.TopicPartitionOffset}'.");
                                await DoJob(consumer.Topic, consumer.Message.Value);
                                consumer.Topic.Remove(0, consumer.Topic.Length);
                            }

                        }
                        else
                        {
                            logger.LogInformation($"message is null for topic '{consumer.Topic}'and partition : '{consumer.TopicPartitionOffset}' .");
                            consumer.Topic.Remove(0, consumer.Topic.Length);
                        }
                    }
                    catch (ConsumeException e)
                    {

                        logger.LogInformation($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                consumerBuilder.Close();
            }
        }
    }

我生成一条消息,当消费者项目运行时,一切都很顺利,并且消息正在消费者解决方案中被读取。 当消费者项目未运行并且我将 API 中的消息与 API 中的消息生产者一起排队时,就会出现此问题。运行消费者后,该主题没有任何有效消息表明正在生成消息。 我熟悉消息代理并有相关经验,我知道通过发送消息,它会一直在总线上直到被使用,但我不明白为什么它不能在这个项目中与 Kafka 一起使用。

最佳答案

“auto.offset.reset”消费者属性的默认设置是“最新”。

这意味着(在尚未写入偏移量的情况下)如果您向某个主题写入消息然后随后启动使用者,它将跳过在启动使用者之前写入的任何消息。这可能就是您的消费者看不到生产者排队的消息的原因。

解决方案是将“auto.offset.reset”设置为“earliest”,这意味着消费者将从该主题的最早偏移量开始。

https://docs.confluent.io/current/installation/configuration/consumer-configs.html#auto.offset.reset

关于.net-core - 当消费者运行时,在 Confluence.Kafka 中使用主题消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64568376/

相关文章:

apache-spark - 如何根据数据大小重新分区rdd

twitter - Kafka Twitter 流 TwitterException 错误

scala - 在我停止作业之前,Spark Structured Streaming writestream 不会写入文件

java - 如何向 MQ 服务器运行消息并获取输出消息

c# - DllImport 在 Docker 上不起作用 - DllNotFoundException

.net-core - 部署到 Azure 后,在 Angular 7 中使用 i18n 的本地化不起作用

c# - 通过 .NET Core 上的 MEF 将参数传递给插件构造函数?

c# - 实现分布式队列

java - 如何使用 Spring 4 的 JmsTemplate 监听来自 MQ 的消息?

.net-core - 使用 OData 8 设置自定义路线