我编写了下面提到的方法来从 couchbase 服务器批量获取数据。
bucket.async()
.query(N1qlQuery.simple(query))
.doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t)))
.flatMap(AsyncN1qlQueryResult::rows)
.flatMap(row ->
bucket.async().
get(row.value().getString("id")))
.map(JsonDocument::content).
toList()
.toBlocking()
.single();
当我传递查询时,此代码工作正常
"SELECT meta().id as id FROM bucket"
但是当我使用类似的东西
"SELECT meta().id as id FROM bucket order by id ASC"
我得到的结果未排序。但是,当我在查询控制台上运行相同的查询时,结果符合预期。这让我相信我在 rxJava 中做错了什么。请帮我解决这个问题。
最佳答案
由于 flatMap()
运算符应用并发流,因此顺序丢失。
当您应用 flatMap()
时,您正在为每个 onNext()
创建并订阅一个新的 Observable
,这意味着对于每一行,您并行执行这一行:
bucket.async().
get(row.value().getString("id")))
然后每个获取操作将在不同的时间完成,并且获取的内容将无序地发出。
如果你想保持顺序但又不想失去并行性,你应该使用 concatMap()它将仅维护 1 个事件流,并将按顺序订阅每个获取操作。
如果您确实需要/想要并行性,则应该使用 concatMapEager() ,它将并行执行每个创建的 Observable,但会按顺序发出项目。
关于rx-java - 如何在 RxJava 和 couchbase 中使用排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43088788/