javascript - RxJS 中的排序导致 "out of memory"错误

标签 javascript node.js promise rxjs reactive-programming

我需要处理 100 万行记录,转换其中的每一行,并将它们保存到多个文件中(按小时分箱;每小时 1 个文件 - 我正在考虑通过“过滤器”将它们拆分) .

出于某种原因,我需要严格按顺序处理这些行。意思是,如果 #450000 行需要更长的时间来处理和保存(这是棘手的部分,因为 fs 与回调异步),处理不会跳转到 #450001...它将等到 450000 完成。代码中的随机休眠就是为了模拟这种情况。

以前(使用简单的 Promise,没有 RxJ),我会创建 N 个 Promise,每一行一个,将它们保存在一个数组中,并通过 reduce 操作进行链接,如下所述:https://github.com/kriskowal/q

但我不想创建 100 万个 Promise 实例。所以,我研究了ReactiveX,希望它能像“推卸责任”一样;意味着它不会等待,一旦事件弹出,处理就会发生,并且处理所使用的资源(认为处理 block 基本上是幕后的 promise )将尽快释放。

我尝试用以下代码验证这一点:

import Rx from 'rxjs-es6/Rx';
import Q from 'q';    

let subject = new Rx.Subject();
let processEventJsons = function(observable) {
  observable.concatMap(eventJson => {
    let deferred = Q.defer();    

    setTimeout(() => {
      eventJson.procDatetime = new Date().toISOString();
      deferred.resolve(eventJson);
    }, Math.random() * 5000);    

    return Rx.Observable.fromPromise(deferred.promise)
  })
  .subscribe({
    next: enrichedEventJson => {
      console.log(JSON.stringify(enrichedEventJson));
    },
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done'),
  });
}    

processEventJsons(
  subject.filter(dataJson => dataJson.type === "interview").map(dataJson => {
    return {event: "intv", datetime: dataJson.datetime}
  })
)    

processEventJsons(
  subject.filter(dataJson => dataJson.type === "checkin").map(dataJson => {
    return {event: "chki", datetime: dataJson.datetime}
  })
)    

for (let i = 0; i < 1000000; i++) {
  if (Math.random() < 0.5) {
    subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
  } else {
    subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
  }
}
subject.complete();

但是...我不断得到:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory.

console.log(JSON.stringify(enrichedEventJson));在“for-loop”(在代码末尾)完成之前不会打印。

这让我觉得切换到 RxJS 并没有真正改善情况;它仍然在幕后排队 Promise。

或者我错误地使用了API?你能帮我指出哪里出了问题吗?

更新更新:

假旗。发现问题不在于 RxJS 的使用,而在于 for 循环(太紧了)。所以我把它改为:

for (let i = 0; i < 1000000; i++) {
  if (Math.random() < 0.5) {
    setTimeout(() => {
      subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
    });
  } else {
    setTimeout(() => {
      subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
    });
  }
}

最佳答案

I would create N promises, one for each line, keep them in an array, and do the chaining by reduce op

这是一种简单但占用内存的方法。它使用了需要同时存在的一百万个 promise 。相反,您可以使用递归方法在常量内存中按顺序处理行:

function getInput(i) {
  return {id: i, type: Math.random() < 0.5 ? "interview" : "checkin", datetime: new Date().toISOString()};
}
function process(eventJson) {
  let deferred = Q.defer();    
  setTimeout(() => {
    eventJson.procDatetime = new Date().toISOString();
    deferred.resolve(eventJson);
  }, Math.random() * 5000);    
  return deferred.promise;
}
function filteredProcess({type, datetime}) {
  if (type === "interview")
    return process({event: "intv", datetime});
  if (type === "checkin")
    return process({event: "chki", datetime});
}
function log(enrichedEventJson) {
  console.log(JSON.stringify(enrichedEventJson));
}

function loop(i) {
  if (i < 1000000)
    return getInput(i)
    .then(filteredProcess)
    .then(log)
    .then(() => loop(i+1));
  else
    return Q("done")
}

loop().then(console.log, err => console.error('something wrong occurred: ' + err));

关于javascript - RxJS 中的排序导致 "out of memory"错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42797487/

相关文章:

相当于 php mktime 的 Javascript

javascript - 将功能组件转换为基于类的组件

node.js - redis expireAt 在服务器中没有过期

javascript - 将 fetch 与 async/await 一起使用会返回 [object Object]

javascript - 在 API 中使用 JavaScript Promise

javascript - DOMContentLoaded 事件为单个页面加载触发两次

javascript - 对 Struts2 操作类的 jQuery Ajax 请求总是返回 200 ok,但错误事件被触发

javascript - 正则表达式获取数组中的所有 JS 类名

javascript - Node.js + 对象数组

javascript - 在 promise 内部,我想将值(该值本身来自另一个 promise )分配给将在该 promise 之外使用的变量