我们使用 .NET Kafka 客户端在 C# 代码中使用来自一个主题的消息。 然而,这似乎有点太慢了。
想知道我们是否可以稍微并行化该过程,所以我在那里检查了这个答案:Kafka how to consume one topic parallel
但在下面的示例中,我真的不知道如何使用 .NET Kafka 客户端来实现此分区:
var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
.SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));
using (var consumer = consumerBuilder.Build())
{
consumer.Subscribe(RevenueResponseTopicName);
try
{
while (!stoppingToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(stoppingToken);
RevenueTopicResponseModel revenueResponse;
try
{
revenueResponse = JsonConvert.DeserializeObject<RevenueTopicResponseModel>(consumeResult.Value);
}
catch
{
_logger.LogCritical("Impossible to deserialize the response. {@RevenueConsumeResult}", consumeResult);
continue;
}
_logger.LogInformation("Revenue response received from Kafka. {RevenueTopicResponse}",
consumeResult.Value);
await _revenueService.RevenueResultReceivedAsync(revenueResponse);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation($"Operation canceled. Closing {nameof(RevenueResponseConsumer)}.");
consumer.Close();
}
catch (Exception e)
{
_logger.LogCritical(e, $"Unhandled exception during {nameof(RevenueResponseConsumer)}.");
}
}
最佳答案
您需要创建具有多个分区的主题,假设有 10 个分区。 在您的代码中创建 10 个具有相同消费者组的消费者 - 代理将在您的消费者之间分发主题消息。
基本上,只需将代码放入 for
循环中即可:
for (int i = 0; i < 10; i++)
{
var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
.SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));
using (var consumer = consumerBuilder.Build())
{
// your processing here
}
}
关于c# - 如何使用分区以便使用 .NET Core C# 并行使用 kafka 中的一个主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57464072/