I've been looking for a lightweight, in-process, asynchronous message bus and came across TPL Dataflow.
My current implementation is below (full example at https://gist.github.com/4416655).
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus
{
    // A single broadcast block shared by all message types.
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        // One ActionBlock per handler; the link predicate only lets TMessage instances through.
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler,
            new DataflowLinkOptions { PropagateCompletion = true },
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            // Disposing the link detaches the handler block from the broadcast block.
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}
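Some general questions about using TPL Dataflow in messaging scenarios:
Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently? This was the conclusion I came to based on this post.
In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
In my test application, the first handler introduces a delay:
// Subscribe to Message type
var subscription1 = bus.Subscribe<Message>(async m => {
    await Task.Delay(2000);
    Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
});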
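When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler, but I'm curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).
Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do, but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.
Update
The reason I was not getting the expected behaviour with Task.Delay is that this was delaying the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.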
Best answer
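After answering your questions (see below), I realized that modeling your design with TPL Dataflow blocks is probably not a good idea. TDF is good for processing messages through largely independent blocks, and it has no built-in way of tracking a single message. But that is what you seem to want: process each message by the handlers in sequence and track the completion of each message.
Because of that, I think you shouldn't create a whole dataflow network; instead, use a single ActionBlock as an asynchronous message processor: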
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus
{
    class Subscription
    {
        public Guid Id { get; private set; }
        public Func<object, Task> HandlerAction { get; private set; }

        public Subscription(Guid id, Func<object, Task> handlerAction)
        {
            Id = id;
            HandlerAction = handlerAction;
        }
    }

    private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
    private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();

    private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

    public Bus()
    {
        // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
        var subscriptions = new List<Subscription>();

        m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
            async tuple =>
            {
                var message = tuple.Item1;
                var completedAction = tuple.Item2;

                // could be made more efficient, probably doesn't matter
                Guid idToUnsubscribe;
                while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                {
                    subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                }

                Subscription handlerToSubscribe;
                while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                {
                    subscriptions.Add(handlerToSubscribe);
                }

                // handlers run one after another; the next message waits until all of them finish
                foreach (var subscription in subscriptions)
                {
                    await subscription.HandlerAction(message);
                }

                completedAction();
            });
    }

    public Task SendAsync<TMessage>(TMessage message)
    {
        var tcs = new TaskCompletionSource<bool>();
        Action completedAction = () => tcs.SetResult(true);

        m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

        return tcs.Task;
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        return Subscribe<TMessage>(
            message =>
            {
                handlerAction(message);
                // we need a completed non-generic Task; this is a simple, efficient way to get it
                // another option would be to use async lambda with no await,
                // but that's less efficient and produces a warning
                return Task.FromResult(false);
            });
    }

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        Func<object, Task> actionWithCheck = async message =>
        {
            if (message is TMessage)
                await handlerAction((TMessage)message);
        };

        var id = Guid.NewGuid();
        m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
        return id;
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        m_idsToUnsubscribe.Enqueue(subscriptionId);
    }
}
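(I decided to use queues for subscribing and unsubscribing so that the list of handlers doesn't change while a message is being processed.)
Answers to your questions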
Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently?
Yes, at first glance, BroadcastBlock<T> sounds like what you want. There certainly isn't any directly similar block in TPL Dataflow.
In my implementation I'm using a single BroadcastBlock instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
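With a single block for all message types, you do more work (sending to all handlers) on a single thread. With one block per message type, you do less work (sending only to the correct handlers), and that work can execute on multiple threads. Because of that, I think it's reasonable to assume the latter will be faster.
But don't forget the rules of application performance optimization: first, write code that is simple and readable. Only if it turns out to actually be slow, try to optimize it. And when comparing two alternatives, always use profiling to find out which one is actually faster, instead of just guessing which one should be faster.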
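The one-block-per-type alternative could look roughly like the sketch below. It is a hypothetical variant of the question's Bus (the names TypedBus, BlockFor and the IDisposable-returning Subscribe are made up for illustration), assuming .NET with System.Threading.Tasks.Dataflow available. Messages are routed by their static type T, so a message sent through a base-type variable goes to the base type's block, and each per-type BroadcastBlock<T> still replays its last message to new subscribers. Per the advice above, it is worth profiling it against the single-block version before switching.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical variant of the question's Bus: one BroadcastBlock<T> per message type.
public class TypedBus
{
    // Message type -> the BroadcastBlock<T> for that type (stored as object).
    private readonly ConcurrentDictionary<Type, object> blocks =
        new ConcurrentDictionary<Type, object>();

    // Lazily creates the block for T; if two threads race, the losing block is discarded unused.
    private BroadcastBlock<T> BlockFor<T>() =>
        (BroadcastBlock<T>)blocks.GetOrAdd(typeof(T), _ => new BroadcastBlock<T>(m => m));

    // Routes by the static type T, so handlers of other types never see the message.
    public Task<bool> SendAsync<T>(T message) => BlockFor<T>().SendAsync(message);

    // Each handler gets its own ActionBlock<T>; disposing the returned link unsubscribes it.
    public IDisposable Subscribe<T>(Func<T, Task> handler) =>
        BlockFor<T>().LinkTo(new ActionBlock<T>(handler));
}
```

Compared with the single BroadcastBlock<object>, the link predicate (message is TMessage) disappears, because the type check happens once, when the block for T is looked up.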
BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
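A minimal check of the behaviour described in the question, assuming .NET with System.Threading.Tasks.Dataflow (the class and method names are hypothetical): a target linked after the message was posted still receives it, because BroadcastBlock<T> keeps the most recent message and offers it to targets linked later.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastReplayDemo
{
    // Posts one message while nothing is linked, then links a target afterwards.
    public static async Task<string> LateSubscriberReceivesAsync()
    {
        var broadcast = new BroadcastBlock<string>(message => message);
        broadcast.Post("sent before subscribing");

        // BroadcastBlock<T> keeps the most recent message and offers it to newly linked targets.
        var lateSubscriber = new BufferBlock<string>();
        broadcast.LinkTo(lateSubscriber);

        // Throws TimeoutException if nothing arrives within five seconds.
        return await lateSubscriber.ReceiveAsync(TimeSpan.FromSeconds(5));
    }
}
```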
No, there is no way to configure BroadcastBlock<T> to do that. If you don't need all the features of BroadcastBlock<T> (sending to blocks with bounded capacity that might be temporarily full, supporting non-greedy blocks as targets), you might want to write a custom version of BroadcastBlock<T> to do this.
When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).
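One of the points of TDF is that each block is independent, so multiple blocks can execute on multiple threads. If that's not what you want, then using a separate ActionBlock<T> for each handler is probably not the best solution. In fact, TDF might not be the best solution at all.
Also, Subscribe() accepts Action<TMessage>, which means your lambda will be compiled as an async void method. Those should be used only in specific (and relatively rare) cases where you don't have any other option. If you want to support async handlers, you should accept async Task methods, i.e. Func<TMessage, Task>.
The reason I was not getting the expected behaviour with Task.Delay is that this was delaying the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.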
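A small, deterministic experiment points to a different cause for the Task.Delay behaviour. It is a sketch with hypothetical names (AsyncHandlerDemo, RunAsync), assuming .NET with System.Threading.Tasks.Dataflow, and it swaps the 2-second Task.Delay for a gate the caller controls. When the async handler is typed as Action<int>, as with the question's Subscribe(Action<TMessage>), it compiles to async void: the ActionBlock considers each item done at the handler's first await and moves on, so all the delays would run side by side. Typed as Func<int, Task>, the block awaits each handler before taking the next item.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncHandlerDemo
{
    // Posts three items and reports, while every handler is still blocked on a gate,
    // whether the block already claims completion and how many handlers have finished.
    public static async Task<(bool BlockCompleted, int HandlersFinished)> RunAsync(bool asAction)
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        int finished = 0;

        ActionBlock<int> block;
        if (asAction)
        {
            // Action<int>: the async lambda is async void, so the block cannot await it.
            Action<int> handler = async m => { await gate.Task; Interlocked.Increment(ref finished); };
            block = new ActionBlock<int>(handler);
        }
        else
        {
            // Func<int, Task>: the block awaits the returned Task before taking the next item.
            Func<int, Task> handler = async m => { await gate.Task; Interlocked.Increment(ref finished); };
            block = new ActionBlock<int>(handler);
        }

        for (int i = 0; i < 3; i++) block.Post(i);
        block.Complete();

        // Give the block up to one second to finish while the gate is still closed.
        await Task.WhenAny(block.Completion, Task.Delay(TimeSpan.FromSeconds(1)));
        bool completedWhileGated = block.Completion.IsCompleted;
        int finishedWhileGated = Volatile.Read(ref finished);

        gate.SetResult(true);    // release the handlers
        await block.Completion;  // with Func<int, Task>, this now waits for all three items
        return (completedWhileGated, finishedWhileGated);
    }
}
```

With Action<int> the block reports completion while no handler has finished; with Func<int, Task> it does not, so the default MaxDegreeOfParallelism of 1 then serializes a single handler's messages as the question expected. Handlers living in different ActionBlocks still run concurrently, as noted above.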
Using Thread.Sleep() goes against the whole idea of async, and you shouldn't use it if possible. Also, I don't think it actually works the way you wanted: it introduces a delay on each thread, but TPL Dataflow will use more than one thread, so this won't behave as you intend.
Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.
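The gap between SendAsync and handler completion can be observed directly. This is a sketch with hypothetical names, assuming .NET with System.Threading.Tasks.Dataflow: the Task returned by SendAsync completes once the ActionBlock<T> has accepted the message, while the handler may still be running, and Completion only reflects the whole block after Complete() has been called.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SendAsyncDemo
{
    public static async Task<(bool Accepted, bool HandledWhenSent, bool HandledWhenCompleted)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var handled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var target = new ActionBlock<string>(async message =>
        {
            await gate.Task;             // hold the handler until SendAsync has been observed
            handled.TrySetResult(true);
        });

        // Completes when the block has accepted (buffered) the message, not when it has been handled.
        bool accepted = await target.SendAsync("hello");
        bool handledWhenSent = handled.Task.IsCompleted;   // false: the handler is parked on the gate

        gate.SetResult(true);
        target.Complete();               // no more input, so Completion now means "everything processed"
        await target.Completion;
        return (accepted, handledWhenSent, handled.Task.IsCompleted);
    }
}
```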
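PropagateCompletion, along with Complete() and Completion, is meant for handling the completion of whole blocks, not the processing of a single message. One reason for this is that in more complicated dataflow networks, it might not be clear when exactly a message has been processed. For example, if a message has already been sent to all current targets of a BroadcastBlock<T>, but will also be sent to all newly added targets, should it be considered complete? If you want to do this, you will probably have to do it manually somehow, for example using TaskCompletionSource.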
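One way to do it manually, sketched under assumptions: FanOutBus and Envelope are hypothetical names, and this swaps BroadcastBlock<T> for an explicit dictionary of per-handler ActionBlocks (assuming .NET with System.Threading.Tasks.Dataflow). Every message is wrapped once per handler together with a TaskCompletionSource, and SendAsync returns Task.WhenAll over those. Because the sender enumerates only the handlers registered at that moment, a new subscriber never receives an earlier message, which also sidesteps the BroadcastBlock<T> replay issue raised in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: one ActionBlock per subscription, plus one TaskCompletionSource
// per (message, handler) pair so the sender can await every handler.
public class FanOutBus
{
    private sealed class Envelope
    {
        public Envelope(object message) { Message = message; }
        public object Message { get; }
        public TaskCompletionSource<bool> Done { get; } =
            new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    private readonly ConcurrentDictionary<Guid, ActionBlock<Envelope>> handlers =
        new ConcurrentDictionary<Guid, ActionBlock<Envelope>>();

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handler)
    {
        var block = new ActionBlock<Envelope>(async envelope =>
        {
            try
            {
                // Messages of other types are acknowledged without calling the handler.
                if (envelope.Message is TMessage typed)
                    await handler(typed);
                envelope.Done.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // Report the failure to the sender instead of faulting the block.
                envelope.Done.TrySetException(ex);
            }
        });
        var id = Guid.NewGuid();
        handlers[id] = block;
        return id;
    }

    public void Unsubscribe(Guid id)
    {
        // Envelopes already queued are still processed; later posts are declined.
        if (handlers.TryRemove(id, out var block))
            block.Complete();
    }

    // Completes when every handler registered at call time has processed the message.
    public Task SendAsync<TMessage>(TMessage message)
    {
        var pending = handlers.Values.Select(block =>
        {
            var envelope = new Envelope(message);
            // A block completed by Unsubscribe declines the post; count that as done.
            if (!block.Post(envelope))
                envelope.Done.TrySetResult(true);
            return envelope.Done.Task;
        }).ToList();
        return Task.WhenAll(pending);
    }
}
```

Unlike the single-ActionBlock Bus above, different handlers here run concurrently with each other; each handler still sees messages in order because its ActionBlock keeps the default MaxDegreeOfParallelism of 1.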
Source: Stack Overflow question ".net - Creating a message bus with TPL Dataflow": https://stackoverflow.com/questions/14096614/