android - 不使用 Observable.create 创建 Observable

标签 android reactive-programming rx-java

我在我的 Android 应用程序中使用 RxJava,我想从数据库中加载数据。

这样,我使用 Observable.create() 创建一个新的 Observable,它返回 EventLog

的列表
public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

虽然它可以正常工作,但我读到使用 Observable.create() 实际上并不是 Rx Java 的最佳实践(参见 here)。

所以我就这样改了这个方法。

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

这是使用 Rx Java 的更好方法吗?为什么?这两种方法到底有什么区别?

此外,由于数据库加载元素列表,一次发出整个列表是否有意义?或者我应该一次发射一个项目?

最佳答案

这两种方法可能看起来相似并且行为相似,但 fromCallable 为您解决了背压的困难,而 create 版本则没有。在 OnSubscribe 实现中处理背压的范围从简单到彻底令人费解;但是,如果省略,您可能会在异步边界(例如 observeOn)甚至连续边界(例如 concat)上得到 MissingBackpressureException

RxJava 试图为尽可能多的工厂和运算符(operator)提供适当的背压支持,但是,有很多工厂和运算符(operator)无法支持它。

手动 OnSubscribe 实现的第二个问题是缺乏取消支持,尤其是当您生成大量 onNext 调用时。其中许多可以替换为标准工厂方法(例如 from)或帮助程序类(例如 SyncOnSubscribe),为您处理所有复杂性。

您可能会发现很多介绍和示例(仍然)使用 create 有两个原因。

  1. 通过展示事件的推送如何以命令式的方式工作,介绍基于推送的数据流要容易得多。在我看来,这些来源花费太多时间在 create 上,而不是谈论标准工厂方法并展示如何安全地完成某些常见任务(例如您的)。
  2. 其中许多示例是在 RxJava 不需要背压支持甚至适当的同步取消支持时创建的,或者只是从 Rx.NET 示例中移植而来的(迄今为止,这些示例不支持背压和同步取消工作,礼貌我猜是 C# 的。)当时通过调用 onNext 生成值是无忧无虑的。然而,这种使用确实会导致缓冲区膨胀和过多的内存使用,因此,Netflix 团队想出了一种限制内存使用的方法,要求观察者说明他们愿意继续执行多少项目。这被称为背压。

对于第二个问题,即是否应该创建一个列表或一系列值,这取决于您的来源。如果您的源支持某种类型的迭代或单个数据元素的流式传输(例如 JDBC),您可以 Hook 并逐个发出(参见 SyncOnSubscribe)。如果它不支持它,或者无论如何您都需要它的列表形式,那么保持原样。如有必要,您始终可以通过 toListflatMapIterable 在两种形式之间进行转换。

关于android - 不使用 Observable.create 创建 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34204802/

相关文章:

android - OSMdroid 由 : java. lang.ClassNotFoundException : org. osmdroid.views.MapView 引起

android - 我想知道 android 中 Activity 和服务之间的线程流

Android 从 URL 下载 PDF,然后使用 PDF 阅读器打开它

arrays - 如何使用响应式(Reactive)扩展将一个事件拆分为多个事件?

java - RxJava - 观察可能总是变化的数据

java - 调整 fragment 的尺寸

ios - RxSwift 订阅 <Bool> 变化取决于一个变量 (MVVM)

java - 响应式(Reactive) Java : Insert item in stream based on comparison of two items

android - RxView.debounce等待debounce的时间才执行命令,如何立即执行命令?

java - 为什么这个简单的 RxJava 示例无法运行?