需要帮助
我需要创建多个并行执行的 sqs 队列消费者,但我不知道如何使用 Sprint Integration 来实现这一点
我有以下架构
一个包含 20 万条消息的 Amazon SQS 队列
具有 5 个 EC2 实例的 Amazon 堆栈,每个实例都有 tomcat 服务器运行一个带有 Spring Integration 流程的 Spring boot 应用程序,该流程使用 spring-integration-aws 中的 sqs-message-driven-channel-adapter ( https://github.com/spring-projects/spring-integration-aws )
并将该消息发布到平均响应时间为 1 秒的 REST 服务(我不能修改 REST 服务是一个约束,但我可以并行发送消息)
SQS queue -> Stack(5 tomcat instances) -> Rest Service
约束 Amazon SQS 允许客户端按请求批量读取最多 10 条消息的消息,但我可以让多个客户端并行使用更多消息。
在 Amazon SQS 中,需要手动删除消息,这是使用 spring 集成完成的,我仅在 REST 服务返回 OK 时才删除消息。
我对可能的重复没有问题(SQS 向两个不同的客户端发送相同的消息)
我无法在我的 Spring Boot 应用程序中以任何方式存储消息
我的 Spring Integration 流程
<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
sqs="clientWithCredentials"
channel="channel_1"
queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
max-number-of-messages="10"/>
<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />
我如何在多个线程中并行执行此流程以并行使用更多消息?
我尝试将 <int:poller fixed-rate="1" task-executor="executor" />
放在 sqs-message-driven-channel-adapter 中,但不允许。
最佳答案
要实现这样的要求,您可以使用 ExecutorChannel
而不是默认的 DirectChannel
。
这样所有 SQS 消息都将分发到 ExecutorChannel
提供的线程,因此并行执行。
有关 ExecutorChannel
的更多信息在 Reference Manual 中.
更新
因此,我的建议应该反射(reflect)在您当前的配置中,例如:
<int:channel id="channel_1">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
更新
如果你还是坚持要多个SQS Adapter,那么简化版是这样的:
<int-aws:sqs-message-driven-channel-adapter
sqs="sqsAsyncClient"
channel="sqs-to-metricator"
queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
max-number-of-messages="10"
/>
<int-aws:sqs-message-driven-channel-adapter
sqs="sqsAsyncClient"
channel="sqs-to-metricator"
queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
max-number-of-messages="10"
/>
<int-aws:sqs-message-driven-channel-adapter
sqs="sqsAsyncClient"
channel="sqs-to-metricator"
queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
max-number-of-messages="10"
/>
<int:channel id="sqs-to-metricator" />
<int:outbound-channel-adapter ref="restService"
method="publish" channel="sqs-to-metricator" />
另外,为了避免重复,您可以考虑切换到 Java DSL 并开始使用其 ItengrationFlowContext
进行动态 IntegrationFlow
注册:https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows
关于java - 如何在多个线程中执行 Spring 集成流程以并行使用更多 Amazon SQS 队列消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49802891/