javascript - 如何处理来自fs readline.Interface异步迭代器的错误

标签 javascript node.js async-iterator

基于processLineByLine()的示例,我注意到如果给定的文件名不存在,我们将无法捕获错误。在这种情况下,程序将以类似以下内容的方式结束:


  UnhandledPromiseRejectionWarning:错误:ENOENT:没有此类文件或目录


因此,我要引发可捕获错误的最简单方法是对processLineByLine()函数进行2次修改:


function*之类的生成器打开它
文件上的await存在检查await access(filename, fs.constants.F_OK)


最后,我不得不将readline.Interface实例转换为异步生成器。我特别不喜欢这最后一部分。产生的lines()函数类似于:

export async function* lines(filename) {
    await access(filename, fs.constants.F_OK)
    const lines = readline.createInterface({
        input: fs.createReadStream(filename),
        crlfDelay: Infinity
    })
    for await (const l of lines) {
        yield l
    }
}


问题:是否有更好的方法使lines()返回异步迭代器,或者如果文件名不存在,则抛出错误?

错误报告:关于@ jfriend00观察,我在nodejs上打开了一个Bug问题:https://github.com/nodejs/node/issues/30831

最佳答案

嗯,这是一个棘手的问题。即使检测到该文件是否作为预检文件,也不能保证您可以成功打开该文件(该文件可能已锁定或存在权限问题),并且不能在打开前检测该文件是否存在是服务器开发中的经典竞争条件(小窗口,但仍然是比赛条件)。

我仍然认为必须有一种更好的方法来消除fs.createReadStream()中的错误,但是我能找到的唯一方法是将其包装在一个Promise中,该Promise仅在成功打开文件后才能解决。这样一来,您就可以从打开文件中获取错误,然后将其传播回async函数的调用者。如下所示:

const fs = require('fs');
const readline = require('readline');

function createReadStreamSafe(filename, options) {
    return new Promise((resolve, reject) => {
        const fileStream = fs.createReadStream(filename, options);
        fileStream.on('error', reject).on('open', () => {
            resolve(filestream);
        });

    });
}

async function processLineByLine(f) {
  const fileStream = await createReadStreamSafe(f);

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    // Each line in input.txt will be successively available here as `line`.
    console.log(`Line from file: ${line}`);
  }
}

processLineByLine("nofile").catch(err => {
    console.log("caught error");
});


这使得processLineByLine()返回的承诺将被拒绝,您可以在那里处理我认为您要的错误。如果我误解了您的要求,请澄清一下。

仅供参考,在我看来,这是readline.createInterface()中的错误,因为它似乎应该在for await (const line of rl)的第一次迭代中拒绝,但这似乎不是发生的情况。

因此,其结果是,即使这种变通方法也无法在流打开后检测到流中的读取错误。确实需要将其固定在createInterface()内部。我同意文件打开错误或读取错误都应显示为for await (const line of rl)上的拒绝。



解决文件打开问题的另一种方法是使用await fs.promises.open(...)预打开文件,然后将fd传递给fs.createReadStream,然后您自己会看到打开错误。



一种不同的解决方案-包装readLine迭代器以添加错误处理

警告,这看起来有点像hack,但这是一个非常有趣的学习项目,因为我最终不得不用我自己的readline asyncIterator包装起来,以便在检测到readStream上的错误时拒绝( readline库丢失的错误处理)。

我开始执行一项任务,以弄清楚如何编写一个processLineByLine()函数,该函数将返回一个asyncIterator,该函数将正确拒绝流错误(即使readline代码在此方面有错误),同时仍然使用readline内部库。

目的是能够编写如下代码:

for await (let line of processLineByLine("somefile1.txt")) {
     console.log(line);
 }


可以正确处理内部使用的readStream上的错误,无论该文件是否不存在,存在但无法打开,甚至在读取时稍后遇到读取错误。由于我没有在内部更改/修复readline接口代码,因此我必须在readStream上安装我自己的error侦听器,并且当我看到那里的错误时,我需要使readline接口的所有未决或将来的承诺都被拒绝。

我最终得到的是:

// This is an experiment to wrap the lines asyncIterator with our own iterator
// so we can reject when there's been an error on the readStream.  It's really
// ugly, but does work.

const fs = require('fs');
const readline = require('readline');

