我需要处理 100 万行记录,转换其中的每一行,并将它们保存到多个文件中(按小时分箱;每小时 1 个文件 - 我正在考虑通过“过滤器”将它们拆分) .
出于某种原因,我需要严格按顺序处理这些行。意思是,如果 #450000 行需要更长的时间来处理和保存(这是棘手的部分,因为 fs 与回调异步),处理不会跳转到 #450001...它将等到 450000 完成。代码中的随机休眠就是为了模拟这种情况。
以前(使用简单的 Promise,没有 RxJ),我会创建 N 个 Promise,每一行一个,将它们保存在一个数组中,并通过 reduce 操作进行链接,如下所述:https://github.com/kriskowal/q
但我不想创建 100 万个 Promise 实例。所以,我研究了ReactiveX,希望它能像“推卸责任”一样;意味着它不会等待,一旦事件弹出,处理就会发生,并且处理所使用的资源(认为处理 block 基本上是幕后的 promise )将尽快释放。
我尝试用以下代码验证这一点:
import Rx from 'rxjs-es6/Rx';
import Q from 'q';
let subject = new Rx.Subject();
let processEventJsons = function(observable) {
observable.concatMap(eventJson => {
let deferred = Q.defer();
setTimeout(() => {
eventJson.procDatetime = new Date().toISOString();
deferred.resolve(eventJson);
}, Math.random() * 5000);
return Rx.Observable.fromPromise(deferred.promise)
})
.subscribe({
next: enrichedEventJson => {
console.log(JSON.stringify(enrichedEventJson));
},
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
}
processEventJsons(
subject.filter(dataJson => dataJson.type === "interview").map(dataJson => {
return {event: "intv", datetime: dataJson.datetime}
})
)
processEventJsons(
subject.filter(dataJson => dataJson.type === "checkin").map(dataJson => {
return {event: "chki", datetime: dataJson.datetime}
})
)
for (let i = 0; i < 1000000; i++) {
if (Math.random() < 0.5) {
subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
} else {
subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
}
}
subject.complete();
但是...我不断得到:
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory.
console.log(JSON.stringify(enrichedEventJson));在“for-loop”(在代码末尾)完成之前不会打印。
这让我觉得切换到 RxJS 并没有真正改善情况;它仍然在幕后排队 Promise。
或者我错误地使用了API?你能帮我指出哪里出了问题吗?
更新更新:
假旗。发现问题不在于 RxJS 的使用,而在于 for 循环(太紧了)。所以我把它改为:
for (let i = 0; i < 1000000; i++) {
if (Math.random() < 0.5) {
setTimeout(() => {
subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
});
} else {
setTimeout(() => {
subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
});
}
}
最佳答案
I would create N promises, one for each line, keep them in an array, and do the chaining by reduce op
这是一种简单但占用内存的方法。它使用了需要同时存在的一百万个 promise 。相反,您可以使用递归方法在常量内存中按顺序处理行:
function getInput(i) {
return {id: i, type: Math.random() < 0.5 ? "interview" : "checkin", datetime: new Date().toISOString()};
}
function process(eventJson) {
let deferred = Q.defer();
setTimeout(() => {
eventJson.procDatetime = new Date().toISOString();
deferred.resolve(eventJson);
}, Math.random() * 5000);
return deferred.promise;
}
function filteredProcess({type, datetime}) {
if (type === "interview")
return process({event: "intv", datetime});
if (type === "checkin")
return process({event: "chki", datetime});
}
function log(enrichedEventJson) {
console.log(JSON.stringify(enrichedEventJson));
}
function loop(i) {
if (i < 1000000)
return getInput(i)
.then(filteredProcess)
.then(log)
.then(() => loop(i+1));
else
return Q("done")
}
loop().then(console.log, err => console.error('something wrong occurred: ' + err));
关于javascript - RxJS 中的排序导致 "out of memory"错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42797487/