我使用 Confluent Kafka .Net 库版本 1.2.1,我已经实现了消费者来消费来自一个主题的消息,问题是 Consume 方法阻塞主线程并一直等待直到消息发布,但我想让它成为非阻塞或并行运行。有人可以帮助我吗?
下面是我用于消费者的代码
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build())
{
consumer.Subscribe(topicName);
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult!=null)
{
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
Console.WriteLine($"Received message => {consumeResult.Value}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
我找不到消费者的异步方法,但生产者有 ProduceAsync 来达到目的。
最佳答案
没有异步消费(还)。您可能不知道调用消耗只是从内部队列返回消息 - 不对应于对 Kafka 代理的网络提取。消息在后台线程中获取,默认情况下缓存非常积极(可通过此处概述的参数进行调整:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。如果有可用消息,消费者可以以 > 500,000 条消息/秒(对于小消息大小)的速度将它们传送到您的应用程序。要实现并行性,请使用多个消费者(通常在单独的进程中,并注意最好尽量充分利用每个消费者以获得最大效率),或者在消费一条消息后使用多个线程。
如上所述,如果您想同时阻塞多种类型的 IO(例如 kafka 消耗和 http 响应),异步消耗方法将非常有用。适应标准 C# 模式也很有用,特别是托管服务。由于这些原因,我们希望在将来添加它。
现在,对于第一个用例,您可以只使用另一个线程(一个不忙的额外线程不会对性能产生重大影响):
Task t = Task.Run(() => consumer.Consume(ct);
对于第二个用例,只需设置一个长时间运行的线程即可。
关于c# - 如何在 dot net 的融合 kafka 中使消费方法成为非阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59369752/