java - 为什么 toMap 的错误放置会导致 RXJava 出现问题?

标签 java multithreading rx-java reactive-programming

更新:我正在使用 RxJava 1.x

以下代码片段:

private static void tryObservableToMap() {
    bad();
    good();
}

private static void good() {
    System.out.println("GOOD CASE");
    String goodOutput =
            m(m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")
            .distinct(), "distinct")
            .flatMap(s ->
                    m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)
                            .map(intValue -> Pair.of(s, intValue)), "pair " + s)), "flatMap")
            .toMap(Pair::getKey, Pair::getValue), "toMap")
            .map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")
            .toBlocking()
            .first();


    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("\nOutput:");
    System.out.println(goodOutput);
}

private static void bad() {
    System.out.println("BAD CASE");
    String badOutput =
            m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")
            .distinct(), "distinct")
            .flatMap(s ->
                    m(m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)
                            .map(intValue -> Pair.of(s, intValue)), "pair " + s)
                            .toMap(Pair::getKey, Pair::getValue), "toMap " + s)), "flatMap")
            .map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")
            .toBlocking()
            .first();


    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("\nOutput:");
    System.out.println(badOutput);
}

private static <T> Observable<T> m(final Observable<T> observable, final String name) {
    return observable
            .doOnSubscribe(() -> logRxLifecycleEvent(name, "subscribe"))
            .doOnError((ex) -> logRxLifecycleEvent(name, "error: " + ex.getMessage()))
            .doOnCompleted(() -> logRxLifecycleEvent(name, "complete"))
            .doOnTerminate(() -> logRxLifecycleEvent(name, "terminating"))
            .doAfterTerminate(() -> logRxLifecycleEvent(name, "terminated"))
            .doOnUnsubscribe(() -> logRxLifecycleEvent(name, "unsubscribe"));
}

private static void logRxLifecycleEvent(final String name, final String event) {
    System.out.println("\tRXLOG " + name + " observable " + event);
}

private static int getIntForString(String s) {
    switch(s) {
        case "a":
            return 1;
        case "b":
            return 2;
        case "c":
            return 3;
        case "d":
            return 4;
        default:
            return 0;
    }
}

好与坏的区别在于,对于坏版本,我在 .flatMap 内部调用 .toMap,而不是在 .flatMap< 之后调用它.

如果运行此代码,您将看到执行过程中所有可观察量的不同事件。

我想知道为什么“OUTER”可观察对象永远不会因坏情况而终止。对RX有更深入了解的人可以解释一下吗?

最佳答案

缺少 RXLOG OUTER observable Complete 是因为 toBlocking().first() 和其上方完成的源之间存在竞争。它可能会很快被取消订阅,因此上面的源可能没有机会发出 onCompleted。在我的 i7 4770K 上,他们从不为我打印 completed。如果将 first 替换为 toIterable().iterator().next(),它会提供必要的机会,并且您应该始终看到丢失的日志。

关于java - 为什么 toMap 的错误放置会导致 RXJava 出现问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53180512/

相关文章:

java - RxJava : Watch custom class

android - 订阅的结果没有在 rxjava 上使用

java - 双向映射情况下如何在 spring-data-jpa 中删除

java - 使用elasticsearchoperations与elasticsearchtemplate有什么区别?

c# - 是否可以限制 EventWaitHandle 的设置/重置?

c# - 使用 C# 的暂停事件处理

java - Swing 的 Accordion ?

java - 如何将自定义布局设置为菜单项?

c# - Dekker 算法混淆的变体

java - RxJava 从 Observable 更新数据