android - 如何将始终从特定线程调用其回调的监听器包装到符合 subscribeOn 定义的 Scheduler 的 Observable 中?

标签 android firebase firebase-realtime-database rx-java

对于不熟悉 Android 和/或 Firebase 开发的人的简要介绍:

在 Android 开发中,您应该始终从主线程(也称为 UI 线程)操作应用程序的 View ,但如果您的应用程序需要进行一些繁重的处理,则应使用后台线程,otherwise the app would seem unresponsive .

Firebase是一种服务,它提供了一种在云中存储数据并与 NoSQL 数据库同步数据的方法。它还提供了一个 Android SDK 来管理这个数据库。每次使用此 SDK 进行操作(如查询)时,Firebase 通过在其自己的内部后台线程上进行所有繁重的处理并始终调用其回调来避免这些线程陷阱 on the main thread .

例子:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

ValueEventListener postListener = new ValueEventListener() {
  @Override
  public void onDataChange(DataSnapshot dataSnapshot) {
    // This is always called on the main thread
    // Get Post object and use the values to update the UI
    Post post = dataSnapshot.getValue(Post.class);
    // ...
  }

  @Override
  public void onCancelled(DatabaseError databaseError) {
    // Getting Post failed, log a message
    printError(databaseError.toException());
    // ...
  }
};

postsQuery.addValueEventListener(postListener);

我面临的实际问题:

我正在尝试使用如下方法使用 RxJava 包装 Firebase 的查询监听器:

private static Observable<DataSnapshot> queryObservable(final Query query) {
  return Observable.fromEmitter(emitter -> {
    // This is called on the Scheduler's thread defined with .subscribeOn()
    printThread("emitter");
    final ValueEventListener listener = new ValueEventListener() {
      @Override public void onDataChange(final DataSnapshot dataSnapshot) {
        // This is always called on the main thread
        printThread("onDataChange");
        emitter.onNext(dataSnapshot);
      }

      @Override public void onCancelled(final DatabaseError databaseError) {
        // This is called on the main thread too
        emitter.onError(databaseError.toException());
      }
    };

    query.addValueEventListener(listener);

    emitter.setCancellation(() -> query.removeEventListener(listener));
  }, Emitter.BackpressureMode.BUFFER);
}

但是因为 Observable 正在从 Firebase 的回调(在主线程上调用)内部发射项目,任何进一步的 .subscribeOn() 运算符都将被忽略。

例如,调用上面的方法是这样的:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

queryObservable(postsQuery).doOnSubscribe(() -> printThread("onSubscribe"))
    .subscribeOn(Schedulers.io())
    .subscribe(dataSnapshot -> printThread("onNext"));

将打印以下内容:

onSubscribe Thread: RxIoScheduler-2
emitter Thread: RxIoScheduler-2
onDataChange Thread: main
onNext Thread: main

据我了解,当 Firebase 的 SDK 调用 onDataChange() 回调并从其内部后台线程切换到主线程时,它也会使 Observable 在主线程上发出新的项目,使流中的任何 .subscribeOn() 操作符变得无用。

实际问题:

我该怎么做才能不仅将这样的监听器正确包装到 Observable 中,而且使它们符合 .subscribeOn() 定义的调度程序?

谢谢!

更新:

我知道 .observeOn() 使我能够在另一个线程上处理 Firebase 返回的数据。这就是我已经在做的事情,但这不是这个问题的重点。关键是:当我通过 .subscribeOn() 传递调度程序时,我希望上游符合该调度程序的线程,但是当 Observable 有一个从在不同的线程上回调。发生这种情况时,我将失去 .subscribeOn() 保证。

这个问题的严重性乍一看可能并不明显,但如果 Observable 是库的一部分呢?那里的最佳做法是什么?库是否应该强制其客户端在调用该方法后始终调用 .observeOn()?库是否应该自己调用 .observeOn() 并将其称为“默认调度程序”?在任何这些情况下,.subscribeOn() 都是无用的,这对我来说似乎不正确。

最佳答案

只需使用 observeOn在 IO 和 subscribeOn在 Main Thread 中,这样您就可以管理在 MainThread 中收到的消息,并将 firebase 工作转移到不同的 Thread .

记得将 rxAndroid 导入你的 gradle(Rxjava 或 RxJava 2):

 compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

还建议您检查下一个库作为引用(或只是使用它):

RxJava:https://github.com/nmoskalenko/RxFirebase

RxJava 2.0:https://github.com/FrangSierra/Rx2Firebase

其中一个使用 RxJava,另一个使用 RxJava 2.0 的新 RC。如果你对它感兴趣,你可以看到两者之间的区别 here .

关于android - 如何将始终从特定线程调用其回调的监听器包装到符合 subscribeOn 定义的 Scheduler 的 Observable 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40853783/

相关文章:

java - 特定网站链接未在 WebView 中打开

java - 从特定的 userID firebase 回收器适配器获取键值

java - 如何为Firebase数据库设置 protected "write"规则?

angular - 无效管道参数 : '[object Object]' for pipe 'AsyncPipe'

android - 在 Android app_project 中读写 SQLite 数据库

java - Android Java 使用 switch/case 来计算不同的百分比

Android AutoCompleteTextView 下拉位置

ios - 使用 Firebase iOS Swift 将通知从特定设备推送到特定设备

firebase - 直接 Firebase 托管到部署在其他区域的 Cloud Functions

javascript - Firebase 函数错误 - firebase.functions 不是函数