c# - RabbitMQ 异步

我想在消费者服务中处理 RabbitMQ 队列。当我尝试遵循教程时,我可以看到它逐条处理消息。但是,如果某些消息的处理需要更长的时间(例如更长的数据库响应)怎么办?然后它不会处理任何其他事情。


private async Task ExecuteAsync(CancellationToken cancelationToken)
    Random random = new Random();
    var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
        channel.QueueDeclare(queue: "task_queue",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 30, global: false);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);

            // Is it possible to write following part somehow,
            // 1) so that following task can be awaited ?
            // 2) so I doesn't have to use .ContinueWith ?
            #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
            Task.Run(async () =>
                await Task.Delay(random.Next(100, 5000), cancelationToken);

                Console.WriteLine(" [x] Received {0}", message);
            }).ContinueWith((prevTask) =>
                if (!prevTask.IsFaulted)
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

            #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed


        channel.BasicConsume(queue: "task_queue",
                                autoAck: false,
                                consumer: consumer);

        while (!cancelationToken.IsCancellationRequested)
            await Task.Delay(100, cancelationToken);


如果我等待 Task.Run,​​那么在此之前它不会处理任何其他消息

consumer.Received += async (model, ea) =>



经过一番搜索,我发现 EasyNetQ (SubscribeAsync) 正是我所需要的。该库使 Rabbit MQ 的使用变得更加容易。

