android - 用于缓存项提供程序的 RxJava 运算符

标签 android reactive-programming rx-java

我正在尝试实现一个在 memory 中查找项目的提供程序, disk , network ,按此顺序。这样做的主要目的是在我有正确的本地缓存的情况下避免网络调用。有一个问题,因为我对网络的调用使用过滤器来获取项目,所以我可以从本地查询中获得 10 个项目,但仍然需要访问网络,因为这些项目来自不同的网络调用,具有不同的查询参数。

现在我正在使用 concatfirstOrDefault ,检查列表是否为 null 或空。我已经实现了一种方法来检查我是否已经使用特定查询调用了服务器,并且在从磁盘读取时使用它返回 null。

我现在需要改进提供程序,以便它:

  1. 发出本地项目
  2. 上网如果需要
  3. 发出在线项目

(现在它停在第一个好的项目列表处)。

我正在尝试使用 takeWhile ,使用一个方法,如果数据为 null 或为空或者我尚未为该查询调用服务器,则返回 true。问题是,如果该项目的检查为 false,则 takeWhile 不会发出该项目,这意味着我不会获得最后一个好项目(这也是最好的项目)。

我能想到的最好的解决方案是一个运算符,它会发出项目,直到出现特定条件,然后自行取消订阅。我找不到。

编辑:一些代码 解决方案1)使用firstOrDefault :如果 !DiskService.wasDownloaded() 则不会发出本地项目,因为DiskService返回 null List<Item>!DiskService.wasDownloaded()

public Observable<List<Item>> items() {
    List<Observable> obs = new ArrayList<>();

    Observable<List<Item>> memoryObs = Observable.defer(this::getMemoryItems);

    Observable<List<Item>> diskObs = Observable.defer(this::getDiskItems);

    Observable<List<Item>> networkObs = Observable.defer(this::getNetworkItems);

    Observable<List<Item>> concat = Observable.concat(memoryObs, diskObs, networkObs;


    return concat.firstOrDefault(new ArrayList<>(), this::canAccept);
}

private boolean canAccept(List<Item> data) {
    return data != null && data.size() > 0;
}

//Method in DiskService
public boolean wasDownloaded(){
    return true if the query was performed on the server, false otherwise.
} 

解决方案 2) 使用takeWhile 。 takeWhile 的问题是 Observable 不会发出不检查其条件的项目,这意味着我不会获得最佳列表。黑客解决方案是将错误检查推迟到下一项,但这样即使不需要,网络请求也会被触发。通过这个解决方案,我使用 TrustedItemList它只包含 List 和一个 bool 值,告诉 Observable 是否可以信任非空项目列表(对于 memorynetwork 始终为 true,对于 wasDownloaded() 则为 true if disk )

public Observable<List<Item>> items() {
    List<Observable> obs = new ArrayList<>();

    Observable<TrustedItemList> memoryObs = Observable.defer(this::getMemoryItems);

    Observable<TrustedItemList> diskObs = Observable.defer(this::getDiskItems);

    Observable<TrustedItemList> networkObs = Observable.defer(this::getNetworkItems);

    Observable<TrustedItemList> concat = Observable.concat(memoryObs, diskObs, networkObs;


    return concat.takeWhile(this::shouldContinueSearching)
                 .filter(trustedItemList -> trustedItemList.items != null && !trustedItemList.items.isEmpty())
                 .map(trustedItemList -> trustedItemList.items);
}

private boolean shouldContinueSearching(TrustedPoiList data) {
     return data == null || data.items == null || data.items.isEmpty() || !data.canTrustIfNotEmpty; 
}

最佳答案

我最终使用了一个自定义的 Observable.Operator,无耻地从 OperatorTakeWhile 复制而来,唯一的改变是调用 subscriber.onNext(t) 就在 onNext 方法中的 subscriber.onCompleted() 之前。这样,最后一项(即在 bool 检查中返回 false 的一项)就会被发出。

public final class OperatorTakeWhileWithLast<T> implements Observable.Operator<T, T> {

    private final Func2<? super T, ? super Integer, Boolean> predicate;

    public OperatorTakeWhileWithLast(final Func1<? super T, Boolean> underlying) {
        this((input, index) -> {
            return underlying.call(input);
        });
    }

    public OperatorTakeWhileWithLast(Func2<? super T, ? super Integer, Boolean> predicate) {
        this.predicate = predicate;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>(subscriber, false) {
            private int counter = 0;
            private boolean done = false;

            @Override
            public void onNext(T t) {
                boolean isSelected;
                try {
                    isSelected = predicate.call(t, counter++);
                } catch (Throwable e) {
                    done = true;
                    Exceptions.throwIfFatal(e);
                    subscriber.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                    unsubscribe();
                    return;
                }
                if (isSelected) {
                    subscriber.onNext(t);
                } else {
                    done = true;
                    subscriber.onNext(t); //Just added this line
                    subscriber.onCompleted();
                    unsubscribe();
                }
            }

            @Override
            public void onCompleted() {
                if (!done) {
                    subscriber.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!done) {
                    subscriber.onError(e);
                }
            }
        };
        subscriber.add(s);
        return s;
    }
}

我的 items() 方法(解决方案 2)现在以:

return concat.lift(new OperatorTakeWhileWithLast<TrustedItemList>(this::shouldContinueSearching))  
             .filter(trustedItemList -> trustedItemList.items != null && !trustedItemList.items.isEmpty()) 
             .map(trustedItemList -> trustedItemList.items); 

关于android - 用于缓存项提供程序的 RxJava 运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33613645/

相关文章:

swift - 如何使用发布主题来观察变量的值?

java - Android sqlite语法错误: near ")"

android - 从服务使用 : runOnUiThread or AsyncTask or Handler or Post

android - 更新到 buildTools 27.0.3 后 Gradle 失败

android - 如何将响应 json 字符串图像转换为可显示图像

java - 如何在响应式(Reactive) Java 中将新对象添加到现有流中?

r - 非发光上下文中的响应式(Reactive)对象绑定(bind)

kotlin - RxJava 返回单,执行完之后

java - 我的 rxjava 程序中的 Scheduler.Worker 线程引发致命异常

java - 如何将 Observable<Observable<List<T>>> 转换为 Observable<List<T>>