我想在消费者服务中处理 RabbitMQ 队列。当我尝试遵循教程时,我可以看到它逐条处理消息。但是,如果某些消息的处理需要更长的时间(例如更长的数据库响应)怎么办?然后它不会处理任何其他事情。
我想让它异步。因此它可以在等待期间处理其他消息。我尝试了这段代码,它有效,但在我看来它并不正确(没有等待任务,然后是ContinueWith):
private async Task ExecuteAsync(CancellationToken cancelationToken)
{
Random random = new Random();
var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 30, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// Is it possible to write following part somehow,
// 1) so that following task can be awaited ?
// 2) so I doesn't have to use .ContinueWith ?
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
await Task.Delay(random.Next(100, 5000), cancelationToken);
Console.WriteLine(" [x] Received {0}", message);
}).ContinueWith((prevTask) =>
{
if (!prevTask.IsFaulted)
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
while (!cancelationToken.IsCancellationRequested)
{
await Task.Delay(100, cancelationToken);
}
}
}
如果我等待 Task.Run,那么在此之前它不会处理任何其他消息
consumer.Received += async (model, ea) =>
{
...
};
结束了。
最佳答案
经过一番搜索,我发现 EasyNetQ (SubscribeAsync) 正是我所需要的。该库使 Rabbit MQ 的使用变得更加容易。
关于c# - RabbitMQ 异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49393058/