node.js - 乱序处理后的"Resequencing"消息

标签 node.js redis message-queue distributed kue

我正在研究基本上是高度可用的分布式消息传递系统。系统通过HTTP或TCP从某个地方接收消息,对其进行各种转换,然后将其发送到一个或多个目的地(也使用TCP/HTTP)。

系统要求所有发送到给定目标的消息都是按顺序排列的,因为某些消息建立在先前消息的内容之上。这限制了我们按顺序处理消息,每条消息大约需要750毫秒。因此,例如,如果有人每隔250毫秒向我们发送一封邮件,我们将被迫将这些邮件排在后面。最终,这会在高负载下导致消息处理中令人无法忍受的延迟,因为每个消息可能不得不等待其他数百条消息才能轮流处理。

为了解决此问题,我希望能够并行化消息处理,而又不会违反按顺序发送消息的要求。

我们可以轻松地水平扩展我们的处理。丢失的部分是一种确保即使对消息进行乱序处理也可以对其进行“重新排序”并按照接收顺序将其发送到目的地的方法。我正在尝试找到实现这一目标的最佳方法。

Apache Camel的a thing called a Resequencer可以做到这一点,并且它包含一个不错的图表(我没有足够的代表直接嵌入)。这正是我想要的:一种处理乱序消息并将其排序的东西。

但是,我不希望它用Java编写,并且我需要该解决方案具有高可用性(即可以抵抗典型的系统故障(如崩溃或系统重启)),而我认为Apache Camel无法提供该解决方案。

我们的应用程序是用Node.js编写的,带有Redis和Postgresql的数据持久性。我们将Kue库用于我们的消息队列。尽管Kue提供了优先级队列,但是对于上述用例来说,功能集太有限了,因此我认为我们需要一种替代技术来与Kue协同工作,以便对消息进行重新排序。

我正在尝试在线研究此主题,但是找不到所需的信息。似乎有大量文章和实现的分布式体系结构模式类型,但是我看不到那么多。搜索诸如“消息重新排序”,“乱序处理”,“并行化消息处理”等之类的解决方案,会发现大多数解决方案只是放宽了基于分区或主题的“按序”需求。另外,他们谈论的是在一台机器上的并行化。我需要一个解决方案:

  • 可以以任何顺序同时处理多个消息。
  • 将始终按照消息到达系统的顺序发送消息,而不管其处理顺序如何。
  • 可从Node.js使用
  • 可以在高可用性环境中运行(即,它的多个实例一次在同一消息队列上运行,而不会出现不一致的情况。)

  • 我们当前的计划对我有意义,但我无法在网上任何地方找到它的描述,该计划是使用Redis维护一组进行中的和准备发送的消息,并按其到达时间进行排序。大致来说,它是这样的:
  • 收到消息后,该消息将放入正在进行的集中。
  • 消息处理完成后,该消息将放入准备发送的集合中。
  • 只要正在进行的和准备发送的集的前面都存在相同的消息,就可以发送该消息,并且消息会是有序的。

  • 我将编写一个小的Node库,该库使用原子Redis事务使用优先级队列式API来实现此行为。但这只是我自己想到的,所以我想知道:是否有其他技术(理想情况下使用我们已经在使用的Node/Redis堆栈)来解决重新排序无序消息的问题?还是针对这个问题,我可以用其他术语作为研究的关键词吗?谢谢你的帮助!

    最佳答案

    这是的常见问题,因此肯定有许多解决方案可用。这也是一个相当简单的问题,并且是分布式系统领域中的一个很好的学习机会。我建议自己写。

    您将在构建此文件时遇到一些问题,即

    2: Exactly-once delivery
    1: Guaranteed order of messages
    2: Exactly-once delivery



    您已经找到1号,并通过在redis中对其重新排序来解决此问题,这是一个不错的解决方案。但是,另一个没有解决。

    看起来您的体系结构不适合容错,因此,当前,如果服务器崩溃,您可以重新启动它并继续使用。当按顺序处理所有请求时,这可以很好地工作,因为这样可以根据上次成功完成的请求确切地知道崩溃的时间。

    您需要的是一种策略,用于找出您实际完成了哪些请求,哪些请求失败了,或者是当事情崩溃时发送给您的客户的书面赔礼道歉信。

    如果未分片Redis,则它是高度一致的。如果该单个 Node 崩溃,它将失败并可能丢失所有数据,但是对于乱序数据或数据突然出现或消失不存在,您不会有任何问题。因此,单个Redis Node 可以保证,如果将一条消息插入到“待处理集”中,然后再插入“完成集”中,则没有 Node 将在完成集中看到该消息,除非它也位于“完成集”中。过程集。

    我会怎么做

    使用redis似乎过于模糊,假设消息不是很大,并且在进程崩溃时丢失消息是可以的,并且多次运行它们,甚至同时运行单个请求的多个副本,这不是一个错误。问题。

    我建议设置一个主管服务器,该服务器接收传入的请求,将每个请求分发到随机选择的从属服务器,存储响应,然后在发送之前将它们重新放回原处。您说您希望处理过程要花费750毫秒。如果从站在2秒钟之内没有响应,请在0-1秒钟内将其再次随机分配到另一个 Node 。第一个响应是我们将要使用的响应。当心重复的答复。

    如果重试请求也失败,则将最大等待时间加倍。在5次左右的失败之后,每一次等待最多两次(或者是大于前一次的任何倍数),我们可能会遇到永久性错误,因此我们可能应该寻求人工干预。此算法称为指数补偿,可防止请求突然增加而使整个群集崩溃。不使用随机间隔,并且在n秒钟后重试可能会导致每n秒钟进行一次DOS攻击,直到群集消失为止(如果群集获得了足够大的负载峰值)。

    这有很多方法可能会失败,因此请确保不是仅此系统存储数据的地方。但是,这可能在99%以上的时间内都能正常工作,它至少与您当前的系统一样好,并且您可以用几百行代码来实现它。只要确保您的主管正在使用异步请求,就可以处理重试和超时。 Java本质上是单线程的,因此这比正常情况下要复杂一些,但是我相信您可以做到。

    关于node.js - 乱序处理后的"Resequencing"消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39212533/

    相关文章:

    java - Redis Cluster中如何使用Redisson上传字节数组?

    Python。如何从队列/主题 ActiveMQ 中删除任何消息

    php - 您如何搜索具有值的键?例如获取值为 "somevalue"的所有 KEYS

    json - 如何更新 Nodejs/MongoDB 对象数组中的字段

    javascript - 有没有一种好方法可以跨事件发射器/事件循环边界显示生产中的错误痕迹?

    javascript - 在node js中发送没有状态的json响应

    redis - 如何计算 ElastiCache 上的 Redis 内存使用百分比

    multithreading - 如果消息队列关闭,如何在事件驱动微服务中处理?

    java - 如果我想快速失败,ActiveMQ 的最佳连接 URI 是什么?

    javascript - 基于 Promise 的队列