java - 如何从 void 方法返回 Completable

标签 java rx-java reactive-programming

我有下面的一段代码,当 writerObj 的内部数据结构是 CopyOnWriteArrayList (并发)时它工作得很好,当我使用 ArrayList 时它崩溃了。

这是我的问题:

  • 但是 RxJava 默认情况下只有一个线程,不是吗?
  • 线路(玩家 { ... } 之间)会同步执行吗?

我的代码如下所示:

.flatMapCompletable { player -> {
    writerObj.write(player); // void write(Player player) adds player to inner data structure using ds.add()
    return Completable.complete();
    }
}

最佳答案

取决于链的其余部分的编码方式。

看看以下内容:

List<String> writerObj = new ArrayList<>();

Observable.range(0, 1000)
        .map(i -> Observable.just("hello world"))
        .flatMap(obs -> obs
                .flatMapCompletable(elem -> {
                        writerObj.add(elem);
                        System.out.println(Thread.currentThread().getName()  + " executing");
                        return Completable.complete();
                })
                .toObservable()
                .subscribeOn(Schedulers.io())
        )
        .blockingSubscribe();
//Size of the list is not always 1000
System.out.println("The size of the list is : " + writerObj.size());

如果你执行上面的代码,你可能会注意到最后的List的大小并不总是1000。如果你将List的实现改为CopyOnWriteArrayList,我们就得到了想要的结果。

如果您希望 flatMap 中的代码一次由一个线程顺序执行,请将 flatMap 更改为 concatMap。

List<String> writerObj = new ArrayList<>();

Observable.range(0, 1000)
    .map(i -> Observable.just("hello world"))
    .concatMap(obs -> obs
            .flatMapCompletable(elem -> {
                    writerObj.add(elem);
                    System.out.println(Thread.currentThread().getName()  + " executing");
                    return Completable.complete();
            })
            .toObservable()
            .subscribeOn(Schedulers.io())
    )
    .blockingSubscribe();
// Size is always 1000
System.out.println("The size of the list is : " + writerObj.size());

希望对你有帮助!

关于java - 如何从 void 方法返回 Completable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48300025/

相关文章:

java - 当我关闭连接时,Spring Reactive : java. io.IOException : An established connection was aborted by the software in your host machine,

asynchronous - 发生某些情况时如何停止 Kotlin 流程

android - RxJava - 如何在另一个流等待第一个项目时缓冲流中的所有项目?

java - 通过 Utgard 连接到 Matrikon OPC 模拟服务器

java - 为什么每次我们使用 new 关键字创建字符串时 jvm 都会创建新的字符串对象

java - 将 Scala 编织到现有的 Java EE 项目中?

java - Android rxJava 错误处理与改造

java - volatile 与非 volatile

android-studio - Kotlin 和 RxJava2 zip 运算符 - 不能使用提供的参数调用以下函数

kotlin - 如何计算 observable 的执行时间