java - RxJava : How to retain Thread from upstream

标签 java android rx-java

在 Android 上,一些回调总是根据设计在主线程上调用,例如以下ServiceConnection:

private Completable dismissService() {
    return Completable.fromEmitter(new Action1<CompletableEmitter>() {
        @Override
        public void call(final CompletableEmitter completableEmitter) {
            final ServiceConnection conn = new ServiceConnection() {
                @Override
                public void onServiceConnected(ComponentName name, IBinder service) {
                    // here always main Thread...
                    unbindService(this);
                    completableEmitter.onCompleted();
                }

                @Override
                public void onServiceDisconnected(ComponentName name) {
                    // no-op
                }
            };

            completableEmitter.setCancellation(new AsyncEmitter.Cancellable() {
                @Override
                public void cancel() throws Exception {
                    unbindService(conn);
                }
            });

            bindService(new Intent(MainActivity.this, MyService.class), conn, BIND_AUTO_CREATE);
        }
    });
}

但是,我希望由 dismissService() 返回的 Completable 在调用它的任何 Thread 上发出其结果。我使用 newSingleThreadExecutor() 尝试了以下(hacky?)解决方案:

private Completable dismissServiceRetainingThread() {
    return Single.fromCallable(new Callable<Thread>() {
        @Override
        public Thread call() throws Exception {
            return Thread.currentThread();
        }
    }).flatMapCompletable(new Func1<Thread, Completable>() {
        @Override
        public Completable call(final Thread thread) {
            return Completable.fromEmitter(new Action1<CompletableEmitter>() {
                @Override
                public void call(final CompletableEmitter completableEmitter) {
                    final ServiceConnection conn = new ServiceConnection() {
                        @Override
                        public void onServiceConnected(ComponentName name, IBinder service) {
                            // here always main Thread...
                            unbindService(this);
                            completableEmitter.onCompleted();
                        }

                        @Override
                        public void onServiceDisconnected(ComponentName name) {
                            // no-op
                        }
                    };

                    completableEmitter.setCancellation(new AsyncEmitter.Cancellable() {
                        @Override
                        public void cancel() throws Exception {
                            unbindService(conn);
                        }
                    });

                    bindService(new Intent(MainActivity.this, MyService.class), conn, BIND_AUTO_CREATE);
                }
            }).observeOn(Schedulers.from(
                           Executors.newSingleThreadExecutor(
                             new ThreadFactory() {
                               @Override
                               public Thread newThread(@NonNull Runnable runnable) {
                                 return thread;
                              }
                            }
              )));
        }
    });
}

但是,会因以下 IllegalThreadStateException 崩溃:

E/AndroidRuntime: FATAL EXCEPTION: main
              Process: com.jenzz.rxjavathreadingtest, PID: 5307
              java.lang.IllegalThreadStateException
                  at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:930)
                  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1348)
                  at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:591)
                  at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:79)
                  at rx.Completable$28$1.onCompleted(Completable.java:1805)
                  at rx.internal.operators.CompletableFromEmitter$FromEmitter.onCompleted(CompletableFromEmitter.java:73)
                  at com.jenzz.rxjavathreadingtest.MainActivity$5$2$1.onServiceConnected(MainActivity.java:117)

有什么想法可以跳回到之前在上游使用的原始Thread吗?使用subscribeOn(Schedulers.io())

最佳答案

您不想返回到您所在的确切线程。当同一个 Scheduler 中的其他线程空闲时,它可能会很忙,除非它是主线程,否则不能保证它在回调返回时仍然存在。一旦您的 call 方法返回,它就可以用于更多工作或清理。我认为您想要的是在与原始调用相同的 Scheduler 上执行。 AFIAK,也没有直接的方法来确定线程的当前调度程序(有一种脆弱的方法来确定某些调度程序,在本文的底部提到)。所以你不能轻易地做你想做的事。对于此方法来说,默认在主线程上进行通知似乎是明智的行为。如果您希望它默认为不同的 Scheduler,您可以附加 .observeOn(Schedulers.io()) 或您选择的调度程序。

正如另一个答案中所述,如果当前线程有循环器,您可以尝试创建一个Handler。您仍然需要依赖调用者来确保回调发生时线程仍然可行。这似乎比告诉调用者响应将出现在主线程上更高级别的责任,除非他们使用 observesOn 选择不同的响应。

最后一个想法是,通过查看线程名称,可以为 Schedulers 返回的标准调度程序确定正确的调度程序。它们具有可预测的前缀,例如 RxNewThreadScheduler-1,因此一些 String.startsWith() 调用可以隔离正确的调度程序。不过,这非常脆弱,因为它无法正确处理用户创建的调度程序,并且线程命名方案将来可能会发生变化。

关于java - RxJava : How to retain Thread from upstream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40900349/

相关文章:

java - 是否有可能获得从标准谷歌地图导航到我们的应用程序到达目的地的估计时间?

java - 查询字符串中扁平化 JSON 对象的 Struts2 类型转换

java - 音频输入编程

java - RxJava 条件中断链

android - onNext 启动另一个 Observable

java - 当 Activity 在其 AsyncTask 运行时被销毁时,内存中会发生什么?

java - 当我尝试使用整数值填充 TextView 时,为什么我的应用程序会崩溃?

android - 有没有一种方法可以自动化 Android 操作系统测试(而不仅仅是应用程序)?

Android,如何获取当前正在显示的 Activity 信息(在前台)?

java - 如何为 RxJava 中抛出异常的可观察返回编写文档