假设我们有 3 个不同的服务产生事件,每个服务都发布到自己的事件存储。
这些服务中的每一个都使用其他生产者服务事件。
这是因为每个服务都必须处理另一个服务的事件并创建自己的投影。每个服务都在多个实例上运行。
最直接的方法(对我来说)是在每个 ES 前面放置“某物”,这些 ES 正在挑选事件并将它们发布(发布/订阅)到每个其他服务的队列中。
这是完美的,因为每个服务都可以订阅它喜欢的每个主题,而事件发布者正在做这项工作,如果服务不可用,事件仍然会被传递。在我看来,这可以保证高可扩展性和可用性。
我的问题是队列。我无法获得一个易于扩展的队列来保证消息的排序。它实际上保证了至少一次交付的“轻微故障”:明确地说,它是 AWS SQS。
所以,排序问题是:
我虽然可以通过跟踪来自同一个 ES 的事件的“序列号”来解决前两个问题。
这将通过跟踪我们从中消费事件的每个主题的最后一个序列号来完成
这应该很容易对事件使用react并构建我们的投影。
然后,当我从队列中弹出一个事件时,如果
eventSequenceNumber > previousAppliedEventSequenceNumber + 1
我将它重新排队(或使其在一段时间内不可见)。但事实证明,使用此解决方案,当事件以高速率产生时会破坏性能(我可以使用可见性超时或其他东西,结果应该是相同的)。
这是因为当我期待事件 10 并且我暂时忽略事件 11 时,我还应该忽略所有事件(来自 ES)以及在事件 11 之后的序列号,直到事件 11 再次出现并得到有效处理。
其他困难是:
lastSequenceNumber
. 我缺少什么?
附注:对于第三个问题,请考虑以下场景。我们有一个
UserService
和一个 CartService
. CartService
有一个投影,每个用户可以跟踪购物车中的产品。每个购物车的投影还必须具有来自 UserCreated
的用户名和其他信息。事件发布自 UserService
.如 UserCreated
紧随其后 ProductAddedToCart
正常流程需要抛出异常,因为用户还不存在。
最佳答案
What I'm missing?
您缺少流程 - 消费者从源中提取消息,而不是让源将消息推送给消费者。
当我醒来时,我会检查我的书签以找出我最后阅读了您的哪些信息,然后问您是否有任何此后。如果有,我会按顺序从您那里检索它们(想想“文档消息”),同时记下新书签。然后我回去 sleep 。
推送通知的主要目的是中断 sleep 期(从而减少延迟)。
将 SQS 用作队列,其想法是您一次读取所有排队的消息。如果没有间隙,那么您可以订购集合,然后开始处理它们并确认它们。如果有间隙,您要么等待(将消息留在队列中),要么去事件存储区获取丢失消息的副本。
没有什么魔法——如果消息管道 promise “至少一次”传递,那么消费者必须采取措施在重复消息到达时识别它们。
If UserCreated comes after ProductAddedToCart the normal flow requires to throw an exception because the user doesn't exist yet.
评论 Race Conditions Don't Exist ,作者 Udi Dahan:“时间上的微秒差异不应该对核心业务行为产生影响。”
关于microservices - CQRS - 乱序消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53270770/