c# - RabbitMQ 异步

标签 c# rabbitmq .net-core microservices

我想在消费者服务中处理 RabbitMQ 队列。当我尝试遵循教程时,我可以看到它逐条处理消息。但是,如果某些消息的处理需要更长的时间(例如更长的数据库响应)怎么办?然后它不会处理任何其他事情。

我想让它异步。因此它可以在等待期间处理其他消息。我尝试了这段代码,它有效,但在我看来它并不正确(没有等待任务,然后是ContinueWith):

private async Task ExecuteAsync(CancellationToken cancelationToken)
{
    Random random = new Random();
    var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "task_queue",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 30, global: false);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);

            // Is it possible to write following part somehow,
            // 1) so that following task can be awaited ?
            // 2) so I doesn't have to use .ContinueWith ?
            #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
            Task.Run(async () =>
            {
                await Task.Delay(random.Next(100, 5000), cancelationToken);

                Console.WriteLine(" [x] Received {0}", message);
            }).ContinueWith((prevTask) =>
            {
                if (!prevTask.IsFaulted)
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }

            });
            #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed


        };

        channel.BasicConsume(queue: "task_queue",
                                autoAck: false,
                                consumer: consumer);

        while (!cancelationToken.IsCancellationRequested)
        {
            await Task.Delay(100, cancelationToken);
        }
    }

}

如果我等待 Task.Run,​​那么在此之前它不会处理任何其他消息

consumer.Received += async (model, ea) =>
{
...
};

结束了。

最佳答案

经过一番搜索,我发现 EasyNetQ (SubscribeAsync) 正是我所需要的。该库使 Rabbit MQ 的使用变得更加容易。

关于c# - RabbitMQ 异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49393058/

相关文章:

c# - 尝试捕获最后的问题

c# - 以锯齿形顺序显示Excel数据?

.net-core - ReSharper 2018.1.2 NUnit TestCaseSource (.NET Core/VS2017)

c++ - .NET Core 5.0 程序集能否使用面向 .NET Framework 4.8 的程序集?

c# - dotnet : Works from Windows 10 command prompt, 但不在 VSCode 中

c# - 对象引用未设置为对象错误的实例

c# - BindAttribute 如何将数据库列绑定(bind)到程序中?

python - RabbitMQ 中的重新排队顺序是什么?

c# - 将消息插入到 RabbitMQ 中的特定队列

c++ - 具有最小依赖性的跨平台 C/C++ RabbitMQ 库