我有下面的一段代码,当 writerObj 的内部数据结构是 CopyOnWriteArrayList (并发)时它工作得很好,当我使用 ArrayList 时它崩溃了。
这是我的问题:
- 但是 RxJava 默认情况下只有一个线程,不是吗?
- 线路(玩家 { ... } 之间)会同步执行吗?
我的代码如下所示:
.flatMapCompletable { player -> {
writerObj.write(player); // void write(Player player) adds player to inner data structure using ds.add()
return Completable.complete();
}
}
最佳答案
取决于链的其余部分的编码方式。
看看以下内容:
List<String> writerObj = new ArrayList<>();
Observable.range(0, 1000)
.map(i -> Observable.just("hello world"))
.flatMap(obs -> obs
.flatMapCompletable(elem -> {
writerObj.add(elem);
System.out.println(Thread.currentThread().getName() + " executing");
return Completable.complete();
})
.toObservable()
.subscribeOn(Schedulers.io())
)
.blockingSubscribe();
//Size of the list is not always 1000
System.out.println("The size of the list is : " + writerObj.size());
如果你执行上面的代码,你可能会注意到最后的List的大小并不总是1000。如果你将List的实现改为CopyOnWriteArrayList,我们就得到了想要的结果。
如果您希望 flatMap 中的代码一次由一个线程顺序执行,请将 flatMap 更改为 concatMap。
List<String> writerObj = new ArrayList<>();
Observable.range(0, 1000)
.map(i -> Observable.just("hello world"))
.concatMap(obs -> obs
.flatMapCompletable(elem -> {
writerObj.add(elem);
System.out.println(Thread.currentThread().getName() + " executing");
return Completable.complete();
})
.toObservable()
.subscribeOn(Schedulers.io())
)
.blockingSubscribe();
// Size is always 1000
System.out.println("The size of the list is : " + writerObj.size());
希望对你有帮助!
关于java - 如何从 void 方法返回 Completable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48300025/