java - Spring Integration 在事务中向 Executor 发送消息

标签 java spring-integration spring-integration-http

我有大量来自 CSV 文件的消息,然后这些消息被发送到速率受限的 API。我使用由数据库 channel 消息存储支持的队列 channel 来使消息在处理时持久。我希望尽可能接近速率限制,因此我需要跨多个线程向 API 发送消息。

我脑子里的想法是它应该如何工作,读取数据库,查看可用的消息,然后将每条消息委托(delegate)给要在事务中处理的线程之一。

但我无法做到这一点,我必须做的是有一个事务轮询器,它有一个 N 线程的线程池,固定速率为 5 秒,每次轮询的最大消息为 10 条(超过 5 秒内可以处理的消息)...它工作正常,但是当等待的消息不多时就会出现问题(即,如果有 10 条消息,它们将由单个线程处理),这在实践中不会成为问题,因为我们将1000 条消息。但它在概念上似乎比我想象的更复杂。

我可能没有很好地解释这一点,但是当消息传入速度快但传出速度较慢时,这似乎是一个常见问题?

最佳答案

您的解决方案确实是正确的,但您需要考虑不要将消息转移到 Exetuor 中,因为这样您就会跳出事务边界。

事实上,您在同一个线程中处理了 10 条消息,这正是一个实现细节,如下所示:

AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
            int count = 0;
            while (AbstractPollingEndpoint.this.initialized
                    && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                    || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
                try {
                    if (!Poller.this.pollingTask.call()) {
                        break;
                    }
                    count++;
                }

因此,我们在同一线程中轮询消息,直到 maxMessagesPerPoll

为了使其真正更加并行并仍然保持事务不丢失消息,您需要考虑使用fixedRate:

/**
 * Specify whether the periodic interval should be measured between the
 * scheduled start times rather than between actual completion times.
 * The latter, "fixed delay" behavior, is the default.
 */
public void setFixedRate(boolean fixedRate)

并增加TaskScheduler用于轮询的线程数量。 您可以声明一个名为 IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAMEThreadPoolTask​​Scheduler bean,以覆盖池为 10 的默认 bean。或者使用全局属性来覆盖默认 TaskScheduler 中的池大小:https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/configuration.html#global-properties

关于java - Spring Integration 在事务中向 Executor 发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50945720/

相关文章:

java - Spring Integration 从 REST 服务获取分页结果

java - Spring Integration - 在消息中添加自定义 header

java - 发送 http post 请求不起作用

java - 我是否需要关闭 PipedInputStream 和 PipedOutputStream

java - 线程亲和性是什么意思?

spring-boot - Spring Integration DSL 添加中流事务

java - 如何在 Spring 集成中使用 Java DSL 创建 ws 入站网关?

java - 是否有一种通用的格式/方法可以在 windows 和 linux 中定位配置文件?

java - 获取数据报的IP地址 spring-integration