c# - 如何使用分区以便使用 .NET Core C# 并行使用 kafka 中的一个主题?

标签 c# .net-core apache-kafka kafka-consumer-api

我们使用 .NET Kafka 客户端在 C# 代码中使用来自一个主题的消息。 然而,这似乎有点太慢了。

想知道我们是否可以稍微并行化该过程,所以我在那里检查了这个答案:Kafka how to consume one topic parallel

但在下面的示例中,我真的不知道如何使用 .NET Kafka 客户端来实现此分区:

var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
    .SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));

using (var consumer = consumerBuilder.Build())
{
    consumer.Subscribe(RevenueResponseTopicName);

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var consumeResult = consumer.Consume(stoppingToken);

            RevenueTopicResponseModel revenueResponse;
            try
            {
                revenueResponse = JsonConvert.DeserializeObject<RevenueTopicResponseModel>(consumeResult.Value);
            }
            catch
            {
                _logger.LogCritical("Impossible to deserialize the response. {@RevenueConsumeResult}", consumeResult);
                continue;
            }
            _logger.LogInformation("Revenue response received from Kafka. {RevenueTopicResponse}",
                consumeResult.Value);

            await _revenueService.RevenueResultReceivedAsync(revenueResponse);
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation($"Operation canceled. Closing {nameof(RevenueResponseConsumer)}.");
        consumer.Close();
    }
    catch (Exception e)
    {
        _logger.LogCritical(e, $"Unhandled exception during {nameof(RevenueResponseConsumer)}.");
    }
}

最佳答案

您需要创建具有多个分区的主题,假设有 10 个分区。 在您的代码中创建 10 个具有相同消费者组的消费者 - 代理将在您的消费者之间分发主题消息。

基本上,只需将代码放入 for 循环中即可:

for (int i = 0; i < 10; i++)
{
    var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
    .SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));

    using (var consumer = consumerBuilder.Build())
    {
        // your processing here
    }
}

关于c# - 如何使用分区以便使用 .NET Core C# 并行使用 kafka 中的一个主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57464072/

相关文章:

java - Kafka 事务生产者——尽管中止,read_committed 仍显示记录

c# - 为了我们可以在其上放置 foreach,对集合的要求是什么?

在 Program.cs 中登录

apache-kafka - kafka 消费者机器是否需要运行 zookeeper?

asp.net - Visual Studio 2019 似乎没有看到我安装了 .NET Core 3.1(带 SDK)

angular - 请求的资源上存在“Access-Control-Allow-Origin” header

apache-spark - Spark-Streaming 最早在 kafka 开始偏移时挂起(Kafka 2,spark 2.4.3)

c# - C#中如何访问不同域的文件

c# - 尝试在 Unity 中自动运行单元测试

c# - Streamwriter 仅从字符串列表中写入一定数量的字节