rx-java - 使用响应式(Reactive)编程将 couchbase 文档从一个存储桶复制到另一个存储桶时出现 OOM 问题

标签 rx-java couchbase reactive

我们正在尝试使用响应式(Reactive)编程将数据从一个存储桶复制到另一个存储桶(大约 100 万个文档)。我们在这段代码中遇到了 OOM。我不是 rxjava 专家,需要帮助以防止 OOM。我认为读取速度比写入速度快,这会因缓冲区已满而导致 OOM。代码如下:

CountDownLatch countDownLatch5 = new CountDownLatch(1);
Observable
        .from(n1qlKeysForDocsGPC)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String key) {
                return readPrimaryMainAsyncBucket
                        .get(key, 10, TimeUnit.SECONDS)
                        .onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
                        .retry(50)
                        .switchIfEmpty(Observable.empty())
                        .onErrorResumeNext(Observable.empty());
            }
        })
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(JsonDocument jsonDocument) {
                return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
            }
        })
        .last()
        .doOnTerminate(new Action0() {
            @Override
            public void call() {
                countDownLatch5.countDown();
            }
        })
        .subscribe();
try {
    countDownLatch5.await();
    logger.info("DataRecoverySchedulers | Completed countDownLatch5");
} catch (InterruptedException e) {
    e.printStackTrace();
}

最佳答案

Couchbase Java SDK 3.x 之前的版本(在撰写本文时尚未发布)使用 RxJava 版本 1。

flatmap正如您现在所拥有的那样,调用会将操作发布到内部缓冲区以异步执行,并返回 Observable跟踪每一个。这意味着第一个 flatmap将消耗您的 from 的输出以无限制的方式调用。换句话说,它读取整个列表的速度比操作发生的速度快得多。我预计您看到的 OOM 错误是由于 Couchbase 内部缓冲区溢出造成的。

要纠正此问题,您可以使用 flatmap 的变体这限制了未完成订阅的数量。您只需将第二个整数参数添加到 flatmap 中即可。称呼。所以你会有 .flatmap(new Func1<~>..., 10)将自己限制为一次 10 个未完成的操作。

Couchbase 中的默认缓冲区约为 16000 个未完成的操作,但这远远超出了大多数系统饱和所需的数量。

有关引用,请参阅相关的 Stack Overflow post关于限制文件上传的吞吐量。

关于rx-java - 使用响应式(Reactive)编程将 couchbase 文档从一个存储桶复制到另一个存储桶时出现 OOM 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54998435/

相关文章:

android - RXJava 调度程序/线程如何为不同的运算符(operator)工作?

Couchbase Sync Gateway - 获取文档的先前修订版?

ios - 相当于在 Swift Combine 中使用 @Published 的计算属性?

kotlin - 结合入站 channel 适配器和流发射器

android - 内存不足错误 RxAndroid + RxJava + Retrofit2

android - RXJava 将函数应用于 Observable 中的每个元素并返回它的可观察值

c# - .NET SDK 中的 Couchbase NodeUnavailableException

nosql - Openstack 上的 Couchbase XDCR

swift - 如何使用先前可观察链的结果?

android - 如何使用 rxjava + retrofit2 编写嵌套网络请求