c# - Azure ServiceBus 和异步 - 是,还是不是?

标签 c# multithreading asynchronous servicebus context-switch

我在 Azure 上运行服务总线,每秒传输大约 10-100 条消息

最近我切换到 .net 4.5 并且兴奋地重构了所有代码以在每一行中至少有两次 'async' 和 'await'确保它“正确地”完成了 :)

现在我想知道它实际上是好还是坏。如果您可以查看代码片段并让我知道您的想法。我特别担心 thread context switching 不会给我带来更多的悲伤而不是好处,所有的异步......(看看 !dumpheap 这绝对是一个因素)

只是一些描述 - 我将发布 2 种方法 - 一种在 ConcurrentQueue 上执行 while 循环,等待新消息,另一种方法一次发送一条消息。我还完全按照 Azure 博士的规定使用 transient 故障处理 block 。

发送循环(从头开始,等待新消息):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码来自每秒发送 1 条消息的“发件人”类。我有大约 50-100 个实例在任何给定时间运行,因此它可能有很多线程。

顺便说一句,不要担心 EnsureMessageSender、RecreateMessageFactory、EnsureTopicExists 太多,它们并不经常被调用。

如果我只需要一个后台线程通过消息队列工作并同步发送消息,我是否会更好,只要我一次发送一条消息,而不用担心异步的事情并避免随之而来的开销。

请注意,将一条消息发送到 Azure 服务总线通常只需要几毫秒,并不十分昂贵。 (除非有时它很慢、超时或服务总线后端有问题,否则它可能会在尝试发送内容时挂起一段时间)。

感谢并抱歉发了这么长的帖子,

史蒂夫

建议的解决方案

这个例子能解决我的情况吗?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }

最佳答案

你说:

The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.

这是一个很好的异步案例。你在这里保存了很多线程。异步减少上下文切换,因为它不是基于线程的。它不会在需要等待的情况下进行上下文切换。相反,下一个工作项正在同一个线程上处理(如果有的话)。

因此,您的异步解决方案肯定会比同步解决方案更好地扩展。需要衡量它是否真的在 50-100 个工作流实例中使用更少的 CPU。实例越多,异步越快的可能性就越高。

现在,实现存在一个问题:您使用的 ConcurrentQueue 不是异步就绪的。因此,即使在您的异步版本中,您实际上也确实使用了 50-100 个线程。它们会阻塞(您希望避免)或忙等待消耗 100% 的 CPU(在您的实现中似乎就是这种情况!)。您需要摆脱这个问题并使排队异步。也许 SemaphoreSlim 在这里有帮助,因为它可以异步等待。

关于c# - Azure ServiceBus 和异步 - 是,还是不是?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15612407/

相关文章:

c# - 如何使用 Entity Framework 获取表中的最新行(考虑性能)?

c# - 在数据库中表示 "Recurring Events"的最佳方式是什么?

c# - 打开MediaPlayer-线程中未处理事件

javascript - 如何从异步调用返回响应?

c# - 发送广播包问题

c# - 如何从 SelectionModel 外部的 Ext.Net.GridPanel 中的当前选定行获取值?

c# - 在线程频繁写的情况下,哪种方式更好?

multithreading - Spring批处理多线程处理单个文件到多个文件

java - 调用其他有界上下文的策略

asynchronous - Ember 中基于异步数据的计算属性