function processLineByLine(filename, options = {}) {
    const fileStream = fs.createReadStream(filename, options);
    let latchedError = null;
    let kill = new Set();

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });

    const lines = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
    });

    // create our own little asyncIterator that wraps the lines asyncIterator
    //   so we can reject when we need to
    function asyncIterator() {
        const linesIterator = lines[Symbol.asyncIterator]();
        return {
            next: function() {
                if (latchedError) {
                    return Promise.reject(latchedError);
                } else {
                    return new Promise((resolve, reject) => {
                        // save reject handlers in higher scope so they can be called 
                        // from the stream error handler
                        kill.add(reject);

                        let p = linesIterator.next();

                        // have our higher level promise track the iterator promise
                        // except when we reject it from the outside upon stream error
                        p.then((data => {
                            // since we're resolving now, let's removing our reject
                            // handler from the kill storage.  This will allow this scope
                            // to be properly garbage collected
                            kill.delete(reject);
                            resolve(data);
                        }), reject);
                    });
                }
            }
        }
    }

    var asyncIterable = {
        [Symbol.asyncIterator]: asyncIterator
    };

    return asyncIterable;
}

async function runIt() {
    for await (let line of processLineByLine("xfile1.txt")) {
         console.log(line);
     }
 }

runIt().then(() => {
    console.log("done");
}).catch(err => {
    console.log("final Error", err);
});


有关其工作原理的一些解释...

我们自己的错误监控

首先,您可以看到以下内容:

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });


这是我们对readStream进行的错误监视,以弥补readline内部缺少的错误处理。每当我们看到错误时,都会将其保存在更高范围的变量中,以备日后使用。如果从readline对此流注册了任何未完成的promise,我们将“杀死”它们(拒绝它们,稍后您将看到其工作原理) )。

没有特殊处理文件打开错误

目标的一部分是摆脱以前的文件打开错误解决方案中的特殊处理。我们希望readStream上的任何错误触发对asyncIterable的拒绝,因此这是一种更为通用的机制。文件打开错误将在此错误处理中被捕获,就像其他读取错误一样。

我们自己的asyncIterable和asyncIterator

调用readline.createInterace()返回asyncIterable。它与常规可迭代项基本上相同,因为您可以在其上调用特殊属性以获取asyncIterator。该asyncIterator具有与常规迭代器相同的.next()属性,除了调用asyncIterator.next()时,它返回一个解析为对象而不是对象的promise。

因此,这就是for await (let line of lines)的工作方式。它首先调用lines[Symbol.asyncIterator]()以获得asyncIterator。然后,在它返回的asyncIterator上,它反复执行await asyncIterator.next()以等待asyncIterator.next()返回的承诺。

现在,readline.createInterface()已经返回了这样的asyncIterable。但是,它的工作方式不太正确。当readStream遇到错误时,它不会拒绝.next()在每次迭代中返回的承诺。实际上,那个承诺永远不会被拒绝或解决。因此,事情陷入僵局。在我的测试应用程序中,该应用程序将退出,因为readStream已完成(在错误发生之后),并且即使承诺仍在等待中,也不再阻止该应用程序退出。

因此,我需要一种方法来强制保证readlineIterator.next()先前已返回并且当前正在等待for await (...)拒绝。好吧,promise不会提供用于拒绝它的外向接口,并且我们无法访问有权拒绝它的readline实现的内部。

我的解决方案是将readlineIterator与我自己的包装在一起,作为一种代理。然后,我们自己的错误检测器看到一个错误,并且readline有未完成的promise,我可以使用我的代理/包装器强制拒绝那些未完成的promise。这将导致for await (...)看到拒绝并得到适当的错误。并且,它起作用。

我花了一些时间来了解asyncIterators的工作原理,以便能够包装一个。我非常感谢这篇Asynchronous Iterators in JavaScript文章,该文章提供了一些非常有用的代码示例,用于构造您自己的asyncIterable和asyncIterator。实际上,这是在此练习中真正学习的地方,而其他人也可以通过了解以上代码中的工作方式来学习。

强迫放弃承诺

这段代码中的“丑陋”之处在于,从承诺的拒绝处理程序的通常范围之外强制承诺拒绝。这是通过将拒绝处理程序存储在更高级别的范围内来完成的,其中readStream的错误处理可以调用​​承诺拒绝的触发器。可能有一种更优雅的方式对此进行编码,但这是可行的。

