我有两个使用 Rebus 的控制台应用程序。它们都引用定义消息(命令和事件)的程序集。 控制台应用程序“A”发送命令并监听事件以进行记录(例如:发送 CreateTCommand 并监听 TCreatedEvent)。控制台应用程序“B”(实际上是 ASP.NET Core 应用程序)监听命令并处理它们(例如:由 CreateTCommand 启动 saga,创建聚合并引发 TCreatedEvent)。在应用程序“B”进程内的另一个 DLL 中,有一个 TCreatedEvent 的处理程序。
因此,我有一个由应用程序“A”发送的创建命令和两个用于创建事件的处理程序,一个在应用程序“A”中,一个在应用程序“B”中。
问题:当我第一次从应用程序“A”发送命令时,应用程序“B”引发创建的事件,该事件在同一进程中触发处理程序。处理程序 在应用程序“A”中未触发。 来自应用程序“A”的进一步命令始终由应用程序“B”中的传奇处理,但创建的事件永远不会再次命中该过程中的处理程序,而是由应用程序“A”处理! 有时(我不明白如何重现)来自应用程序“A”的命令不会被应用程序“B”中的传奇处理(我在 MSMQ 的错误队列中找到该命令,但出现异常“无法调度带有 Id 的消息”任何处理程序”)。 有时(非常罕见)两个处理员都被击中。但我无法一致地重现该行为......
我对此的感受(对 Rebus 几乎一无所知,这对我来说很新):
- 这可能是并发问题吗?我的意思是:Rebus 配置为将订阅存储在进程外部(使用 SQL 或 Mongo,问题不会消失),所以我认为可能第一个处理程序太快,并在第二个处理程序之前将事件标记为已处理调用
- 检查订阅 SQL 表,我发现 5 行(我在代码中订阅的每种事件类型(在应用程序启动中使用bus.Subscribe())具有相同的地址(队列名称链接到我的本地计算机名称)。只有一个地址并且有 2 个进程试图使用它,这会是一个问题吗?
两个应用程序中 Rebus 的配置代码相同,如下所示:
const string inputQueueAddress = "myappqueue";
var mongoClient = new MongoClient("mongodb://localhost:27017");
var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence");
var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services))
.Logging(l => l.Trace())
.Routing(r => r.TypeBased()
.MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress)
.MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress)
)
.Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions"))
.Sagas(s => s.StoreInMongoDb(mongoDatabase))
.Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts"))
.Transport(t => t.UseMsmq(inputQueueAddress));
var bus = config.Start();
bus.Subscribe<AlertDefinitionCreatedEvent>();
bus.Subscribe<AlertStateAddedEvent>();
bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>();
services.AddSingleton(bus);
services.AutoRegisterHandlersFromThisAssembly();
我希望有人能帮忙,这让我发疯......
p.s.:将 isCentralized: true 传递给 subscription.StoreInMongoDb() 时也存在该问题。
编辑1:我添加了控制台日志记录,您可以看到这种奇怪的行为: https://postimg.org/image/czz5lchp9/
第一个命令发送成功。它由传奇处理,事件触发控制台应用程序“A”中的处理程序。 Rebus 说第二个命令没有分派(dispatch)给任何处理程序,但它实际上是由 saga 处理的(我跟踪了调试中的代码),并且该事件是由应用程序“B”而不是“A”中的处理程序处理的......为什么? ;(
编辑2:我正在调试Rebus的源代码。我注意到在 ThreadPoolWorker.cs 中,方法 TryAsyncReceive
async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation)
{
try
{
using (parallelOperation)
using (var context = new DefaultTransactionContext())
{
var transportMessage = await ReceiveTransportMessage(token, context);
if (transportMessage == null)
{
context.Dispose();
// no need for another thread to rush in and discover that there is no message
//parallelOperation.Dispose();
_backoffStrategy.WaitNoMessage();
return;
}
_backoffStrategy.Reset();
await ProcessMessage(context, transportMessage);
}
}
应用程序“B”发布 TCreatedEvent 后,应用程序“A”中的代码到达 等待 ProcessMessage(上下文,传输消息) 其中 TransportMessage 是实际事件。这行代码在应用程序“B”的进程中未到达。似乎消息的第一个接收者将其从 MSMQ 队列中删除。正如我所说,我对 Rebus 和总线都很陌生,但如果这种行为是按照设计的,我很困惑......多个进程中的多个总线如何监听同一个队列???
最佳答案
永远不应该让两个总线实例从同一队列接收消息,除非它们是同一端点的多个实例。
当两个进程使用同一个输入队列时,它们会互相获取消息。如果您正在实现competing consumers pattern,这绝对没问题。 ,使用它在工作进程集群之间均匀分配工作,但不能在多个不同端点之间共享输入队列。
我的猜测是,如果您让每个总线实例使用自己的输入队列,一切看起来都会更加可预测;)
Now you're telling me these handlers need to live inside the main app
不,我不是:)我告诉您,如果您让两个不同的应用程序抢夺彼此的消息,您将得到不可预测的结果。
虽然所有处理程序都位于同一个总线实例中(因此由来自同一队列的消息调用)是完全可以的,但最常见的情况是您将按照您想要的方式拆分应用程序发展系统。
这样您就可以一次更新一个应用程序,避免大的“停止世界”更新。
您可以通过启动多个端点来实现此目的,每个端点使用自己的队列,然后在端点之间路由消息以进行通信。
考虑一个场景,您想要向命令处理器发送命令。命令处理器是一个 Rebus 端点,它从 command_processor
队列获取消息。
在发送者端,您将配置一个“端点映射”(您可以在 the routing section on the Rebus wiki 中阅读更多相关信息,如下所示:
Configure.With(...)
.Transport(t => t.UseMsmq("sender"))
.Routing(r => {
r.TypeBased()
.Map<TheCommand>("command_processor");
})
.Start();
这将使发件人能够简单地走
await bus.Send(new TheCommand(...));
然后总线就会知道将命令消息传递到哪个队列。
我希望这样能更清楚:)
请注意,这是 point-to-point messaging 的一个非常简单的案例其中一个端点发送一条消息,该消息旨在由另一个端点使用。 Rebus 还可以帮助您使用其他几种模式,例如request/reply和 publish/subscribe .
关于c# - Rebus:2 个进程中的 2 个处理程序。击球不一致且交替,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40680346/