javascript - Promise 合并流

标签 javascript node.js promise bluebird

我想逐行读取大文件并将数据插入数据库存储中。 我的函数在它创建的流中返回一个 Promise,并在调用事件 stream.on('end') 时解析它,但这不是我真正想要的,因为在 stream.on( 'data') 它在每一行生成 Promise.map(),我想确定所有插入操作在 resolve() 调用之前完成。在这种情况下我怎样才能生产出正确的链条?

    var loadFromFile = (options) => new Promise(function (resolve, reject) {
        let stream = fs.createReadStream(options.filePath, {
          flags: 'r',
        });

        stream.on('data', (chunk) => {
            /* process data chunk to string Array */  
            Promise.map(lines, (line) => {
            /* process and insert each line here */
            })
            .catch((err)=>{
                reject(err);
            });
          });

          stream.on('end', () => {
              if (someBuisnessLogicCheckHere) {
                reject('Invalid data was found in file');
              }
              resolve(); // Here I am not sure, that all inserts for each chunk in Promise.map was completed
          });
    });

最佳答案

如果您想确保在映射操作中的所有 promise 都已解决之前不会解决,请等待 Promise.map 返回的 promise ;见评论:

var loadFromFile = (options) => new Promise(function(resolve, reject) {
    let stream = fs.createReadStream(options.filePath, {
        flags: 'r',
    });
    let promises = [];                                 // Array of promises we'll wait for

    stream.on('data', (chunk) => {
        promises.push(                                 // Remember this promise
            Promise.map(lines, (line) => {
                /* process and insert each line here */
            })
            .catch((err) => {
                reject(err);
            })
        );
    });

    stream.on('end', () => {
        if (someBuisnessLogicCheckHere) {
            reject('Invalid data was found in file');
        }
        Promise.all(promises).then(() => { resolve()}); // Wait for it before resolving
    });
});

请注意,我不只是执行 Promise.all(promises).then(resolve);,因为这会将一系列分辨率传递给 resolve,这您的原始代码未与调用者共享。

如果数组有可能在充满大多数已解决的 promise 时变得非常大,您可以在它们解决时主动删除它们,只等待最后留下的 promise 。

关于javascript - Promise 合并流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37254448/

相关文章:

mysql - node-mysql 连接池

android - 通过 Android 在 Kotlin 中实现 promise

Javascript/jQuery - 无论如何要简写这个或者不那么迟钝

javascript - 为多个实例使用复选框

Javascript 问题 - 使用 getElementById 间歇性地找不到元素

javascript - 为什么这个 promise 没有得到解决?

javascript - 试图定义一个 promise.all

javascript - 物化选项卡

javascript - Passport.js req.user 返回 : 'Promise { false }' - How do I get the current sessions' username?

javascript - 如何查询 Mongoose 数据库以检索一项?