更新:我正在使用 RxJava 1.x
以下代码片段:
private static void tryObservableToMap() {
bad();
good();
}
private static void good() {
System.out.println("GOOD CASE");
String goodOutput =
m(m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")
.distinct(), "distinct")
.flatMap(s ->
m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)
.map(intValue -> Pair.of(s, intValue)), "pair " + s)), "flatMap")
.toMap(Pair::getKey, Pair::getValue), "toMap")
.map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")
.toBlocking()
.first();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\nOutput:");
System.out.println(goodOutput);
}
private static void bad() {
System.out.println("BAD CASE");
String badOutput =
m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")
.distinct(), "distinct")
.flatMap(s ->
m(m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)
.map(intValue -> Pair.of(s, intValue)), "pair " + s)
.toMap(Pair::getKey, Pair::getValue), "toMap " + s)), "flatMap")
.map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")
.toBlocking()
.first();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\nOutput:");
System.out.println(badOutput);
}
private static <T> Observable<T> m(final Observable<T> observable, final String name) {
return observable
.doOnSubscribe(() -> logRxLifecycleEvent(name, "subscribe"))
.doOnError((ex) -> logRxLifecycleEvent(name, "error: " + ex.getMessage()))
.doOnCompleted(() -> logRxLifecycleEvent(name, "complete"))
.doOnTerminate(() -> logRxLifecycleEvent(name, "terminating"))
.doAfterTerminate(() -> logRxLifecycleEvent(name, "terminated"))
.doOnUnsubscribe(() -> logRxLifecycleEvent(name, "unsubscribe"));
}
private static void logRxLifecycleEvent(final String name, final String event) {
System.out.println("\tRXLOG " + name + " observable " + event);
}
private static int getIntForString(String s) {
switch(s) {
case "a":
return 1;
case "b":
return 2;
case "c":
return 3;
case "d":
return 4;
default:
return 0;
}
}
好与坏的区别在于,对于坏版本,我在 .flatMap
内部调用 .toMap
,而不是在 .flatMap< 之后调用它
.
如果运行此代码,您将看到执行过程中所有可观察量的不同事件。
我想知道为什么“OUTER”可观察对象永远不会因坏情况而终止。对RX有更深入了解的人可以解释一下吗?
最佳答案
缺少 RXLOG OUTER observable Complete
是因为 toBlocking().first()
和其上方完成的源之间存在竞争。它可能会很快被取消订阅,因此上面的源可能没有机会发出 onCompleted
。在我的 i7 4770K 上,他们从不为我打印 completed
。如果将 first
替换为 toIterable().iterator().next()
,它会提供必要的机会,并且您应该始终看到丢失的日志。
关于java - 为什么 toMap 的错误放置会导致 RXJava 出现问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53180512/