java - 如何在多个线程中执行 Spring 集成流程以并行使用更多 Amazon SQS 队列消息?

标签 java multithreading spring-integration aws-sdk amazon-sqs

需要帮助

我需要创建多个并行执行的 sqs 队列消费者,但我不知道如何使用 Sprint Integration 来实现这一点

我有以下架构

一个包含 20 万条消息的 Amazon SQS 队列

具有 5 个 EC2 实例的 Amazon 堆栈,每个实例都有 tomcat 服务器运行一个带有 Spring Integration 流程的 Spring boot 应用程序,该流程使用 spring-integration-aws 中的 sqs-message-driven-channel-adapter ( https://github.com/spring-projects/spring-integration-aws )

并将该消息发布到平均响应时间为 1 秒的 REST 服务(我不能修改 REST 服务是一个约束,但我可以并行发送消息)

SQS queue -> Stack(5 tomcat instances) -> Rest Service

约束 Amazon SQS 允许客户端按请求批量读取最多 10 条消息的消息,但我可以让多个客户端并行使用更多消息。

在 Amazon SQS 中,需要手动删除消息,这是使用 spring 集成完成的,我仅在 REST 服务返回 OK 时才删除消息。

我对可能的重复没有问题(SQS 向两个不同的客户端发送相同的消息)

我无法在我的 Spring Boot 应用程序中以任何方式存储消息

我的 Spring Integration 流程

<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
  sqs="clientWithCredentials" 
  channel="channel_1"
  queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
  max-number-of-messages="10"/>

<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />

我如何在多个线程中并行执行此流程以并行使用更多消息?

我尝试将 <int:poller fixed-rate="1" task-executor="executor" /> 放在 sqs-message-driven-channel-adapter 中,但不允许。

最佳答案

要实现这样的要求,您可以使用 ExecutorChannel 而不是默认的 DirectChannel

这样所有 SQS 消息都将分发到 ExecutorChannel 提供的线程,因此并行执行。

有关 ExecutorChannel 的更多信息在 Reference Manual 中.

更新

因此,我的建议应该反射(reflect)在您当前的配置中,例如:

<int:channel id="channel_1">
   <int:dispatcher task-executor="someExecutor"/>
</int:channel>

更新

如果你还是坚持要多个SQS Adapter,那么简化版是这样的:

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />


<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int:channel id="sqs-to-metricator" />

<int:outbound-channel-adapter ref="restService"
    method="publish" channel="sqs-to-metricator" />

另外,为了避免重复,您可以考虑切换到 Java DSL 并开始使用其 ItengrationFlowContext 进行动态 IntegrationFlow 注册:https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows

关于java - 如何在多个线程中执行 Spring 集成流程以并行使用更多 Amazon SQS 队列消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49802891/

相关文章:

spring-integration - 如何在 Spring 集成测试中调用 channel

xml - 获取 xsi :type with xpath 的值

android - 每 10 秒后使用 Handler 在 Thread 中调用方法的最简单方法

java - Spring会自动关闭TCP入站网关吗?

java - Android Studio 和 Gradle : content is not allowed in prolog when having a . jar 位于/libs 文件夹中

java - php 的 hmac sha256 实现与 java 的不匹配

java - 当其他线程正在执行该类的静态同步方法时,一个线程能否获取该类实例的锁?

java - 在线程内使用 isFocused() 时出错?

java - 曲线内带有控制点的贝塞尔曲线

java - 单元测试,你能测试单个方法吗?