c# - Rebus:2 个进程中的 2 个处理程序。击球不一致且交替

标签 c# handler bus rebus

我有两个使用 Rebus 的控制台应用程序。它们都引用定义消息(命令和事件)的程序集。 控制台应用程序“A”发送命令并监听事件以进行记录(例如:发送 CreateTCommand 并监听 TCreatedEvent)。控制台应用程序“B”(实际上是 ASP.NET Core 应用程序)监听命令并处理它们(例如:由 CreateTCommand 启动 saga,创建聚合并引发 TCreatedEvent)。在应用程序“B”进程内的另一个 DLL 中,有一个 TCreatedEvent 的处理程序。

因此,我有一个由应用程序“A”发送的创建命令和两个用于创建事件的处理程序,一个在应用程序“A”中,一个在应用程序“B”中。

问题:当我第一次从应用程序“A”发送命令时,应用程序“B”引发创建的事件,该事件在同一进程中触发处理程序。处理程序 在应用程序“A”中未触发。 来自应用程序“A”的进一步命令始终由应用程序“B”中的传奇处理,但创建的事件永远不会再次命中该过程中的处理程序,而是由应用程序“A”处理! 有时(我不明白如何重现)来自应用程序“A”的命令不会被应用程序“B”中的传奇处理(我在 MSMQ 的错误队列中找到该命令,但出现异常“无法调度带有 Id 的消息”任何处理程序”)。 有时(非常罕见)两个处理员都被击中。但我无法一致地重现该行为......

我对此的感受(对 Rebus 几乎一无所知,这对我来说很新):

  • 这可能是并发问题吗?我的意思是:Rebus 配置为将订阅存储在进程外部(使用 SQL 或 Mongo,问题不会消失),所以我认为可能第一个处理程序太快,并在第二个处理程序之前将事件标记为已处理调用
  • 检查订阅 SQL 表,我发现 5 行(我在代码中订阅的每种事件类型(在应用程序启动中使用bus.Subscribe())具有相同的地址(队列名称链接到我的本地计算机名称)。只有一个地址并且有 2 个进程试图使用它,这会是一个问题吗?

两个应用程序中 Rebus 的配置代码相同,如下所示:

        const string inputQueueAddress = "myappqueue";
        var mongoClient = new MongoClient("mongodb://localhost:27017");
        var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence");
        var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services))
            .Logging(l => l.Trace())
            .Routing(r => r.TypeBased()
                .MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress)
                .MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress)
            )
            .Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions"))
            .Sagas(s => s.StoreInMongoDb(mongoDatabase))
            .Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts"))
            .Transport(t => t.UseMsmq(inputQueueAddress));

        var bus = config.Start();
        bus.Subscribe<AlertDefinitionCreatedEvent>();
        bus.Subscribe<AlertStateAddedEvent>();
        bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>();
        services.AddSingleton(bus);

        services.AutoRegisterHandlersFromThisAssembly();

我希望有人能帮忙,这让我发疯......

p.s.:将 isCentralized: true 传递给 subscription.StoreInMongoDb() 时也存在该问题。

编辑1:我添加了控制台日志记录,您可以看到这种奇怪的行为: https://postimg.org/image/czz5lchp9/

第一个命令发送成功。它由传奇处理,事件触发控制台应用程序“A”中的处理程序。 Rebus 说第二个命令没有分派(dispatch)给任何处理程序,但它实际上是由 saga 处理的(我跟踪了调试中的代码),并且该事件是由应用程序“B”而不是“A”中的处理程序处理的......为什么? ;(

编辑2:我正在调试Rebus的源代码。我注意到在 ThreadPoolWorker.cs 中,方法 TryAsyncReceive

    async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation)
    {
        try
        {
            using (parallelOperation)
            using (var context = new DefaultTransactionContext())
            {
                var transportMessage = await ReceiveTransportMessage(token, context);

                if (transportMessage == null)
                {
                    context.Dispose();

                    // no need for another thread to rush in and discover that there is no message
                    //parallelOperation.Dispose();

                    _backoffStrategy.WaitNoMessage();
                    return;
                }

                _backoffStrategy.Reset();

                await ProcessMessage(context, transportMessage);
            }
        }

应用程序“B”发布 TCreatedEvent 后,应用程序“A”中的代码到达 等待 ProcessMessage(上下文,传输消息) 其中 TransportMessage 是实际事件。这行代码在应用程序“B”的进程中未到达。似乎消息的第一个接收者将其从 MSMQ 队列中删除。正如我所说,我对 Rebus 和总线都很陌生,但如果这种行为是按照设计的,我很困惑......多个进程中的多个总线如何监听同一个队列???

最佳答案

永远不应该让两个总线实例从同一队列接收消息,除非它们是同一端点的多个实例

当两个进程使用同一个输入队列时,它们会互相获取消息。如果您正在实现competing consumers pattern,这绝对没问题。 ,使用它在工作进程集群之间均匀分配工作,但不能在多个不同端点之间共享输入队列。

我的猜测是,如果您让每个总线实例使用自己的输入队列,一切看起来都会更加可预测;)

Now you're telling me these handlers need to live inside the main app

不,我不是:)我告诉您,如果您让两个不同的应用程序抢夺彼此的消息,您将得到不可预测的结果。

虽然所有处理程序都位于同一个总线实例中(因此由来自同一队列的消息调用)是完全可以的,但最常见的情况是您将按照您想要的方式拆分应用程序发展系统。

这样您就可以一次更新一个应用程序,避免大的“停止世界”更新。

您可以通过启动多个端点来实现此目的,每个端点使用自己的队列,然后在端点之间路由消息以进行通信。

考虑一个场景,您想要向命令处理器发送命令。命令处理器是一个 Rebus 端点,它从 command_processor 队列获取消息。

在发送者端,您将配置一个“端点映射”(您可以在 the routing section on the Rebus wiki 中阅读更多相关信息,如下所示:

Configure.With(...)
    .Transport(t => t.UseMsmq("sender"))
    .Routing(r => {
        r.TypeBased()
            .Map<TheCommand>("command_processor");
    })
    .Start();

这将使发件人能够简单地走

await bus.Send(new TheCommand(...));

然后总线就会知道将命令消息传递到哪个队列。

我希望这样能更清楚:)

请注意,这是 point-to-point messaging 的一个非常简单的案例其中一个端点发送一条消息,该消息旨在由另一个端点使用。 Rebus 还可以帮助您使用其他几种模式,例如request/replypublish/subscribe .

关于c# - Rebus:2 个进程中的 2 个处理程序。击球不一致且交替,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40680346/

相关文章:

c# - Azure 搜索服务顶部值超过 1000

c# - RenderTexture 到 Texture2D 非常慢

c++使用字符串句柄对对象进行通用存储和检索

java - 使用 SolrJ 时,我可以将它指向请求处理程序吗?

python - 如何处理 balebot 中的 receipt_message

javascript - location.href 的回调函数

c# - 00 :00 Time Format > 24 的正则表达式

c++ - 调用删除时的总线错误 (C++)

c# - 在 Windows 机器上用 C# 从 SMBus 读取值的简单方法?

c - 来自命令行参数的输入有效,但来自内部 char* 的输入给出总线错误。不应该是一样的吗?