java - 为什么 rx-java 邮政编码失败

标签 java rx-java2

这是测试代码

    final Flowable<Integer> f1 = Flowable.fromPublisher(s -> {
        s.onNext(Integer.valueOf(1));
        s.onComplete();
    });


    final Flowable<Integer> f2 = Flowable.fromPublisher(s -> {
        s.onNext(Integer.valueOf(2));
        s.onComplete();
    });

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
            .blockingSubscribe(System.out::println);

会得到

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386)

不明白为什么?

如果我像这样更新代码

    final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> {
        s.onNext(Integer.valueOf(1));
        s.onComplete();
    }).onErrorResumeNext(Flowable.empty());


    final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> {
        s.onNext(Integer.valueOf(2));
        s.onComplete();
    }).onErrorResumeNext(Flowable.empty());

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
            .blockingSubscribe(System.out::println);

它将按预期打印 12。但为什么?这没有意义。

最佳答案

问题是你违反了 Publisher<T> 的契约(Contract)随着您使用 fromPublisher .

发布商需要以 Reactive Streams 中指定的非常具体的方式行事。契约(Contract)。该行为包括调用 Subscriber.onSubscribe()在进行任何其他调用并尊重该订阅者的背压之前。

因为你不打电话onSubscribe内部queue变量永远不会被初始化并且调用 queue.offer在其 onNext方法会导致 NPE。

大概是通过使用 onErrorResumeNext该实现确保一切都被正确调用,“修复”无效状态。

要解决您的问题,有两种可能性:

  1. 请勿使用Flowable.fromPublisher 。它旨在连接 react 流声明的其他实现,并且没有任何保障措施。而是使用Flowable.create它正确处理初始化和背压。
  2. 使用非背压感知 Observable因为您的用例似乎并不关心背压。再次使用Observable.create安全使用方法。

关于java - 为什么 rx-java 邮政编码失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41071413/

相关文章:

Java - 如何测试永远不会发生的异常?

java - 观察变量的变化

android - Rxjava 和工作管理器链式异步调用

rx-java - Rx 跳过,直到经过几秒

android - 内部类ViewHolder的构造函数只能用包含类的接收者调用

java - 从文本到数字

java - PHSchematron 无法验证 `p` 元素内包含 `pattern` 元素的 schematron 文件

java - 使用 BouncyCaSTLe API 进行 RSA 加密

java - 如何查找等待执行的可运行对象的数量

java - RxJava 中的单线程 ExecutorService 相当于什么?