microservices - CQRS - 乱序消息

标签 microservices distributed-computing cqrs event-sourcing

假设我们有 3 个不同的服务产生事件,每个服务都发布到自己的事件存储。

这些服务中的每一个都使用其他生产者服务事件。
这是因为每个服务都必须处理另一个服务的事件并创建自己的投影。每个服务都在多个实例上运行。

最直接的方法(对我来说)是在每个 ES 前面放置“某物”,这些 ES 正在挑选事件并将它们发布(发布/订阅)到每个其他服务的队列中。

这是完美的,因为每个服务都可以订阅它喜欢的每个主题,而事件发布者正在做这项工作,如果服务不可用,事件仍然会被传递。在我看来,这可以保证高可扩展性和可用性。

我的问题是队列。我无法获得一个易于扩展的队列来保证消息的排序。它实际上保证了至少一次交付的“轻微故障”:明确地说,它是 AWS SQS。

所以,排序问题是:

  • 来自同一事件流的事件之间没有顺序保证。
  • 来自同一 ES 的事件不保证顺序。
  • 不保证来自不同 ES(不同服务)的事件的顺序。

  • 我虽然可以通过跟踪来自同一个 ES 的事件的“序列号”来解决前两个问题。
    这将通过跟踪我们从中消费事件的每个主题的最后一个序列号来完成
    这应该很容易对事件使用react并构建我们的投影。
    然后,当我从队列中弹出一个事件时,如果 eventSequenceNumber > previousAppliedEventSequenceNumber + 1我将它重新排队(或使其在一段时间内不可见)。

    但事实证明,使用此解决方案,当事件以高速率产生时会破坏性能(我可以使用可见性超时或其他东西,结果应该是相同的)。

    这是因为当我期待事件 10 并且我暂时忽略事件 11 时,我还应该忽略所有事件(来自 ES)以及在事件 11 之后的序列号,直到事件 11 再次出现并得到有效处理。

    其他困难是:
  • 在哪里跟踪事件的序列号以构建投影。
  • 如何跟踪事件的序列号以构建投影,以便在应用它时,我有一个一致的 lastSequenceNumber .

  • 我缺少什么?

    附注:对于第三个问题,请考虑以下场景。我们有一个 UserService和一个 CartService . CartService有一个投影,每个用户可以跟踪购物车中的产品。每个购物车的投影还必须具有来自 UserCreated 的用户名和其他信息。事件发布自 UserService .如 UserCreated紧随其后 ProductAddedToCart正常流程需要抛出异常,因为用户还不存在。

    最佳答案

    What I'm missing?



    您缺少流程 - 消费者从源中提取消息,而不是让源将消息推送给消费者。

    当我醒来时,我会检查我的书签以找出我最后阅读了您的哪些信息,然后问您是否有任何此后。如果有,我会按顺序从您那里检索它们(想想“文档消息”),同时记下新书签。然后我回去 sleep 。

    推送通知的主要目的是中断 sleep 期(从而减少延迟)。

    将 SQS 用作队列,其想法是您一次读取所有排队的消息。如果没有间隙,那么您可以订购集合,然后开始处理它们并确认它们。如果有间隙,您要么等待(将消息留在队列中),要么去事件存储区获取丢失消息的副本。

    没有什么魔法——如果消息管道 promise “至少一次”传递,那么消费者必须采取措施在重复消息到达时识别它们。

    If UserCreated comes after ProductAddedToCart the normal flow requires to throw an exception because the user doesn't exist yet.



    评论 Race Conditions Don't Exist ,作者 Udi Dahan:“时间上的微秒差异不应该对核心业务行为产生影响。”

    关于microservices - CQRS - 乱序消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53270770/

    相关文章:

    translation - CQRS、事件溯源和翻译的应用程序

    azure - CQRS:读取端 ACID

    azure - 使用 Azure 应用程序配置在微服务环境中进行动态配置

    docker - 这是在多主机网络上的生产中部署 spring boot cloud netflix 的正确方法吗?

    node.js - 我应该转向基于微服务的架构吗?

    security - Erlang通过cookie的安全性是否足够?

    open-source - 云虚拟机可免费用于开源测试?

    domain-driven-design - 如何使用 DDD 为 StackOverflow 网站建模

    java - 如何从任何地方访问安装在 Kubernetes 上的服务?

    sql-server - 并行化存储过程是否会在集群上产生更高的性能?