rabbitmq - 使用 RabbitMQ 时为每个远程方法创建一个队列?

标签 rabbitmq rpc mq

让我们暂时接受在消息队列(如 RabbitMQ)上实现 RPC 并不是一个可怕的想法——有时在与遗留系统交互时可能需要它。

在 RPC over RabbitMQ 的情况下,客户端向代理发送消息,代理将消息路由到工作人员,工作人员通过代理将结果返回给客户端。但是,如果工作人员实现了多个远程方法,则需要以某种方式将不同的调用路由到不同的监听器。

在这种情况下,一般做法是什么?所有 RPC over MQ 示例仅显示一种远程方法。将方法名称设置为路由规则/队列名称会很好也很容易,但我不知道这是否是正确的方法。

最佳答案

Let's just accept for a moment that it is not a horrible idea to implement RPC over message queues (like RabbitMQ)



这一点都不可怕!它很常见,并且在许多情况下都被推荐 - 不仅仅是遗留集成。

...好吧,现在你的实际问题:)

从非常高的角度来看,这是您需要做的。

您的请求和响应需要包含两个关键信息:
  • 一个 correlation-id
  • 一个 reply-to排队

  • 这些信息将允许您关联原始请求和响应。

    在您发送请求之前

    让您的请求代码为自己创建一个独占队列。此队列将用于接收回复。

    创建一个新的关联 id - 通常是一个 GUID 或 UUID 以保证唯一性。

    发送请求时

    将您生成的相关 ID 附加到消息属性。有一个correlationId您应该为此使用的属性。

    将关联 ID 与请求的关联回调函数(回复处理程序)存储在发出请求的代码内部的某个位置。收到回复时,您将需要这样做。

    将您创建的独占队列的名称附加到 replyTo消息的属性,以及。

    完成所有这些后,您可以通过rabbitmq发送消息

    回复的时候

    回复码需要同时使用correlationIdreplyTo原始消息中的字段。所以一定要捕获那些

    回复应直接发送至replyTo队列。不要通过交换使用标准发布。相反,使用您正在使用的任何库的“发送到队列”功能将回复消息直接发送到队列,并将响应直接发送到 replyTo队列。

    请务必包含 correlationId在回应中,也是如此。这是回答您问题的关键部分

    处理回复时

    发出原始请求的代码将收到来自 replyTo 的消息。队列。然后它将拉动correlationId出消息属性。

    使用相关 id 来查找请求的回调方法......处理响应的代码。将消息传递给这个回调方法,你就完成了。

    实现细节

    从高层次的角度来看,这是可行的。当您深入研究代码时,实现细节将根据您使用的语言和驱动程序/库而有所不同。

    对于任何给定语言,大多数好的 RabbitMQ 库都将内置请求/响应。如果你没有,你可能想寻找一个不同的图书馆。除非您在 AMQP 协议(protocol)之上编写基于模式的库,否则您应该寻找一个为您实现了通用模式的库。

    如果您需要有关请求/回复模式的更多信息,包括我在此处提供的所有详细信息(以及更多信息),请查看以下资源:
  • 我自己的RabbitMQ Patterns电子邮件类(class)/电子书
  • RabbitMQ Tutorials
  • Enterprise Integration Patterns - 一定要购买完整的描述/实现模式的书。这本书值得拥有

  • 如果您使用 Node.js,我建议使用 wascally库,其中包括您需要的请求/回复功能。对于 Ruby,请查看 bunny .对于 Java 或 .NET,请查看周围的许多服务总线实现中的一些。在 .NET 中,我推荐 NServiceBus 或 MassTransit。

    关于rabbitmq - 使用 RabbitMQ 时为每个远程方法创建一个队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31687652/

    相关文章:

    queue - 队列发送者如何知道消费者崩溃了?

    rabbitmq - 如何使用spring-rabbitmq将消息标记为持久?

    rabbitmq - 通过ajax使用消息队列进行同步rpc调用是否合适

    java - Apache Pulsar Reader 仍然可以获取应删除的消息

    java - MQDestination 覆盖记帐 token 值

    c# - RabbitMQ C# API : How to check if a binding exists?

    linux - 为什么 rpcbind 每次重启都会打开一个新的不同的端口?

    java - 轻松从 RPC/编码的 WSDL 生成 Spring WS 客户端?

    java - Android中如何正确升级AIDL接口(interface)?

    c# - 读取和写入 header 消息 MQ C#