android - 由 : rx. 异常引起。MissingBackpressureException

标签 android rx-java kotlin

我还有一个问题。这次我在执行此代码期间遇到此错误 Caused by: rx.exceptions.MissingBackpressureException:

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow)
            Thread.sleep(100)
        }
        it.onCompleted()
    }
}

代码只是两个进程的测试代码。第一个过程从 Web 获取 Product 列表,然后将这些产品保存到应用程序的本地数据库中。这是主要思想。

方法 getProducts 完成获取数据的工作,在本例中我只创建了一个包含 100 个产品的 ArrayList。 saveRows 执行持久化工作。

saveRows 方法发出一个代表已保存行的 Int。我这样做是因为在 UI 中我有一个报告进度的进度条。

我从应用程序的另一个点调用方法 startUpdate 并在发出一些项目后我得到 describe 异常

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)

我理解为什么会发生此异常 https://github.com/ReactiveX/RxJava/wiki/Backpressure 但我不知道我做错了什么或如何解决它。

谁能给我一些建议。

最佳答案

问题是您的 Observable 源发出的速度比消费者消耗的快。保存每个产品需要 100 毫秒。您可以添加 onBackpressureBuffer()。

UpdateHelper().startUpdate()
    .onBackpressureBuffer() // Add this
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
      Log.d(TAG, "next $it")
    }, {
      Log.d(TAG, it.message)
    }, {
    })

此外,您可以尝试删除 Thread.sleep(100)

平面图使用OperatorMerge ( merge(map(func))):您可以看到,在您的情况下,map 的 onNext 发送速度比请求的速度快。

关于android - 由 : rx. 异常引起。MissingBackpressureException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34447356/

相关文章:

android - Android 上的 Wifi 感知和 Wifi P2P 之间的区别?

java - 在后台运行 Activity

java - RXJava 暂停缓冲区

kotlin - 使用 IntelliJ IDEA 时,Kotlin 中的 "An illegal reflective access operation has occurred"是什么意思?

java - 如何制作 Activity 来显示从服务获取的数据?

android - 指定为非null的参数为null:参数投影Content Provider Kotlin

android - 如何锁定屏幕以不允许在一个 fragment 上旋转

c# - Xamarin 表格 : Resource strings

带有比较器类的android rxjava排序列表

java - 用来自 RxJava 的可观察对象替换回调