我还有一个问题。这次我在执行此代码期间遇到此错误 Caused by: rx.exceptions.MissingBackpressureException
:
class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>
init {
numberOfFileToUpdate = PublishSubject.create()
}
public fun startUpdate(): Observable<Int>{
return getProducts().flatMap { products: ArrayList<Product> ->
numberOfFileToUpdate.onNext(products.size)
return@flatMap saveRows(products)
}
}
private fun getProducts(): Observable<ArrayList<Product>> {
return Observable.create {
var products: ArrayList<Product> = ArrayList()
var i = 0
while (i++ < 100) {
products.add(Product())
}
it.onNext(products)
it.onCompleted()
}
}
private fun saveRows(products: ArrayList<Product>): Observable<Int> {
return Observable.create<Int> {
var totalNumberOfRow = products.size
while (totalNumberOfRow-- > 0){
it.onNext(products.size - totalNumberOfRow)
Thread.sleep(100)
}
it.onCompleted()
}
}
代码只是两个进程的测试代码。第一个过程从 Web 获取 Product
列表,然后将这些产品保存到应用程序的本地数据库中。这是主要思想。
方法 getProducts
完成获取数据的工作,在本例中我只创建了一个包含 100 个产品的 ArrayList。 saveRows
执行持久化工作。
saveRows
方法发出一个代表已保存行的 Int
。我这样做是因为在 UI 中我有一个报告进度的进度条。
我从应用程序的另一个点调用方法 startUpdate
并在发出一些项目后我得到 describe 异常
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)
我理解为什么会发生此异常 https://github.com/ReactiveX/RxJava/wiki/Backpressure
但我不知道我做错了什么或如何解决它。
谁能给我一些建议。
最佳答案
问题是您的 Observable 源发出的速度比消费者消耗的快。保存每个产品需要 100 毫秒。您可以添加 onBackpressureBuffer()。
UpdateHelper().startUpdate()
.onBackpressureBuffer() // Add this
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.d(TAG, "next $it")
}, {
Log.d(TAG, it.message)
}, {
})
此外,您可以尝试删除 Thread.sleep(100)
。
平面图使用OperatorMerge ( merge(map(func))
):您可以看到,在您的情况下,map
的 onNext 发送速度比请求的速度快。
关于android - 由 : rx. 异常引起。MissingBackpressureException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34447356/