我正在尝试弄清楚如何采用作为输入数据序列的 Flux,并行通过可能对序列重新排序的阻塞调用运行它们,然后通过第二个单线程阻塞调用。这个想法是最后的单线程调用将重新排序的并行工作输出记录到磁盘上。我正在尝试做的最终目标是并行算法是一种共识算法,它将确定数据输入的实际顺序。单线程写入是模拟按照共识算法确定的顺序处理事物。
查看this article它建议我应该将我的阻塞调用转换为在调度程序上运行的 Mono,该调度程序可以给我并行或单线程处理:
public class BlockingRemoteCall {
private final static Random r = new Random();
private final static Scheduler scheduler = Schedulers.newParallel("myWebservice", 10);
static private String blockingWebService(final String in) {
try {
// fakes blocking for up to a second
Thread.sleep((long) (1000 * r.nextFloat()));
System.out.println("webserver returned: "+in+" on "+Thread.currentThread().getName());
} catch (Exception e) {
throw new RuntimeException(e);
}
return in;
}
public static Mono<String> blockingMethodParallelThread(final String in) {
return Mono.fromSupplier(() -> blockingWebService(in))
.subscribeOn(scheduler);
}
}
public class BlockingJournal {
private final static Scheduler scheduler = Schedulers.newSingle("myJournal");
private static String blockingWrite(String in){
try {
// fakes blocking for disk write
Thread.sleep(5L);
System.out.println("journal wrote: "+in+" on "+Thread.currentThread().getName());
} catch (Exception e){
throw new RuntimeException(e);
}
return in;
}
public static Mono<String> blockingMethodSingleThread(final String in) {
return Mono.fromSupplier(() -> blockingWrite(in))
.subscribeOn(scheduler);
}
}
我一直在尝试获取整数 Flux 并通过这些方法以某种方式映射或平面映射,但我无法记录任何内容。这是我最近的尝试:
final Scheduler parallelScheduler = Schedulers.newParallel("p");
final Scheduler singleScheduler = Schedulers.single();
Flux<String> flux = Flux.range(1, 10).map(i -> i.toString()).publishOn(parallelScheduler);
Flux<String> pipeline = flux.map(s->{
Mono<String> async = BlockingRemoteCall.blockingMethodParallelThread(s);
String r1 = async.block();
Mono<String> r2 = BlockingJournal.blockingMethodSingleThread(r1);
return r2.block();
});
pipeline.subscribeOn(singleScheduler).doOnNext(System.out::println).blockLast();
这实际上并没有输出任何东西,但每当我能够生成任何输出时,我只看到 println
语句显示数据流在一个线程上按顺序处理。我希望看到的是调用 blockingMethodParallelThread(s)
时的任意延迟导致输入序列被乱序记录。
我如何设置才能并行处理输入的 Flux(最终从 reactor-netty 输入冒泡),重新排序,然后最终按顺序处理,保留重新排序?重新排序是由于并行进行阻塞调用造成的?谢谢!
最佳答案
这里有几点:
- 简单地调用
publishOn(parallelScheduler)
不会使您的Flux
并行执行,它只意味着您的顺序Flux
现在将发布在一个并行调度器。相反,您可能希望调用parallel()
使其并行,然后使用您选择的调度程序指定runOn()
。 (类似地,在并行Flux
上调用sequential()
将使其再次顺序。) - 您几乎肯定不想为这项工作使用并行调度程序 - 有界弹性调度程序将是更好的选择(它是专门为包装阻塞 IO 而设计的。)
- 执行
map()
调用然后在其中阻塞没有多大意义 - 您也可以使用flatMap()
并只返回生成的发布者。< - 创建新的调度程序在这里没有任何优势,您也可以只使用默认的调度程序。
因此,考虑到这些要点,您的代码将变为:
ParallelFlux<String> flux = Flux.range(1, 10).map(i -> i.toString()).parallel().runOn(Schedulers.elastic());
ParallelFlux<String> pipeline = flux.flatMap(s -> {
Mono<String> async = BlockingRemoteCall.blockingMethodParallelThread(s);
String r1 = async.block();
return BlockingJournal.blockingMethodSingleThread(r1);
});
pipeline.sequential().doOnNext(System.out::println).blockLast();
...如您所料,这将乱序输出结果。
关于java - 使用 Reactor 乱序处理输入通量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58708240/