android - MissingBackpressureException even after calling onBackpressureBlock()

Tags: android reactive-programming rx-java

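I am trying to emit events at a fixed rate (one every 150 ms), even when the upstream observable produces events faster than that.

However, I get a MissingBackpressureException even though I call onBackpressureBlock().

Code: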

    SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create());

    return subject
            .subscribeOn(Schedulers.computation())
            .doOnSubscribe(() -> {
                NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> {
                            Log.d(TAG, "new quotation " + quotation.hashCode());
                            NetworkRequestsManager.instance().getSeller(quotation.sellerId)
                                    .subscribe(seller -> {
                                                for (Outlet outlet : seller.outlets) {
                                                    if (outlet.latitude != null && outlet.longitude != null)
                                                        subject.onNext(new QuotationMarker(outlet, quotation.price));
                                                }
                                            },
                                            error -> Log.fatalError(new RuntimeException(error)));
                        },
                        error -> Log.fatalError(new RuntimeException(error)));

            })
            .doOnError(throwable -> Log.fatalError(new RuntimeException(
                    "error response in subscribe after doOnSubscribe",
                    throwable)))
                    // combine with another observable that emits items regularly (every 150ms)
                    // so that a new event is received every 150ms;
                    // also, the first event itself is delayed.
            .zipWith(Observable.interval(150, TimeUnit.MILLISECONDS),
                    (seller, aLong) -> seller)
            .onBackpressureBlock() // intended to prevent zipWith + Observable.interval from throwing MissingBackpressureException
            .doOnError(throwable -> Log.fatalError(new RuntimeException(
                    "error response after onBackpressureBlock()",
                    throwable))); // <-- error is thrown here

Stack trace:

    05-06 00:38:25.532  28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock()
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234)
    05-06 00:38:25.582  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
    05-06 00:38:25.592  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException
    05-06 00:38:25.612  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
    05-06 00:38:25.642  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
    05-06 00:38:25.642  28106-28166/com.instano.buyer W/System.err﹕ ... 10 more

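PS: Log.fatalError(err) is just my wrapper around android.util.Log.e(...).

Edit

After a lot of trial and error, this has become a problem I cannot solve. zipWith(Observable.interval...) appears to be the culprit, and possibly a framework bug. With those lines removed (and my periodic emission along with them), my code works. I am using a subject whose onNext may be called from different threads, and I then apply Observable operators to it.

Best answer

I think (but I am not sure) that the problem is that your backpressure configuration comes after the zip operator.

The zip operator has to buffer items from one Observable in order to pair them with items from the other. It is most likely this buffer that throws the exception: in the trace, the MissingBackpressureException originates in RxRingBuffer.onNext, called from OperatorZip's InnerSubscriber, which is upstream of onBackpressureBlock().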
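To make the buffering argument concrete, here is a minimal plain-Java model of one input of a zip-like stage. It is only a sketch and not RxJava's implementation: the class names BoundedInput and BoundedInputDemo, the capacity of 4, and the use of IllegalStateException as a stand-in for MissingBackpressureException are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of one input of a zip-like stage: a bounded queue that
// fails when the producer outruns the partner source. Not RxJava's actual code.
class BoundedInput<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedInput(int capacity) {
        this.capacity = capacity;
    }

    // Called by the fast source. A full queue is reported as an error,
    // standing in for MissingBackpressureException.
    void onNext(T item) {
        if (queue.size() >= capacity) {
            throw new IllegalStateException("buffer full: producer ignored backpressure");
        }
        queue.add(item);
    }

    // Called when the slow partner source (the timer) produces a tick.
    // Returns null if nothing is waiting to be paired.
    T pair() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}

class BoundedInputDemo {
    public static void main(String[] args) {
        BoundedInput<Integer> input = new BoundedInput<>(4); // capacity chosen arbitrarily
        try {
            for (int i = 0; i < 10; i++) {
                input.onNext(i); // no ticks arrive in between, so the queue fills up
            }
        } catch (IllegalStateException e) {
            System.out.println("overflow with " + input.size() + " items queued: " + e.getMessage());
        }
    }
}
```

In this model, anything attached after the pairing stage only ever sees the error. It has no way to slow down or filter the producer that feeds the queue, which is why the position of the backpressure operator matters.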

To solve your problem, I think you should try adding the backpressure configuration to one (or each) of the Observables used in the zip operator.

Example:

    obs.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS).onBackpressureDrop(), (item, tick) -> item);

    obs.onBackpressureBlock().zipWith(Observable.interval(150, TimeUnit.MILLISECONDS), (item, tick) -> item);
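The idea of handling the fast source before it reaches the pacing step can also be sketched without RxJava. The following is an illustrative alternative, not the answer's method and not a library API: PacedEmitter, its onNext and tick methods, and the unbounded queue are assumptions. It buffers every incoming item and releases at most one per timer tick; a drop strategy would discard items instead of queueing them.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: accepts items at any rate, releases at most one per tick.
class PacedEmitter<T> {
    // Unbounded and thread-safe, so producers on any thread never overflow it.
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();
    private final Consumer<T> downstream;

    PacedEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // May be called from any thread, like the subject in the question.
    void onNext(T item) {
        pending.add(item);
    }

    // Releases one buffered item, if any. Returns true if an item was delivered.
    boolean tick() {
        T item = pending.poll();
        if (item == null) {
            return false;
        }
        downstream.accept(item);
        return true;
    }
}

class PacedEmitterDemo {
    public static void main(String[] args) throws InterruptedException {
        PacedEmitter<String> emitter =
                new PacedEmitter<>(s -> System.out.println("emit " + s));

        // Drive tick() every 150 ms; the first item is delayed by one period.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(emitter::tick, 150, 150, TimeUnit.MILLISECONDS);

        // The producer is much faster than the timer.
        for (int i = 0; i < 5; i++) {
            emitter.onNext("marker-" + i);
        }

        Thread.sleep(1000);
        timer.shutdownNow();
    }
}
```

Because the queue sits in front of the pacing step, the producer never hits a bounded buffer. The trade-off is memory: if the source stays faster than the timer indefinitely, the queue grows without limit, which is when a dropping strategy becomes the safer choice.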

Regarding "android - MissingBackpressureException even after calling onBackpressureBlock()", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30061331/
