c# - 如何在 dot net 的融合 kafka 中使消费方法成为非阻塞

标签 c# .net-core apache-kafka kafka-consumer-api confluent-platform

我使用 Confluent Kafka .Net 库版本 1.2.1,我已经实现了消费者来消费来自一个主题的消息,问题是 Consume 方法阻塞主线程并一直等待直到消息发布,但我想让它成为非阻塞或并行运行。有人可以帮助我吗?

下面是我用于消费者的代码

using (var consumer = new ConsumerBuilder<Ignore, string>(config) 
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");        
    })
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
    })
    .Build())
{
    consumer.Subscribe(topicName);

    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cancellationToken);

            if (consumeResult!=null)
            {
                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                    continue;
                }

                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");

                Console.WriteLine($"Received message => {consumeResult.Value}");

            }
        }

        catch (ConsumeException e)
        {
            Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
    }
}

我找不到消费者的异步方法,但生产者有 ProduceAsync 来达到目的。

最佳答案

没有异步消费(还)。您可能不知道调用消耗只是从内部队列返回消息 - 不对应于对 Kafka 代理的网络提取。消息在后台线程中获取,默认情况下缓存非常积极(可通过此处概述的参数进行调整:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。如果有可用消息,消费者可以以 > 500,000 条消息/秒(对于小消息大小)的速度将它们传送到您的应用程序。要实现并行性,请使用多个消费者(通常在单独的进程中,并注意最好尽量充分利用每个消费者以获得最大效率),或者在消费一条消息后使用多个线程。

如上所述,如果您想同时阻塞多种类型的 IO(例如 kafka 消耗和 http 响应),异步消耗方法将非常有用。适应标准 C# 模式也很有用,特别是托管服务。由于这些原因,我们希望在将来添加它。

现在,对于第一个用例,您可以只使用另一个线程(一个不忙的额外线程不会对性能产生重大影响):

Task t = Task.Run(() => consumer.Consume(ct);



对于第二个用例,只需设置一个长时间运行的线程即可。

关于c# - 如何在 dot net 的融合 kafka 中使消费方法成为非阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59369752/

相关文章:

c# - 如何在另一个线程中访问 HttpContext.Current

ubuntu - 无法在 ubuntu 17.10 上安装 .net 核心

c# - 在派生的 DbContext 中获取服务

c#将表达式中的字符串转换为int

C# - 当每个 'row' 有多个值时对列表进行排序?

c# - 我如何(优雅地)从内部关闭工作人员服务?

apache-spark - 提供的 Maven 坐标必须采用 'groupId:artifactId:version' 形式 PySpark 和 Kafka

streaming - 是否有任何模拟器/工具来生成流消息?

java - 卡夫卡给 : "The group member needs to have a valid member id before actually entering a consumer group"

c# - Entity Framework Core 2.0 中每种类型的表