.net - Creating a message bus with TPL Dataflow

Tags: .net task-parallel-library tpl-dataflow

I was looking for a lightweight, in-process, asynchronous message bus and came across TPL Dataflow.

My current implementation is below (full example at https://gist.github.com/4416655).

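using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus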
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}

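I have some general questions about using TPL Dataflow in messaging scenarios.
  • Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently? This is the conclusion I came to based on this post.
  • In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
  • BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
  • In my test application, I introduced a delay in the first handler: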
        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });
    

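    When sending messages I expected to see each message output to the console one by one, at 2s intervals. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler, but I'm curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).
  • Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do, but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.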

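Update

The reason I was not getting the expected behaviour with Task.Delay is that it delayed the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.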

Best answer

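After answering your questions (see below), I realized that modeling your design with TPL Dataflow blocks is probably not a good idea. TDF is good at processing messages through largely independent blocks, and it has no built-in way of tracking a single message. But that is exactly what you seem to want: to run the handlers for a message in sequence and to track the completion of each message.

Because of that, I think you shouldn't create a whole dataflow network, but instead use a single ActionBlock as an asynchronous message processor: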

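    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class Bus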
    {
        class Subscription
        {
            public Guid Id { get; private set; }
            public Func<object, Task> HandlerAction { get; private set; }
    
            public Subscription(Guid id, Func<object, Task> handlerAction)
            {
                Id = id;
                HandlerAction = handlerAction;
            }
        }
    
        private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
        private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();
    
        private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;
    
        public Bus()
        {
            // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
            var subscriptions = new List<Subscription>();
    
            m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
                async tuple =>
                {
                    var message = tuple.Item1;
                    var completedAction = tuple.Item2;
    
                    // could be made more efficient, probably doesn't matter
                    Guid idToUnsubscribe;
                    while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                    {
                        subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                    }
    
                    Subscription handlerToSubscribe;
                    while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                    {
                        subscriptions.Add(handlerToSubscribe);
                    }
    
                    foreach (var subscription in subscriptions)
                    {
                        await subscription.HandlerAction(message);
                    }
    
                    completedAction();
                });
        }
    
        public Task SendAsync<TMessage>(TMessage message)
        {
            var tcs = new TaskCompletionSource<bool>();
            Action completedAction = () => tcs.SetResult(true);
    
            m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));
    
            return tcs.Task;
        }
    
        public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        {
            return Subscribe<TMessage>(
                message =>
                {
                    handlerAction(message);
                    // we need a completed non-generic Task; this is a simple, efficient way to get it
                    // another option would be to use async lambda with no await,
                    // but that's less efficient and produces a warning
                    return Task.FromResult(false);
                });
        }
    
        public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
        {
            Func<object, Task> actionWithCheck = async message =>
            {
                if (message is TMessage)
                    await handlerAction((TMessage)message);
            };
    
            var id = Guid.NewGuid();
            m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
            return id;
        }
    
        public void Unsubscribe(Guid subscriptionId)
        {
            m_idsToUnsubscribe.Enqueue(subscriptionId);
        }
    }
    

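(I decided to use queues for subscribing and unsubscribing so that the list of handlers can't change while a message is being processed.)

Answers to your questions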

    Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently?



Yes, at first glance BroadcastBlock<T> sounds like what you want. TPL Dataflow certainly has no other block that does anything similar.

    In my implementation I'm using a single BroadcastBlock instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?



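With a single block for all message types, you do more work (offering every message to all handlers) on a single thread. With one block per message type, you would do less work (sending only to the matching handlers), and that work can be spread across multiple threads. Because of that, I think it's reasonable to assume the latter will be faster.

But don't forget the rules of performance optimization: first, write code that is simple and readable. Only if it turns out to be actually slow, try to optimize it. And when comparing two alternatives, always use profiling to find out which one is actually faster; don't just guess which one should be faster.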
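For comparison, here is a minimal sketch of the one-block-per-message-type variant. `PerTypeBus` and `BlockFor` are hypothetical names introduced for illustration. The sketch routes on the static type `TMessage`, which is an assumption that differs from the `message is TMessage` filter in the question: a handler subscribed for a base type will not see messages sent as a derived type. Each BroadcastBlock still replays its last message to new links.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: one BroadcastBlock per message type, created lazily.
public sealed class PerTypeBus
{
    private readonly ConcurrentDictionary<Type, BroadcastBlock<object>> blocks =
        new ConcurrentDictionary<Type, BroadcastBlock<object>>();

    private BroadcastBlock<object> BlockFor(Type type)
    {
        // The cloning function returns the same instance, as in the question.
        return blocks.GetOrAdd(type, _ => new BroadcastBlock<object>(m => m));
    }

    public Task<bool> SendAsync<TMessage>(TMessage message)
    {
        // Only the block for TMessage does any work for this message.
        return BlockFor(typeof(TMessage)).SendAsync((object)message);
    }

    public IDisposable Subscribe<TMessage>(Func<TMessage, Task> handler)
    {
        // No type filter is needed on the link: the block only carries TMessage.
        Func<object, Task> invoke = m => handler((TMessage)m);
        var target = new ActionBlock<object>(invoke);
        return BlockFor(typeof(TMessage)).LinkTo(target);
    }
}
```

Whether this is actually faster than the single-block design is something only a profiler run on the real workload can answer.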

    BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?



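No, there is no way to configure BroadcastBlock<T> to do that. If you don't need all the features of BroadcastBlock<T> (sending to blocks with bounded capacity that may be temporarily full, support for non-greedy blocks as targets), you might want to write a custom version of BroadcastBlock<T> that behaves this way.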
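A minimal sketch of such a custom version, assuming the features listed above can be given up. `NonReplayingBroadcaster<T>` is a hypothetical helper, not part of TPL Dataflow. It is not a dataflow block itself: it simply posts each message to the targets that are subscribed at that moment and keeps no copy of it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: broadcasts to current targets only, never replays.
public sealed class NonReplayingBroadcaster<T>
{
    private readonly object gate = new object();
    private readonly List<ITargetBlock<T>> targets = new List<ITargetBlock<T>>();

    // Disposing the returned object removes the target again.
    public IDisposable Subscribe(ITargetBlock<T> target)
    {
        lock (gate) { targets.Add(target); }
        return new Unsubscriber(this, target);
    }

    // Offers the message to the targets subscribed right now. Nothing is
    // stored, so later subscribers never see it. Post returns false when a
    // target declines (for example because it is full); this sketch ignores that.
    public void Publish(T message)
    {
        ITargetBlock<T>[] snapshot;
        lock (gate) { snapshot = targets.ToArray(); }
        foreach (var target in snapshot)
        {
            target.Post(message);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly NonReplayingBroadcaster<T> owner;
        private readonly ITargetBlock<T> target;

        public Unsubscriber(NonReplayingBroadcaster<T> owner, ITargetBlock<T> target)
        {
            this.owner = owner;
            this.target = target;
        }

        public void Dispose()
        {
            lock (owner.gate) { owner.targets.Remove(target); }
        }
    }
}
```

Because `Post` is fire-and-forget, a bounded target that is full simply misses the message, which is exactly the kind of feature BroadcastBlock<T> handles and this sketch does not.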

    When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).



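One of the main points of TDF is that each block is independent, so multiple blocks can execute on multiple threads. If that's not what you want, then a separate ActionBlock<T> for each handler may not be the best solution. In fact, TDF may not be the best solution at all.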
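The independence of blocks can be observed with a small sketch. `IndependentBlocksDemo` is a hypothetical name; the code links two ActionBlocks, each with MaxDegreeOfParallelism = 1, to one BroadcastBlock and records how many handlers run at the same time. The option limits parallelism inside a single block only, so the two blocks are still expected to overlap.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class IndependentBlocksDemo
{
    // Returns the highest number of handlers observed running at the same time.
    public static async Task<int> MaxConcurrentHandlersAsync()
    {
        var gate = new object();
        int running = 0, maxRunning = 0;

        Func<int, Task> handler = async _ =>
        {
            lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
            await Task.Delay(300);
            lock (gate) { running--; }
        };

        // The limit applies to each block separately, not to the whole network.
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
        var first = new ActionBlock<int>(handler, options);
        var second = new ActionBlock<int>(handler, options);

        var broadcast = new BroadcastBlock<int>(i => i);
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(first, link);
        broadcast.LinkTo(second, link);

        await broadcast.SendAsync(1);
        broadcast.Complete();
        await Task.WhenAll(first.Completion, second.Completion);
        return maxRunning;
    }
}
```

On an otherwise idle machine the result should be 2: both handlers are inside their delay at the same moment even though each block processes one message at a time.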

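Also, Subscribe() accepts Action<TMessage>, which means your async lambda is compiled as an async void method. Those should be used only in specific (and relatively rare) cases where you have no other option. If you want to support async handlers, you should accept async Task methods, i.e. Func<TMessage, Task>.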
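The difference can be shown without any dataflow blocks. In this sketch (`HandlerShapes` and its method names are hypothetical), the same async lambda is assigned once to an Action<string> and once to a Func<string, Task>. Only the second form gives the caller a Task to await.

```csharp
using System;
using System.Threading.Tasks;

public static class HandlerShapes
{
    // An Action wrapping an async lambda (async void) returns at the first
    // incomplete await; the caller has no Task to observe the rest.
    public static bool FinishedWhenActionReturns()
    {
        var finished = false;
        Action<string> handler = async m =>
        {
            await Task.Delay(200);
            finished = true;
        };

        handler("hello");
        return finished; // read immediately, before the delay elapses
    }

    // A Func<string, Task> hands the caller a Task, so the caller can await
    // the whole handler, including the part after the delay.
    public static async Task<bool> FinishedAfterAwaitingFuncAsync()
    {
        var finished = false;
        Func<string, Task> handler = async m =>
        {
            await Task.Delay(200);
            finished = true;
        };

        await handler("hello");
        return finished;
    }
}
```

This is also why the test handler in the question appeared to ignore the delay: the ActionBlock saw each async void handler return at once and moved straight on to the next message.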

    The reason I was not getting the expected behaviour with Task.Delay is that this was delaying the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.



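Using Thread.Sleep() goes against the whole idea of async, and you should avoid it where possible. Also, I don't think it actually worked the way you wanted: it introduces a delay on each thread, but TPL Dataflow will use more than one thread, so this won't behave as you intended.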

    Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.


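PropagateCompletion, together with Complete() and Completion, is for handling the completion of whole blocks, not the processing of a single message. One reason for this is that in more complicated dataflow networks it may not be clear when exactly a message has been processed. For example, if a message has already been sent to all current targets of a BroadcastBlock<T>, but will also be sent to all newly added targets, should it be considered complete?

If you want to do this, you will have to do it manually somehow, possibly using TaskCompletionSource.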
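One possible shape for that manual tracking, as a sketch rather than a definitive implementation: each message travels in an envelope that carries its own TaskCompletionSource, and the processing block completes it once the handler has run. `Envelope` and `TrackedProcessor` are hypothetical names. Unlike the Bus above, this sketch also reports handler exceptions to the sender instead of letting them fault the block.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical envelope: a message plus the source of its completion Task.
public sealed class Envelope
{
    public Envelope(object message) { Message = message; }

    public object Message { get; }

    public TaskCompletionSource<bool> Completion { get; } =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public sealed class TrackedProcessor
{
    private readonly ActionBlock<Envelope> block;

    public TrackedProcessor(Func<object, Task> handler)
    {
        block = new ActionBlock<Envelope>(async envelope =>
        {
            try
            {
                await handler(envelope.Message);
                envelope.Completion.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // Report the failure to the sender instead of faulting the block.
                envelope.Completion.TrySetException(ex);
            }
        });
    }

    // The returned Task completes when the handler has finished this message.
    public Task SendAsync(object message)
    {
        var envelope = new Envelope(message);
        if (!block.Post(envelope))
        {
            envelope.Completion.TrySetException(
                new InvalidOperationException("The processor no longer accepts messages."));
        }
        return envelope.Completion.Task;
    }
}
```

A caller can then write `await processor.SendAsync(message)` and continue only after the handler has run for that particular message.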

Source: a similar question on Stack Overflow about creating a message bus with TPL Dataflow: https://stackoverflow.com/questions/14096614/