我面临的问题如下: 我有两个可观察对象,一个是从网络获取数据,另一个是从数据库获取数据。第二个可能为空,但缺少第一个将被视为错误。然后,如果来自网络的结果出现,我需要将其与数据库的最新结果(如果存在)进行比较,如果它们不同,我想存储它们(如果数据库可观察为空,我无论如何都想存储网络结果)。
有专门的操作人员处理这样的情况吗?
到目前为止,我尝试了使用 zipWith 的解决方案(如果数据库为空,则该解决方案无法按预期工作),缓冲区(正在工作,但远非理想), 我还尝试了平面映射(这需要在订阅者中进行额外的转换)。
下面是带有缓冲区的解决方案。
Observable.concat(ratesFromNetwork(), latestRatesFromDB())
.buffer(3000, 2)
.filter(buffer -> !(buffer.size() == 2 && !buffer.get(0).differentThan(buffer.get(1))))
.map(buffer -> buffer.get(0))
.subscribe(this::save,
(ex) -> System.out.println(ex.getMessage()),
() -> System.out.println("completed"));
如果我修改latestRatesFromDb,使其不返回Observable,而是返回Optional,则整个问题变得微不足道,因为我可以使用此结果进行过滤。似乎无法以异步方式进行过滤(或者我错过了什么?)
最佳答案
好的,这就是我要如何写这篇文章的方法。
首先,任何具有 DifferentThan
函数的类都应该更改为覆盖 equals
。否则,您无法对这些对象使用许多基本方法。
出于本示例的目的,我使用 Integer 类作为类型参数编写了所有可观察量。然后我使用调度程序编写两个模拟方法:
static Observable<Integer> ratesFromNetwork(Scheduler scheduler) {
return Observable.<Integer>create(sub -> {
sub.onNext(2);
sub.onCompleted();
}).delay(99, TimeUnit.MILLISECONDS, scheduler);
}
static Observable<Integer> latestRatesFromDB(Scheduler scheduler) {
return Observable.<Integer>create(sub -> {
sub.onNext(1);
sub.onCompleted();
}).delay(99, TimeUnit.MILLISECONDS, scheduler);
}
正如您所看到的,两者很相似,但是,它们会发出不同的值。
lack of the first one is considered an error
实现此目的的最佳方法是使用超时
。您可以立即在此处记录错误并继续:
final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
.timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from network."));
当超时
失败时,rx将抛出错误。 doOnError
将使您更好地了解此错误从何处开始,并让它在序列的其余部分中传播。
The second one might be empty
在这种情况下,我会采取类似的策略,但是,不要使用方法 onErrorResumeNext
来传播错误。现在,您可以使用 firstOrDefault
确保可观察对象至少发出一个值。在此方法中,使用一些您希望永远不会与网络结果匹配的虚拟值。
final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
.timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from database"))
.onErrorResumeNext(Observable.empty())
.firstOrDefault(-1);
现在,通过使用 distinct
方法,只有当值与之前的值不同时,您才能获取该值(这就是为什么您需要重写 equals
) .
databaseRate.concatWith(networkRate).distinct().skip(1)
.subscribe(i -> System.out.println("Updating to " + i),
System.err::println,
() -> System.out.println("completed"));
这里,数据库速率被放置在网络速率之前,以利用distinct
。然后添加 skip
以始终忽略数据库速率值。
完整代码:
final long networkTimeOut = 100;
final long databaseTimeOut = 100;
final TestScheduler scheduler = new TestScheduler();
final Observable<Integer> networkRate = ratesFromNetwork(scheduler)
.timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from network."));
final Observable<Integer> databaseRate = latestRatesFromDB(scheduler)
.timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler)
.doOnError(e -> System.err.println("Failed to get rates from database"))
.onErrorResumeNext(Observable.empty())
.firstOrDefault(-1);
databaseRate.concatWith(networkRate).distinct().skip(1)
.subscribe(i -> System.out.println("Updating to " + i),
System.err::println,
() -> System.out.println("completed"));
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
<小时/>
当 networkTimeOut
和 databaseTimeOut
大于 100 时,它会打印:
Updating to 2
completed
当 networkTimeOut
小于 100 时,它会打印:
Failed to get rates from network.
java.util.concurrent.TimeoutException
当databaseTimeOut
小于100时,它会打印:
Failed to get rates from database
Updating to 2
completed
如果您修改 latestRatesFromDB
和 ratesFromNetwork
以返回相同的值,它只会打印:
completed
<小时/>
如果您不关心强制超时或日志记录,那么它可以归结为:
latestRatesFromDB().firstOrDefault(dummyValue)
.concatWith(ratesFromNetwork())
.distinct().skip(1)
.subscribe(this::save,
System.err::println,
() -> System.out.println("completed"));
关于java - 如何用其他 Observables 过滤 Observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42193841/