java - 如何用其他 Observables 过滤 Observables

标签 java reactive-programming observable

我面临的问题如下: 我有两个可观察对象,一个是从网络获取数据,另一个是从数据库获取数据。第二个可能为空,但缺少第一个将被视为错误。然后,如果来自网络的结果出现,我需要将其与数据库的最新结果(如果存在)进行比较,如果它们不同,我想存储它们(如果数据库可观察为空,我无论如何都想存储网络结果)。

有专门的操作人员处理这样的情况吗?

到目前为止,我尝试了使用 zipWith 的解决方案(如果数据库为空,则该解决方案无法按预期工作),缓冲区(正在工作,但远非理想), 我还尝试了平面映射(这需要在订阅者中进行额外的转换)。

下面是带有缓冲区的解决方案。

Observable.concat(ratesFromNetwork(), latestRatesFromDB())
                .buffer(3000, 2)
                .filter(buffer -> !(buffer.size() == 2 && !buffer.get(0).differentThan(buffer.get(1))))
                .map(buffer -> buffer.get(0))
                .subscribe(this::save,
                        (ex) -> System.out.println(ex.getMessage()),
                        () -> System.out.println("completed"));

如果我修改latestRatesFromDb,使其不返回Observable,而是返回Optional,则整个问题变得微不足道,因为我可以使用此结果进行过滤。似乎无法以异步方式进行过滤(或者我错过了什么?)

最佳答案

好的,这就是我要如何写这篇文章的方法。

首先,任何具有 DifferentThan 函数的类都应该更改为覆盖 equals 。否则,您无法对这些对象使用许多基本方法。

出于本示例的目的,我使用 Integer 类作为类型参数编写了所有可观察量。然后我使用调度程序编写两个模拟方法:

static Observable<Integer> ratesFromNetwork(Scheduler scheduler) {
    return Observable.<Integer>create(sub -> {
        sub.onNext(2);
        sub.onCompleted();
    }).delay(99, TimeUnit.MILLISECONDS, scheduler);
}

static Observable<Integer> latestRatesFromDB(Scheduler scheduler) {
    return Observable.<Integer>create(sub -> {
        sub.onNext(1);
        sub.onCompleted();
    }).delay(99, TimeUnit.MILLISECONDS, scheduler);
}

正如您所看到的,两者很相似,但是,它们会发出不同的值。

lack of the first one is considered an error

实现此目的的最佳方法是使用超时。您可以立即在此处记录错误并继续:

final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from network."));

超时失败时,rx将抛出错误。 doOnError 将使您更好地了解此错误从何处开始,并让它在序列的其余部分中传播。

The second one might be empty

在这种情况下,我会采取类似的策略,但是,不要使用方法 onErrorResumeNext 来传播错误。现在,您可以使用 firstOrDefault 确保可观察对象至少发出一个值。在此方法中,使用一些您希望永远不会与网络结果匹配的虚拟值。

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from database"))
    .onErrorResumeNext(Observable.empty())
    .firstOrDefault(-1);

现在,通过使用 distinct 方法,只有当值与之前的值不同时,您才能获取该值(这就是为什么您需要重写 equals) .

databaseRate.concatWith(networkRate).distinct().skip(1)
    .subscribe(i -> System.out.println("Updating to " + i),
        System.err::println,
        () -> System.out.println("completed"));

这里,数据库速率被放置在网络速率之前,以利用distinct。然后添加 skip 以始终忽略数据库速率值。

<小时/>

完整代码:

final long networkTimeOut = 100;
final long databaseTimeOut = 100;

final TestScheduler scheduler = new TestScheduler();

final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from network."));

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
    .doOnError(e -> System.err.println("Failed to get rates from database"))
    .onErrorResumeNext(Observable.empty())
    .firstOrDefault(-1);

databaseRate.concatWith(networkRate).distinct().skip(1)
    .subscribe(i -> System.out.println("Updating to " + i),
        System.err::println,
        () -> System.out.println("completed"));

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
<小时/>

networkTimeOutdatabaseTimeOut 大于 100 时,它会打印:

Updating to 2
completed

networkTimeOut 小于 100 时,它会打印:

Failed to get rates from network.
java.util.concurrent.TimeoutException

databaseTimeOut小于100时,它会打印:

Failed to get rates from database
Updating to 2
completed

如果您修改 latestRatesFromDBratesFromNetwork 以返回相同的值,它只会打印:

completed
<小时/>

如果您不关心强制超时或日志记录,那么它可以归结为:

latestRatesFromDB().firstOrDefault(dummyValue)
    .concatWith(ratesFromNetwork())
    .distinct().skip(1)
    .subscribe(this::save, 
        System.err::println, 
        () -> System.out.println("completed"));

关于java - 如何用其他 Observables 过滤 Observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42193841/

相关文章:

java - 猜谜游戏逻辑

apache-kafka - 如何通过自动确认按主题和分区同时处理 Reactor Kafka 流?

java - 如何持续观察对象直到 onError()/取消订阅

Angular Looped HTTP 请求将值映射到响应

java - 计算器编码错误。请更正

java - 为什么findFragmentById总是返回null

javascript - 如何将数据从父组件发送到特定的动态创建的子组件?

Angular 4 RXJS Observable .interval() 无法在后台工作

c# - Observable.Zip 当要压缩的序列数在运行时未知时

java - 在 JSF - 尝试实现类似于 "#{bean.run(3)}"的东西