java - RxJava2 - 在并行轨道上发出 onError 将得到 UndeliverableException

标签 java rx-java rx-java2

虽然其中一个 Rails 发出了 onError(),但另一个 Rails 中的 isCancelled() 仍然返回 false,这导致了 UndeliverableException >。如何检查并行轨道中下游是否取消?

Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        System.out.println("Flowable.create-emitter.isCancelled:" + emitter.isCancelled());
        for (int i = 1; i < 10; i++) {
            emitter.onNext(i);
        }
        emitter.onComplete();
    }

}, BackpressureStrategy.BUFFER).parallel(6).runOn(Schedulers.io())
        .flatMap(new Function<Integer, Publisher<String>>() {
            @Override
            public Publisher<String> apply(Integer t) throws Exception {
                // TODO Auto-generated method stub
                return Flowable.create(new FlowableOnSubscribe<String>() {

                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                            System.out.println("flatMap-before onError-isCancelled:" + emitter.isCancelled());
                            try {
                                if (true) { // assume trigger the error
                                    throw new Exception("Test");
                                }
                                if (!emitter.isCancelled()) {
                                    emitter.onNext(String.valueOf((t + 1)));
                                    emitter.onComplete();
                                }
                            } catch (Exception ex) {
                                if (!emitter.isCancelled()) {
                                    emitter.onError(ex);
                                }
                            }
                            System.out.println("flatMap-after onError-isCancelled:" + emitter.isCancelled());
                    }
                }, BackpressureStrategy.BUFFER);
            }

        }).sequential().subscribeOn(scheduler).observeOn(Schedulers.single())
        .subscribeWith(new ResourceSubscriber<String>() {

            public void onComplete() {
                System.out.println("onComplete");

            }

            public void onError(Throwable arg0) {
                System.out.println("onError:" + arg0.toString());

            }

            public void onNext(String arg0) {
                System.out.println("onNext:" + arg0);
            }

        });

最佳答案

我找到了解决方案。我需要添加一个全局错误消费者来解决问题。 https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

关于java - RxJava2 - 在并行轨道上发出 onError 将得到 UndeliverableException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50007501/

相关文章:

java - Selenium 3.3.1 和 FirefoxDriver 的依赖性错误

java - 尝试使用 Netbeans 8.0 在 javadoc 注释中的 {@code} block 中转义 "@"符号

java - 如何使用 Java 将新注释附加到 JIRA 上已有的注释

android - RxJava : closing a resource after every subscriber handled it

java - 当我们连接两个可观察的重复项时,输出流中的重复项会被消除吗?

java - 房间数据库观察者

java - RxJava retry当重试整个方法时返回一个Completable

java - 需要有关 apache poi api 中的数组公式实现的帮助

java - RxJava方法从数据库读取多个条目并将它们写入多个文件

java - 接收Java : Sleep before OnNext (Sleep before emitting from Observable)