rx-java - 如何使用rxjava处理多个数据源?

标签 rx-java retrofit2 rx-android sqlbrite

情况是这样的:

我有领域层为业务逻辑提供数据获取接口(interface),并且我有2个数据源:本地数据库和远程网络。

它的工作原理如下:

  1. 请求所有用户:DataRepository.getInstance().getUsers();
  2. 在 DataRepository 中,有 2 个来源:
    • LocalDataSource.getUsers()从本地数据库获取所有用户,如果没有数据则忽略此请求。
    • RemoteDataSource.getUsers()它向我们的服务器请求最新的用户列表(即使本地数据库中有数据,也是为了保持数据更新),当请求数据时,将其保存或更新到本地数据库并发送回结果。

我知道通过 DataRepository 这样做我可以实现我的目标:

public Observable<List<User>> getUsers() {
    return Observable.create(new Observable.OnSubscribe<List<User>>() {
        @Override
        public void call(Subscriber<? super List<User>> subscriber) {
            // 1. Request users from local database 
            List<User> localUsers = mLocalDataSource.getUsers();
            if (!localUsers.isEmpty()) {
                subscriber.onNext(localUsers);
            }
            // 2. Request the latest user list from server
            // Send a retrofit2 request
            Call<List<User>> call = mRemoteDataSource.getUsers();
            try {
                List<User> networkUsers = call.execute().body();
                mLocalDataSource.saveUsers(networkUsers);
                subscriber.onNext(networkUsers);
                subscriber.onCompleted();
            } catch (IOException e) {
                subscriber.onError(e);
            }
        }
    });
}

现在想到我已经在项目中使用了rxjava,为什么不使用SqlBrite和Retrofit2 RxAdapters来做这件事以更方便呢? 所以LocalDataSource.getUsers()现在返回 Observable<List<User>> RemoteDataSource.getUsers()也是如此.

LocalDataSource.java

public Observable<User> getUsers() {
    final String sqlQuery = String.format("SELECT * FROM %s", UserTable.TABLE_NAME);
    return mDatabaseHelper.createQuery(UserTable.TABLE_NAME, sqlQuery)
            .mapToList(new Func1<Cursor, User>() {
                @Override
                public User call(Cursor c) {
                    return UserTable.parseCursor(c);
                }
            });
}

RemoteDataSource.java

public Observable<List<User>> getUsers() {
    return mRetrofitApi.users();
}

问题:

我应该在 DataRepository.getUsers() 做什么?达到我用老把戏所做的同样的事情?

public Observable<List<User>> getUsers() {
    Observable<List<User>> localUsers = mLocalDataSource.getUsers();
    Observable<List<User>> remoteUsers = mRemoteDataSource.getUsers()
            .flatMap(new Func1<List<User>, Observable<User>>() {
                @Override
                public Observable<User> call(List<User> users) {
                    return Observable.from(users);
                }
            })
            .doOnNext(new Action1<User>() {
                @Override
                public void call(User user) {
                    mLocalDataSource.saveUser(user);
                }
            })
            .toList();
    // What should I return to make two observables both able to emit results to the Subscriber
    return Observable.concat(localUsers, remoteUsers); // ???
}

最佳答案

What should I do in DataRepository.getUsers() to achieve the same thing I did with the old tricks?

在这种情况下,您可以使用concat:

public Observable<List<User>> getUsers() {
    return Observable.concat(localUsers.first(), remoteUsers);
}

但是如果本地结果还是远程结果先出现并不重要,您可以使用merge:

public Observable<List<User>> getUsers() {
    return Observable.merge(localUsers.first(), remoteUsers);
}

此外,如果您只想要一个结果(更快的结果),您可以使用amb:

public Observable<List<User>> getUsers() {
    return Observable.amb(localUsers.first(), remoteUsers);
}

关于rx-java - 如何使用rxjava处理多个数据源?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37569906/

相关文章:

android - 我们应该为 Rxjava3 使用哪个 rxjava3 改造适配器

java - 转换 Observable onError 并发出一个项目

java - 如何使用 id 在 URL android 中传递 "?",因为我的 url 如下所示

java - RxJava2 问题 : How Zip handles error when calling multiple services in parallel?

android - 使用 Retrofit 2.0 发布多部分表单数据,包括图像

java - fromCallable 和 defer 有什么区别?

java - RxJava : Range operator is not working

android - 如果未授予 RxPermissions,请避免征求他人许可

android - 将 RxJava 与 Room 一起使用时循环

android - 如何处理 fragment 内的 UI 事件的简单方法(onClick,...)