stream - RxJava2 使用可完成的然后操作符调用的序列顺序

标签 stream sequence rx-java2

我正在尝试从 RxJava1 迁移到 RxJava2。我正在替换我以前拥有的所有代码部分 Observable<Void>Compleatable .但是我遇到了一个关于流调用顺序的问题。当我之前处理 Observables 并使用 map 和 flatMaps 时,代码“按预期”工作。然而andthen()运算符(operator)的工作方式似乎有点不同。这是一个用于简化问题本身的示例代码。

public Single<String> getString() {
    Log.d("Starting flow..")
    return getCompletable().andThen(getSingle());
}

public Completable getCompletable() {
    Log.d("calling getCompletable");
    return Completable.create(e -> {
                Log.d("doing actuall completable work");
                e.onComplete();
            }
    );
}

public Single<String> getSingle() {
    Log.d("calling getSingle");
    if(conditionBasedOnActualCompletableWork) {
        return getSingleA();
    }else{
        return getSingleB();
    }
}

最后我在日志中看到的是:
  1-> Log.d("Starting flow..")
  2-> Log.d("calling getCompletable");
  3-> Log.d("calling getSingle");
  4-> Log.d("doing actuall completable work");

正如您可能想出的那样,我希望在第 3 行之前调用第 4 行(之后 andthen() 运算符的名称表明代码将在 Completable 完成它的工作之后被称为“之后”)。以前我正在创建 Observable<Void>使用 Async.toAsync()运算符和现在称为 getSingle 的方法在 flatMap流 - 它像我预期的那样工作,所以日志 4 会出现在 3 之前。现在我尝试改变 Compleatable 的创建方式 - 比如使用 fromActionfromCallable但它的行为是一样的。我也找不到任何其他运算符来替换 andthen() .强调 - 该方法必须是 Completable ,因为它没有任何意义可返回 - 它更改了应用程序首选项和其他设置(并且像那样全局使用,主要是“按预期”工作)并且稍后需要这些更改在流中。我也试图包装 getSingle()方法以某种方式创建一个 Single 并将 if 语句移动到 create 块内,但我不知道如何在那里使用 getSingleA/B() 方法。我需要使用它们,因为它们有自己的复杂性,复制代码没有意义。任何人都知道如何在 RxJava2 中修改它以使其行为相同?在继续进行流之前,有多个地方我依赖 Compleatable 作业来完成(例如刷新 session token 、更新数据库、首选项等 - 在使用 flatMap 的 RxJava1 中没有问题)。

最佳答案

您可以使用延迟:

getCompletable().andThen(Single.defer(() -> getSingle()))

这样,您就不会执行 getSingle() 的内容立即但仅当 Completable完成和 andThen切换到 Single .

关于stream - RxJava2 使用可完成的然后操作符调用的序列顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43157437/

相关文章:

python - 从 FTP python 读取缓冲区中的文件

arrays - 以 repeatedValue 作为序列的数组初始化谜语,Swift

sql - 在 Sql Server 中创建序列

xsd - 包含 <xs :all> and <xs:any>? 的复杂类型的 XML 模式

android - 如何忽略combineLatest 中的一个可观察值,但在组合器函数中可以使用其最新值?

c++ - std::ostringstream 打印 c 字符串的地址而不是其内容

java - 当我不知道文件流在哪里打开时,有什么方法可以关闭它吗?

android - 我怎样才能同步订阅一个可观察对象,这样我就不会错过该可观察对象的发射?

java - RxJava2 中的 flatMapPublisher 是什么?

scala - 构造 Jooq 流太慢