rx-java - RxJava 2 相当于 isUnsubscribed

标签 rx-java rx-java2 rx-kotlin

我一直在研究书中的例子Reactive Programming with RxJava ,这是针对版本 1 而不是 2。无限流的介绍有以下示例(并注意有更好的方法来处理并发):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
    Runnabler = () -> {
        BigInteger i = ZERO;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

...

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();

然而,在 RxJava 2 中,lambda 表达式传递给了 create()方法的类型为 ObservableEmitter这没有 isUnsubscribed()方法。我看过 What's Different in 2.0并且还对存储库进行了搜索,但找不到任何此类方法。

如何在 2.0 中实现相同的功能?

编辑以包含如下给出的解决方案(n.b. 使用 kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
    Thread({
        var int: BigInteger = BigInteger.ZERO
        while (!emitter.isDisposed) {
            emitter.onNext(int)
            int = int.add(BigInteger.ONE)
        }
    }).start()
}

val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }

Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()

最佳答案

订阅 Observable 后,Disposable被退回。您可以将其保存到本地变量并检查 disposable.isDisposed()看看它是否仍在订阅。

关于rx-java - RxJava 2 相当于 isUnsubscribed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44525885/

相关文章:

java - 取出元素直到某个字符并用 RxJava 将它们分组

java - 使用另一个 Observable<List<AnotherObject>> 过滤 Observable<List<Object>>

android - RxJava : How to handle error with zip operator ?

java - RxJava 异步缓存 : proper way to dispose replay(). autoConnect() Observable

rx-java - RxJava2,Observable/Flowable 的 2 个订阅者,但 onNext 在任何一个上被调用

java - 改造 2 - RxJava : Unable to invoke no-args constructor for retrofit2. 调用 <RemoteDataObjectModel>

java - 如何动态更新 RX Observable?

java - 在 RxKotlin/RxJava 中使用 BehaviourSubject 惯用地创建热可观察对象

java - 来自缓存的 Flowable 和使用 RxJava 的其他 Flowable for DataSource

android - 订阅 Android UI 线程