java - RabbitMQ 到 BlockingQueue 的绑定(bind)

标签 java queue rabbitmq messaging spring-amqp

我正在开发一个多线程应用程序,其中多个“处理器”(线程池中的可运行对象)相互发送消息。他们使用 BlockingQueue 进行通信接口(interface):当处理器 A任务 T1 已完成,它将其推送到队列 Q1 (例如, BlockingQueue<MyTask> 如果 T1 由类 MyTask 表示);之后,处理器BQ1 提取任务,执行计算并将结果推送至 Q2 ;等等。

我使用LinkedBlockingQueue ,因为我的应用程序是整体式的,所有处理器都“生活”在同一个 JVM 中。但是,我希望我的应用程序变得模块化( Microservice Architecture ),因此我决定使用 RabbitMQ 作为消息代理。

问题是从队列的 java 实现迁移到 RabbitMQ,对客户端源代码进行最小的更改。因此,我尝试在 RabbitMQ 抽象和 BlockingQueue 之间找到某种绑定(bind)。界面。因此,当有人向 amqp 队列发送消息时,它应该出现在 java 队列中。反之亦然:当有人将一个对象推送到 java 队列时,它应该被传播到 amqp 的交换器。

下面提供了轮询的示例实现(来自 amqp 的队列,使用 spring-amqp )。

<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
    LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    MessageConverter messageConverter = listenerContainer.getMessageConverter();
    listenerContainer.setupMessageListener((MessageListener) message -> {
        Object task = messageConverter.fromMessage(message);
        queue.offer(elementType.cast(task));
    });

    return queue;
}

我找不到实现 BlockingQueue 的框架现在使用 RabbitMQ 队列的接口(interface)。如果这种框架不存在,那么我的想法在架构上是否在某种程度上是错误的,或者只是还没有人实现这个?

最佳答案

我不确定您是否真的想按照您所描述的方式进行操作 - 入站消息将被传递到队列并位于内存中,而不是在 RabbitMQ 中。

我认为一个简单的 BlockingQueue 实现使用下面的 RabbitTemplate 从兔子队列中提取消息(使用 receive()receiveAndConvert()) 可能更适合 take/poll 操作 - 它将在需要时将消息留在 RabbitMQ 中,并且只需使用 RabbitTemplate.convertAndSend() 进行 Offer/put 操作。

虽然非常简单,但它可能是对框架的有用补充;考虑contributing .

关于java - RabbitMQ 到 BlockingQueue 的绑定(bind),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32510029/

相关文章:

java - 将字符串转换为日历。最简单的方法是什么?

java - 使用 Homebrew 软件安装后找不到 opencv jar

swift - DispatchQueue 中的目标参数

C# 将 ReadOnlyMemory<byte> 转换为 byte[]

java - Spring Data REST - 如何在投影中包含计算数据?

Java - 从构造函数填充数组

java - 抛出 NullPointerException 并且程序未按预期终止

Azure 服务总线队列 - 使用 BrokeredMessage 进行序列化

java - 从 Docker 容器连接到本地主机上的 rabbitmq

scala - Prismarabbitmq 在 kubernetes 上的部署不起作用