这是测试代码
final Flowable<Integer> f1 = Flowable.fromPublisher(s -> {
s.onNext(Integer.valueOf(1));
s.onComplete();
});
final Flowable<Integer> f2 = Flowable.fromPublisher(s -> {
s.onNext(Integer.valueOf(2));
s.onComplete();
});
Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
.blockingSubscribe(System.out::println);
会得到
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386)
不明白为什么?
如果我像这样更新代码
final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> {
s.onNext(Integer.valueOf(1));
s.onComplete();
}).onErrorResumeNext(Flowable.empty());
final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> {
s.onNext(Integer.valueOf(2));
s.onComplete();
}).onErrorResumeNext(Flowable.empty());
Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
.blockingSubscribe(System.out::println);
它将按预期打印 12。但为什么?这没有意义。
最佳答案
问题是你违反了 Publisher<T>
的契约(Contract)随着您使用 fromPublisher
.
发布商需要以 Reactive Streams
中指定的非常具体的方式行事。契约(Contract)。该行为包括调用 Subscriber.onSubscribe()
在进行任何其他调用并尊重该订阅者的背压之前。
因为你不打电话onSubscribe
内部queue
变量永远不会被初始化并且调用 queue.offer
在其 onNext
方法会导致 NPE。
大概是通过使用 onErrorResumeNext
该实现确保一切都被正确调用,“修复”无效状态。
要解决您的问题,有两种可能性:
- 请勿使用
Flowable.fromPublisher
。它旨在连接 react 流声明的其他实现,并且没有任何保障措施。而是使用Flowable.create
它正确处理初始化和背压。 - 使用非背压感知
Observable
因为您的用例似乎并不关心背压。再次使用Observable.create
安全使用方法。
关于java - 为什么 rx-java 邮政编码失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41071413/