rx-java - 如何在 RxJava 和 couchbase 中使用排序?

标签 rx-java couchbase

我编写了下面提到的方法来从 couchbase 服务器批量获取数据。

bucket.async()
            .query(N1qlQuery.simple(query))
            .doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t)))
            .flatMap(AsyncN1qlQueryResult::rows)
            .flatMap(row -> 
            bucket.async().
            get(row.value().getString("id")))
            .map(JsonDocument::content).
            toList()
            .toBlocking()
            .single();

当我传递查询时,此代码工作正常

"SELECT meta().id as id FROM bucket" 

但是当我使用类似的东西

"SELECT meta().id as id FROM bucket order by id ASC"

我得到的结果未排序。但是,当我在查询控制台上运行相同的查询时,结果符合预期。这让我相信我在 rxJava 中做错了什么。请帮我解决这个问题。

最佳答案

由于 flatMap() 运算符应用并发流,因此顺序丢失。
当您应用 flatMap() 时,您正在为每个 onNext() 创建并订阅一个新的 Observable,这意味着对于每一行,您并行执行这一行:

 bucket.async().
        get(row.value().getString("id")))

然后每个获取操作将在不同的时间完成,并且获取的内容将无序地发出。

如果你想保持顺序但又不想失去并行性,你应该使用 concatMap()它将仅维护 1 个事件流,并将按顺序订阅每个获取操作。

如果您确实需要/想要并行性,则应该使用 concatMapEager() ,它将并行执行每个创建的 Observable,但会按顺序发出项目。

关于rx-java - 如何在 RxJava 和 couchbase 中使用排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43088788/

相关文章:

android - RxJava,如果我不调用 dispose 会发生什么?

rxjs - 从不可预测的源 Observable 构建 "Heartbeat"Observable

java - 适配 RxJava 1.1.5 到 Reactor Core 3.1.0.M3

java - RxJava 将新对象添加到流中

couchbase - upsert() 是否在 couchbase 中保留 TTL?

java - 如果 RxJava 中结果无效,如何发送 onError

couchbase - 如何将 couchbase 恢复到备份状态?

couchbase - Couchbase 4.0 中分离数据、查询和索引服务

couchbase - 如何在列表中从 couchbase 中的存储桶中提取所有 key

web-services - 使用 PouchDB 作为常规 RESTful Web 服务的离线客户端临时存储