使我们自己的asyncIterable

异步可迭代对象只是一个对象,它具有一个名为[Symbol.asyncIterator]的属性。该属性必须是一个在不带参数的情况下返回asyncIterator.的函数。因此,这是我们的asyncIterable

var asyncIterable = {
    [Symbol.asyncIterator]: asyncIterator
};


制作我们自己的asyncIterator

asyncIterator是一个函数,当被调用时,该函数将返回带有next()属性的对象。每次调用obj.next()时,它将返回一个解析为常规迭代器元组对象{done, value}的promise。我们不必担心解析的值,因为我们只需从readline的迭代器中获取该值即可。因此,这是我们的asyncIterator

// create our own little asyncIterator that wraps the lines asyncIterator
//   so we can reject when we need to
function asyncIterator() {
    const linesIterator = lines[Symbol.asyncIterator]();
    return {
        next: function() {
            if (latchedError) {
                return Promise.reject(latchedError);
            } else {
                return new Promise((resolve, reject) => {
                    // save reject handlers in higher scope so they can be called 
                    // from the stream error handler
                    kill.push(reject);

                    let p = linesIterator.next();

                    // have our higher level promise track the iterator promise
                    // except when we reject it from the outside upon stream error
                    p.then(resolve, reject);
                });
            }
        }
    }
}


首先,它从readline接口(我们正在代理/包装的接口)中获取asyncIterator并将其本地存储在作用域中,以便以后使用。

然后,它返回格式为{next: fn}的强制迭代器结构。然后,在该函数内部就是我们的包装逻辑展开的地方。如果我们看到先前的锁存错误,那么我们总是返回Promise.reject(latchedError);。如果没有错误,那么我们将返回一个手动构建的Promise。

在用于该承诺的执行程序函数中,我们通过将拒绝处理添加到名为Set的更高范围的kill中来注册拒绝处理。如果允许更高范围的filestream.on('error', ....)处理程序通过调用该函数看到错误,则可以拒绝该承诺。

然后,我们调用linesIterator.next()以获得返回的保证。我们对那个承诺的resolve和拒绝回调都感兴趣。如果可以正确解决该承诺,则可以从更高级别的作用域中删除拒绝处理程序(以实现对我们作用域的更好的垃圾回收),然后使用相同的解析值来解决我们的wrap / proxy承诺。

如果linesIterator承诺被拒绝,我们只需将拒绝权通过包装/代理承诺即可。

我们自己的文件流错误处理

因此,现在是最后的解释。我们有这个错误处理程序来监视流:

fileStream.on('error', (err) => {
    latchedError = err;
    // any open promises waiting on this stream, need to get rejected now
    for (let fn of kill) {
        fn(err);
    }
});


这有两件事。首先,它存储/锁存错误,因此将来对行迭代器的任何调用都只会因先前的错误而拒绝。其次,如果行迭代器中有任何待处理的承诺等待解决,它将循环遍历kill集并拒绝这些承诺。这就是使asyncIterator承诺正确拒绝的原因。这应该在readline代码内部发生,但是由于它没有正确执行,因此我们强行拒绝了wrap / proxy承诺,因此,当流遇到错误时,调用方将看到正确的拒绝。



最后,您可以这样做,因为所有丑陋的细节都隐藏在包装的asyncIterable后面:

async function runIt() {
    for await (let line of processLineByLine("xfile1.txt")) {
         console.log(line);
     }
 }

runIt().then(() => {
    console.log("done");
}).catch(err => {
    console.log("final Error", err);
});

关于javascript - 如何处理来自fs readline.Interface异步迭代器的错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59216364/

相关文章:

javascript - HTML5 视频 canPlay 事件第二次不起作用

javascript - Yeoman 使用谷歌应用引擎服务器

javascript - 使用 javascript/jquery 随机化 CSS3 动画时间?

javascript - 完成后无法禁用 Bootstrap 按钮

javascript - 带有嵌套函数的 Node 异步函数

javascript - 为什么这个 readline 异步迭代器不能正常工作?

javascript - 如何映射异步生成器?

javascript - 使用 postman 无法从express.js服务器获得响应

angularjs - 元素数组无法正常工作

javascript - 如何在中断 "for await"循环后*重新打开* AsyncIterator?