android - RxJava onBackpressureBuffer 不发射项目

标签 android rx-java rx-android ui-thread backpressure

我目睹了 onBackpressureBuffer 的奇怪行为,我不确定它是有效行为还是错误。

我有一个以特定速率发射项目的 tcp 调用(使用流和 inputStream 但这只是为了一些信息)

在此之上,我使用 create 创建了一个可观察对象,它会在每次准备就绪时发出一个项目。

我们称它为 messages()。

然后我这样做:

messages()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe({//do some work});

我注意到使用分析工具很少会抛出 MissingBackPressureException,因此我在调用中添加了 onBackpressureBuffer。

如果我在 observeOn 之后添加它:

messages()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .onBackpressureBuffer()
    .subscribe({//do some work})

一切正常,但这意味着它只有在进入 UI 主线程后才会缓冲,所以我更喜欢这样:

messages()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({//do some work});

事情开始变得奇怪的地方。

我注意到虽然 messages() 一直在发送项目,但在某些时候它们将停止传送给订阅者。

更准确地说,恰好在 16 个项目之后,显然令人高兴的是缓冲区将开始保存项目而不将它们向前传递。

一旦我用某种超时机制取消了 messages(),它就会导致 messages() 发出 onError() 并且缓冲区将立即发出它保留的所有项目(它们将被处理)。

我检查过是否是订户做太多工作的错,但不是,他已经完成但仍然没有收到元素...

我也尝试过在订阅者中使用 request(n) 方法在 onNext() 完成后请求一个项目,但缓冲区不运行。

我怀疑是 Android UI 主线程的消息传递系统背压导致的,但我无法解释原因。

有人可以解释为什么会这样吗?这是错误还是有效行为? 谢谢!

最佳答案

不知道如何 messages(),根据所描述的行为,这是一个与 this question 类似的同池死锁。

解决方法是将 .onBackpressureBuffer 放在 subscribeOnobserveOn.

messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer()           // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});

关于android - RxJava onBackpressureBuffer 不发射项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40149801/

相关文章:

java - 将 Countrylist 模型、数据和 Arraylist 从 Java 重写为 kotlin

java - 按需执行热 Observable

Android 无法解析 rxjava 中的订阅方法

android - ViewPager 阻止加载下一个 View

安卓 : Capture a document & Scan it using camera

java - 如何正确地与doOnNext/doOnCompleted进行副作用同步?

android - share() 运算符不适用于 Rxjava 中的 Observable

android - RxJava onErrorResumeNext 调用 java.io.InterrupedIOException

android - 在我的 android 项目中使用 android box 2d

android - 在 Kotlin 中使用 RXJava 映射?