java - Spring Integration 5.0 + Project Reactor : controlling threads

标签 java spring spring-integration reactive-programming project-reactor

关于 https://stackoverflow.com/a/47136941/1776585 的后续问题

在使用 Flux + split() + FluxMessageChannel 时,我无法让我的集成处理程序在并行线程中运行。

考虑以下片段:

// ...
.handle(message -> Flux.range(0, 10)
    .doOnNext(i -> LOG.info("> " + i))
    .subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...

所有日志在一个线程中输出:

[     parallel-1] d.a.Application    : > 0
[     parallel-1] d.a.Application    :  -> 0
[     parallel-1] d.a.Application    : > 1
[     parallel-1] d.a.Application    :  -> 1
[     parallel-1] d.a.Application    : > 2
[     parallel-1] d.a.Application    :  -> 2
[     parallel-1] d.a.Application    : > 3
[     parallel-1] d.a.Application    :  -> 3
[     parallel-1] d.a.Application    : > 4
[     parallel-1] d.a.Application    :  -> 4
[     parallel-1] d.a.Application    : > 5
[     parallel-1] d.a.Application    :  -> 5
[     parallel-1] d.a.Application    : > 6
[     parallel-1] d.a.Application    :  -> 6
[     parallel-1] d.a.Application    : > 7
[     parallel-1] d.a.Application    :  -> 7
[     parallel-1] d.a.Application    : > 8
[     parallel-1] d.a.Application    :  -> 8
[     parallel-1] d.a.Application    : > 9
[     parallel-1] d.a.Application    :  -> 9

如何强制在多线程中处理?

我试过在 Flux 上使用 .parallel().runOn(),但这只会使获取数据并行化,但实际处理仍然在一个线程上运行.

我还在 Flux 上尝试了 .publishOn(Schedulers.parallel()) 但没有效果。

同时向处理程序添加 ExecutorChannel 或带有执行程序的 Poller 也没有帮助。

最佳答案

这有一些技巧:

.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))

FluxMessageChannel 使用的那些消息将与那个额外的 ExecutorChannel 并行。

我认为您要问的是使上述 FluxMessageChannel 可配置的功能请求。并且可以在那里配置这样的 subscribeOn/publishOn 等。

欢迎提出 JIRA关于这件事!

关于java - Spring Integration 5.0 + Project Reactor : controlling threads,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49496099/

相关文章:

java - 已编辑!未调用的 getter 函数导致 JPanel 在错误的位置重新打印

java - Spring一对多总是空的

java - 使用动态文件名将文件从 FtpOutBoundGateway 移动到另一个远程位置

java - MarshallingWebServiceInboundGateway 的问题

java - Spring集成消息处理链使用?

Java如何注册一个类实例,让任何类都能找到

java - *客户端*可扩展性适用于大量远程 Web 服务调用

java - 字符之间的正则表达式匹配

java - 并行多个步骤的Spring批处理分区?

java - Spring @transactional 不起作用