Spring Integration Reactor配置

标签 spring spring-integration reactor

我正在运行一个使用 spring 集成处理任务的应用程序。

我想让它同时处理多个任务,但到目前为止任何尝试都失败了。

我的配置是:

ReactorConfiguration.java

@Configuration
@EnableAutoConfiguration
public class ReactorConfiguration {


    @Bean
    Environment reactorEnv() {
        return new Environment();
    }

    @Bean
    Reactor createReactor(Environment env) {
        return Reactors.reactor()
            .env(env)
            .dispatcher(Environment.THREAD_POOL)
            .get();
    }
}

TaskProcessor.java

@MessagingGateway(reactorEnvironment = "reactorEnv")
public interface TaskProcessor {
    @Gateway(requestChannel = "routeTaskByType", replyChannel = "")
    Promise<Result> processTask(Task task);
}

IntegrationConfiguration.java(简化)

@Bean
public IntegrationFlow routeFlow() {
    return IntegrationFlows.from(MessageChannels.executor("routeTaskByType", Executors.newFixedThreadPool(10)))
        .handle(Task.class, (payload, headers) -> {
            logger.info("Task submitted!" + payload);
            payload.setRunning(true);
            //Try-catch
            Thread.sleep(999999);
            return payload;
        })
        .route(/*...*/)
        .get();
}

我的测试代码可以这样简化:

Task task1 = new Task();
Task task2 = new Task();

Promise<Result> resultPromise1 = taskProcessor.processTask(task1).flush();
Promise<Result> resultPromise2 = taskProcessor.processTask(task2).flush();


while( !task1.isRunning() || !task2.isRunning() ){
    logger.info("Task2: {}, Task2: {}", task1, task2);
    Thread.sleep(1000);
}

logger.info("Yes! your tasks are running in parallel!");

但不幸的是,最后一行日志永远不会被执行!

有什么想法吗?

非常感谢

最佳答案

好吧,我只是用简单的 Reactor 测试用例复制了它:

@Test
public void testParallelPromises() throws InterruptedException {
    Environment environment = new Environment();
    final AtomicBoolean first = new AtomicBoolean(true);
    for (int i = 0; i < 10; i++) {
        final Promise<String> promise = Promises.task(environment, () -> {
                    if (!first.getAndSet(false)) {
                        try {
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return "foo";
                }
        );
        String result = promise.await(10, TimeUnit.SECONDS);
        System.out.println(result);
        assertNotNull(result);
    }
}

(使用 Reactor-2.0.6)。

问题的原因是:

public static <T> Promise<T> task(Environment env, Supplier<T> supplier) {
    return task(env, env.getDefaultDispatcher(), supplier);
}

其中DefaultDispatcherRingBufferDispatcher扩展SingleThreadDispatcher

由于@MessagingGateway基于请求/回复场景,我们正在该RingBufferDispatcher的线程中等待回复。由于您没有在那里返回回复 (Thread.sleep(999999);),因此我们无法接受 RingBuffer 内的下一个事件。

您的调度程序(Environment.THREAD_POOL)在这里没有帮助,因为它不会影响环境。您应该考虑使用reactor.dispatchers.default = threadPoolExecutor属性。像这样的文件:https://github.com/reactor/reactor/blob/2.0.x/reactor-net/src/test/resources/META-INF/reactor/reactor-environment.properties#L46 .

是的:请升级到最新的 Reactor。

关于Spring Integration Reactor配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34297249/

相关文章:

java - 如何支持连接池供应商特定的 DataSourceProperties 配置

java - Spring Integration JMS 端点的事务管理器定义

tcp-inbound-channel-adapter 和 tcp-outbound-channel-adapter bean 配置的 Java 等效项

java - react 器: Flux<object> .subscribe() 与 .toStream()

java - 当 ParallelFlux 发生错误时回滚所有更改

python - 在 Twisted 中将 react 器传递给协议(protocol)的首选方式是什么?

java - 如果请求映射方法返回列表,Spring Boot如何解析 View 名称?

java - 如何使用 Spring、Hibernate、EntityManager 和 JPA 连接到 mysql 数据库

java - 第三方聊天工具?

tomcat - 为 Spring 集成设置 JavaMail 代理