我正在尝试学习 RxJava2 库的基础知识,但现在我陷入了以下时刻:
我已通过 Flowable.generate(...)
生成了 myFlowable
,现在我需要等待所有任务完成执行,然后才能继续下一步。
这是展示问题的代码:
myFlowable.parallel()
.runOn(Schedulers.computation())
.map(val -> myCollection.add(val))
.sequential()
.subscribe(val -> {
System.out.println("Thread from subscribe: " + Thread.currentThread().getName());
System.out.println("Value from subscribe: " + val.toString());
});
System.out.println("Before sleep - Number of objects: " + myCollection.size());
try {
Thread.sleep(1000);
System.out.println("After sleep - Number of objects: " + myCollection.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
我运行了所有任务并将结果添加到集合中。如果我在 myFlowable block 之后立即检查集合大小,那么如果我在小 Thread.sleep()
之后检查它,情况将会有所不同。有什么方法可以检查所有任务是否已完成执行并且我们可以进一步进行?任何帮助或指导将不胜感激。
最佳答案
由于 RxJava 是异步的,observable 下面的 java 代码将运行,而 observable 将在不同的线程中运行,这就是为什么如果您想在 Flowable 已完成发送数据时收到通知,您应该在 RxJava 流中执行此操作。为此,您有一个运算符 .doOnComplete 这里有一个如何检测流何时完成的示例
Flowable.range(0, 100).parallel()
.runOn(Schedulers.computation())
.map(integer -> {
return integer;
})
.sequential()
.doOnComplete(() -> {
System.out.println("finished");
})
.subscribe(integer -> System.out.println(integer));
关于java - 如何等待 JavaRx2 Flowable 完成所有任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57964401/