java - RxJava : OnNext Unsubscribe is not working

标签 java jakarta-ee rx-java reactive rx-java2

我在收到可观察到的第一个项目后尝试取消订阅。而且这似乎行不通。我做错了什么?

    public class ObservableAndSubscriber {

    public static void main(final String[] args) {
        final Observable<String> strObservable = Observable.create(s -> {
            while (true) {
                s.onNext("Hello World!!");
            }
        });

        final Subscriber<String> strSubscriber = new Subscriber<String>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(final Throwable e) {
                e.printStackTrace();

            }

            @Override
            public void onNext(final String t) {
                System.out.println(t);
                this.unsubscribe();

            }
        };
        strObservable.subscribe(strSubscriber);
    }
}

结果似乎在无限循环中打印“Hello World”。

最佳答案

要取消订阅工作,您需要检查循环中的订阅状态。否则它会无限运行,耗尽你的 CPU。

处理无限序列的最简单方法是利用库提供的方法 Observable.generate()Observable.fromIterable()。他们会为您进行适当的检查。

还要确保您在不同的线程上订阅和观察。否则,您将只能为单个订阅者提供服务。

你的例子:

strObservable = Observable.generate(g -> g.onNext("Hello!"));
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);

关于java - RxJava : OnNext Unsubscribe is not working,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41367449/

相关文章:

java - 覆盖Android上的删除键?

java - UserTransaction.SetTransactionTimeout 不起作用?

android - RxJava、Proguard 和 sun.misc.Unsafe

java - RX Java - 重试一些抛出异常的代码

java - 我如何测试这个 rxJava 代码?

java - 重新排列数组中的数字

java - 如何让主线程等待其他线程在 ThreadPoolExecutor 中完成

java - Java EE 7 属性文件配置的最佳实践建议是什么?

java - 反序列化Android Retrofit时Gson stackOverflown

jakarta-ee - 无法使用 UnboundID SDK 从 WebLogic 12 建立 LDAP SSL 连接