我们正在尝试使用响应式(Reactive)编程将数据从一个存储桶复制到另一个存储桶(大约 100 万个文档)。我们在这段代码中遇到了 OOM。我不是 rxjava 专家,需要帮助以防止 OOM。我认为读取速度比写入速度快,这会因缓冲区已满而导致 OOM。代码如下:
CountDownLatch countDownLatch5 = new CountDownLatch(1);
Observable
.from(n1qlKeysForDocsGPC)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String key) {
return readPrimaryMainAsyncBucket
.get(key, 10, TimeUnit.SECONDS)
.onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
.retry(50)
.switchIfEmpty(Observable.empty())
.onErrorResumeNext(Observable.empty());
}
})
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(JsonDocument jsonDocument) {
return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
}
})
.last()
.doOnTerminate(new Action0() {
@Override
public void call() {
countDownLatch5.countDown();
}
})
.subscribe();
try {
countDownLatch5.await();
logger.info("DataRecoverySchedulers | Completed countDownLatch5");
} catch (InterruptedException e) {
e.printStackTrace();
}
最佳答案
Couchbase Java SDK 3.x 之前的版本(在撰写本文时尚未发布)使用 RxJava 版本 1。
flatmap
正如您现在所拥有的那样,调用会将操作发布到内部缓冲区以异步执行,并返回 Observable
跟踪每一个。这意味着第一个 flatmap
将消耗您的 from
的输出以无限制的方式调用。换句话说,它读取整个列表的速度比操作发生的速度快得多。我预计您看到的 OOM 错误是由于 Couchbase 内部缓冲区溢出造成的。
要纠正此问题,您可以使用 flatmap
的变体这限制了未完成订阅的数量。您只需将第二个整数参数添加到 flatmap
中即可。称呼。所以你会有 .flatmap(new Func1<~>..., 10)
将自己限制为一次 10 个未完成的操作。
Couchbase 中的默认缓冲区约为 16000 个未完成的操作,但这远远超出了大多数系统饱和所需的数量。
有关引用,请参阅相关的 Stack Overflow post关于限制文件上传的吞吐量。
关于rx-java - 使用响应式(Reactive)编程将 couchbase 文档从一个存储桶复制到另一个存储桶时出现 OOM 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54998435/