Flux.just(1, 2, 3)
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1, error -> true)
.subscribe(number -> System.out.println("I will never be here..."),
error -> System.out.println("will I be here? " + error),
() -> System.out.println("completed!"));
输出:
producing number: 1
throwing error.
producing number: 2
producing number: 3
预期输出:(根据我的逻辑 - 我确信我错了)
producing number: 1
throwing error.
producing number: 2
throwing error.
producing number: 3
为什么输出与预期输出不同?
最佳答案
这有点棘手,但归结为:
调用 publish
和 autoConnect
的效果是,一旦您订阅,就会创建一个内部订阅,即使外部订阅被取消,该内部订阅也会持续存在。将其视为通过 publish
和 autoConnect
连接的两个流。
--inner stream--> publish/connect-> --outer stream--> subscriber
无论外部流发生什么,内部流都会继续运行。这是有意为之的,因为您有一个热门的可观察对象,并且您可能有多个订阅者。如果其中一个断开连接,则其他人仍然希望从流中获取值。换句话说,发布运算符告诉源它可以根据需要生成值。
--inner stream--> publish/connect-> --outer stream--> subscriber1
--outer stream--> subscriber2
您可以通过删除重试
来验证该行为。数字 1 和 2 仍将被打印。如果您希望源停止生成值,可以使用 refCount
而不是 autoConnect
。如果没有更多订阅者,refCount
将取消内部流。
棘手的部分来了:这个流是同步的并且 Streams are just functions 。它的底层有点复杂,但是重试运算符内的订阅函数会一直运行,直到内部流完成。只有这样它才会重新订阅。
这与异步流不同,例如使用Flux.interval
创建。
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1)
.subscribe()
当您调用 .subscribe()
时,retry
将在内部调用 doOnNext
上的订阅,依此类推,直到第一个 doOnNext
在 Flux 上调用 subscribe
。间隔 Flux 表示“好的,我将在一秒钟内发出第一个值”,并且订阅已建立。
一秒钟后,发出一个值,抛出一个错误,重试运算符再次订阅。
在您的示例中,Flux 已经在订阅完全建立之前在订阅调用期间开始发出值,可以这么说。
关于java - 在热流上重试操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51107243/