android - RxJava StackOverflowError exception in a long chain

Tags: android retrofit rx-java stack-overflow jack-compiler

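I built a very long RxJava chain (with Retrofit requests) out of many operators: doOnNext, doOnError, switchIfEmpty, onErrorResumeNext, flatMap. On some devices (for example, Android 4.1), when the chain takes its longest path, it throws a StackOverflowError.

Is there a way, or a best practice, to optimize the chain or prevent the StackOverflowError?

So far I see only one way: break the chain and call the second part from the first part's onComplete (or onNext) callback, but I don't think that is the reactive way.

There is another way: switch threads with an operator such as .subscribeOn(Schedulers.newThread()). That doesn't seem like the best solution either.

My code:

1) Subscription code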

fastSearch(keyphrase)
    .onErrorResumeNext(throwable -> {
        return correctKeyphraseAndSearch(keyphrase);
    })
    .doOnNext(resultsDao -> {...})
    .subscribe(...)

2) Helper methods

public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
    String SRD = "true";
    final HttpQueryParams params = new HttpQueryParams();
    //read from cache chain
    Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
            .doOnNext(...)
            .doOnError(...)
            .onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params)); //save some data to "params", and return Observable.empty
    //network request chain
    Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
            .retryWhen(new OnNewSessionRequired())
            .doOnNext(new WriteToCacheAction<SearchResultsDao>(params)); //save to cache
    //combine cache+network chains
    return cacheChain
            .switchIfEmpty(networkChain)
            .doOnNext(resultsDao -> resultsDao.setKeyphrase(keyphrase));
}

public static Observable<SearchResultsDao> correctKeyphraseAndSearch(final String keyphrase) {
    return mainDiv()
            .flatMap(str -> syntax(str, keyphrase, true))
            .flatMap(syntaxDao -> {
                //get corrected keyphrase from server
                StringBuilder newKeyphrase = ... ;
                //repeat search request with new keyphrase
                return fastSearch(newKeyphrase.toString());
            });
}

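3) Some notes:

The mainDiv() and syntax() methods are built the same way as fastSearch(), but each sends a different request to the server.

getCache().fastSearch() creates an Observable that reads data from our own cache (similar to Retrofit: getCache() implements the Retrofit API method interface).

Stack trace: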

Uncaught Exception
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:442)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:305)
at java.util.concurrent.FutureTask.run(FutureTask.java:137)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1076)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:569)
at java.lang.Thread.run(Thread.java:856)
Caused by: java.lang.StackOverflowError
okhttp3.HttpUrl.newBuilder (HttpUrl.java:633)
retrofit2.RequestBuilder.addQueryParam (RequestBuilder.java:144)
retrofit2.ParameterHandler$Query.apply (ParameterHandler.java:109)
retrofit2.ServiceMethod.toRequest (ServiceMethod.java:108)
retrofit2.OkHttpCall.createRawCall (OkHttpCall.java:178)
retrofit2.OkHttpCall.execute (OkHttpCall.java:162)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:171)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar (OperatorMerge.java:368)
rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit (OperatorMerge.java:330)
rx.internal.operators.OperatorMerge$InnerSubscriber.onNext (OperatorMerge.java:807)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError (OperatorSwitchIfEmpty.java:116)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OnSubscribeRedo$4$1.onError (OnSubscribeRedo.java:331)
rx.internal.operators.OperatorMerge$MergeSubscriber.reportError (OperatorMerge.java:243)
rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate (OperatorMerge.java:779)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop (OperatorMerge.java:540)
rx.internal.operators.OperatorMerge$MergeSubscriber.emit (OperatorMerge.java:529)
rx.internal.operators.OperatorMerge$InnerSubscriber.onError (OperatorMerge.java:813)
rx.Observable$ThrowObservable$1.call (Observable.java:10200)
rx.Observable$ThrowObservable$1.call (Observable.java:10190)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:307)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:289)
rx.internal.operators.NotificationLite.accept (NotificationLite.java:150)
rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext (SubjectSubscriptionManager.java:253)
rx.subjects.BehaviorSubject.onNext (BehaviorSubject.java:160)
rx.internal.operators.OnSubscribeRedo$2$1.onError (OnSubscribeRedo.java:242)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:43)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:38)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:173)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSubscribeOn$1.call (OperatorSubscribeOn.java:94)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:442)

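Best answer

StackOverflowErrors rarely occur in typical flows; without the code I can't tell for sure what went wrong.

In earlier versions of RxJava, some operators had reentrancy problems that could lead to StackOverflowErrors, but we haven't received a bug report of that kind for some time. Make sure you are using the latest RxJava release of the 1.x line (you may need to override the dependency pulled in by the RxAndroid plugin, since it is rarely updated in this respect).

There are a few things you can do: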

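  • Compact consecutive doOnNext/doOnError calls into a single doOnEach
  • Add an occasional observeOn(Schedulers.computation())
  • Avoid chaining too many mergeWith calls; collect the sources into a List and use merge(Iterable)
  • Upgrade to RxJava 2.x, which should produce shallower stacks (not verified yet)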

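Edit:

Based on the stack trace, my understanding is that you have too many empty sequences, and switchIfEmpty keeps deepening the stack as it switches from one empty sequence to the next. As you discovered, subscribeOn helps "restart" the stack. I'll look into a possible fix in RxJava itself.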
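
To illustrate why moving work onto a scheduler "restarts" the stack, here is a minimal, library-free sketch in plain Java. It is not RxJava code and does not reproduce RxJava's internals; the class and method names (StackResetSketch, nested, scheduled) are hypothetical. It only models the general effect: when each step calls the next one directly, frames pile up, whereas when each step hands the next one to an executor, the current frame returns before the next step runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class StackResetSketch {

    /** Number of frames currently on the calling thread's stack. */
    static int depth() {
        return Thread.currentThread().getStackTrace().length;
    }

    /** Each hop calls the next one directly, so frames accumulate. */
    static void nested(int hopsLeft, AtomicInteger maxDepth) {
        maxDepth.accumulateAndGet(depth(), Math::max);
        if (hopsLeft > 0) {
            nested(hopsLeft - 1, maxDepth);
        }
    }

    /** Each hop schedules the next one; the current frame returns before it runs. */
    static void scheduled(int hopsLeft, AtomicInteger maxDepth,
                          ExecutorService executor, CountDownLatch done) {
        maxDepth.accumulateAndGet(depth(), Math::max);
        if (hopsLeft > 0) {
            executor.execute(() -> scheduled(hopsLeft - 1, maxDepth, executor, done));
        } else {
            done.countDown();
        }
    }

    /** Deepest stack observed while running the hops as nested calls. */
    static int maxDepthNested(int hops) {
        AtomicInteger max = new AtomicInteger();
        nested(hops, max);
        return max.get();
    }

    /** Deepest stack observed while running the hops through an executor. */
    static int maxDepthScheduled(int hops) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            executor.execute(() -> scheduled(hops, max, executor, done));
            done.await(); // wait until the last hop has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // The nested variant grows with the number of hops; the scheduled one stays flat.
        System.out.println("nested max depth:    " + maxDepthNested(200));
        System.out.println("scheduled max depth: " + maxDepthScheduled(200));
    }
}
```

In this model the nested variant's depth grows linearly with the number of hops, while the scheduled variant stays at a small constant. The trade-off is an extra thread hop per step, which is presumably why the answer suggests adding observeOn only occasionally.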

Source: the original question, "android - RxJava StackOverflowError exception in a long chain", on Stack Overflow: https://stackoverflow.com/questions/42363452/