node.js - 运行 NodeJS 事件循环/等待子进程完成

标签 node.js event-loop

我首先尝试对问题进行一般描述,然后更详细地说明为什么通常的方法不起作用。如果您想阅读这些抽象的解释,请继续。最后我解释了更大的问题和具体的应用程序,所以如果你更愿意阅读,请跳到“实际应用程序”。

我正在使用 node.js 子进程来做一些计算密集型的工作。父进程完成它的工作,但在执行的某个时刻,它到达了一个点,在继续之前它必须拥有来自子进程的信息。因此,我正在寻找一种等待子进程完成的方法。

我目前的设置看起来有点像这样:

importantDataCalculator = fork("./runtime");
importantDataCalculator.on("message", function (msg) {
    if (msg.type === "result") {
        importantData = msg.data;
    } else if (msg.type === "error") {
        importantData = null;
    } else {
        throw new Error("Unknown message from dataGenerator!");
    }
});

和其他地方
function getImportantData() {
    while (importantData === undefined) {
        // wait for the importantDataGenerator to finish
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

所以当父进程启动时,它执行第一部分代码,产生一个子进程来计算数据并继续做它自己的工作。当需要子进程的结果继续时,它会调用 getImportantData() 。所以这个想法是 getImportantData() 阻塞直到数据被计算出来。

但是,我使用的方式不起作用。我认为这是由于我使用 while 循环阻止了事件循环的执行。并且由于事件循环不执行,无法接收来自子进程的消息,因此while循环的条件无法改变,使其成为无限循环。

当然,我真的不想使用这种while循环。我宁愿做的是告诉 node.js “执行事件循环的一次迭代,然后回到我身边”。我会重复执行此操作,直到收到我需要的数据,然后通过从 getter 返回来继续执行我离开的地方。

我意识到他会带来多次重新进入同一个函数的危险,但是我想在事件循环中使用它的模块除了等待来自子进程的这条消息并发送其他消息报告它的进度外,几乎什么都不做,所以这应该不是问题。

有没有办法在 Node.js 中只执行一次事件循环迭代?还是有另一种方法可以实现类似的目标?或者是否有一种完全不同的方法来实现我在这里尝试做的事情?

到目前为止,我能想到的唯一解决方案是以引入另一个过程的方式更改计算。在这种情况下,会有计算重要数据的进程,计算不需要重要数据的数据位的进程,以及这两者的父进程,它只是等待来自两个子进程的数据并结合他们到达时的碎片。由于它本身不需要做任何计算密集型的工作,它可以只等待来自事件循环(=消息)的事件并对它们使用react,根据需要转发组合数据并存储尚不能组合的数据片段。
然而,这引入了另一个进程,甚至更多的进程间通信,这引入了更多的开销,我想避免这种情况。

编辑

我看到需要更多细节。

父进程(我们称之为进程 1)本身就是一个由另一个进程(进程 0)产生的进程,用于执行一些计算密集型工作。实际上,它只是执行一些我无法控制的代码,因此我无法使其异步工作。我能做(并且已经做)的是让定期执行的代码调用一个函数来报告它的进度并提供部分结果。该进度报告然后通过 IPC 发送回原始流程。

但在极少数情况下,部分结果不正确,因此必须对其进行修改。为此,我需要一些可以独立于正常计算进行计算的数据。但是,此计算可能需要几秒钟;因此,我启动另一个进程(进程 2)来进行这个计算,并通过 IPC 消息将结果提供给进程 1。现在进程 1 和 2 正在愉快地计算这些东西,希望进程 2 计算的校正数据在进程 1 需要它之前完成。但有时需要更正过程 1 的早期结果之一,在这种情况下,我必须等待过程 2 完成其计算。阻塞进程1的事件循环理论上不是问题,因为主进程(进程0)不会受到它的影响。唯一的问题是,通过阻止进程 1 中代码的进一步执行,我也阻塞了事件循环,这阻止了它从进程 2 接收结果。

所以我需要以某种方式暂停进程 1 中代码的进一步执行而不阻塞事件循环。我希望有一个像 process.runEventLoopIteration 这样的调用,它执行事件循环的迭代然后返回。

然后我会像这样更改代码:
function getImportantData() {
    while (importantData === undefined) {
        process.runEventLoopIteration();
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

因此执行事件循环,直到我收到必要的数据,但不继续执行调用 getImportantData() 的代码。

基本上我在过程 1 中所做的是:
function callback(partialDataMessage) {
    if (partialDataMessage.needsCorrection) {
        getImportantData();
        // use data to correct message
        process.send(correctedMessage); // send corrected result to main process
    } else {
        process.send(partialDataMessage); // send unmodified result to main process
    }
}

function executeCode(code) {
    run(code, callback); // the callback will be called from time to time when the code produces new data
    // this call is synchronous, run is blocking until the calculation is finished
    // so if we reach this point we are done
    // the only way to pause the execution of the code is to NOT return from the callback 
}

实际应用/实现/问题

我需要以下应用程序的这种行为。如果您有更好的方法来实现这一目标,请随时提出。

我想执行任意代码并被通知它更改了哪些变量、调用了哪些函数、发生了哪些异常等。我还需要这些事件在代码中的位置,以便能够在 UI 旁边的 UI 中显示收集的信息原始代码。

为了实现这一点,我检测了代码并将回调插入其中。然后我执行代码,将执行包装在 try-catch 块中。每当使用有关执行的一些数据(例如变量更改)调用回调时,我都会向主进程发送一条消息,告诉它有关更改的信息。这样,用户会在代码运行时收到有关代码执行的通知。这些回调生成的事件的位置信息在检测期间添加到回调调用中,因此这不是问题。

出现问题,出现异常时。我还想通知用户有关测试代码中的异常。因此,我将代码的执行封装在 try-catch 中,并且捕获执行之外的任何异常并将其发送到用户界面。但是错误的位置不正确。 node.js 创建的 Error 对象有一个完整的调用堆栈,所以它知道它发生在哪里。但是这个位置如果相对于检测的代码,所以我不能按原样使用这个位置信息,在原始代码旁边显示错误。我需要将检测代码中的这个位置转换为原始代码中的一个位置。为此,在检测代码后,我计算了 source map 以将检测代码中的位置映射到原始代码中的位置。但是,此计算可能需要几秒钟。所以,我想,我会启动一个子进程来计算源映射,而检测代码的执行已经开始。然后,当发生异常时,我检查源 map 是否已经计算过,如果没有,我等待计算完成才能更正位置。

由于要执行和观察的代码可以是完全任意的,我不能简单地将其重写为异步的。我只知道它调用了提供的回调,因为我检测了代码来这样做。我也不能只存储消息并返回继续执行代码,在下一次调用期间检查源映射是否已完成,因为继续执行代码也会阻塞事件循环,阻止计算源在执行过程中从未收到过映射。或者如果它被接收,那么只有在要执行的代码完全完成之后,这可能会很晚或永远不会(如果要执行的代码包含无限循环)。但是在我收到 sourceMap 之前,我无法发送有关执行状态的进一步更新。结合起来,这意味着我只能在要执行的代码完成后(可能永远不会)发送更正的进度消息,这完全违背了程序的目的(使程序员能够观察代码所做的事情,而执行)。

暂时将控制权交给事件循环将解决这个问题。然而,这似乎是不可能的。我的另一个想法是引入第三个进程来控制执行进程和 sourceMapGeneration 进程。它从执行过程接收进度消息,如果有任何消息需要更正,它会等待 sourceMapGeneration 过程。由于进程是独立的,控制进程可以存储接收到的消息并等待 sourceMapGeneration 进程,同时执行进程继续执行,一旦它接收到源映射,它就会纠正消息并将它们全部发送出去。

但是,这不仅需要另一个进程(开销),还意味着我必须在进程之间再次传输代码,并且由于代码可能有数千行,而这本身可能需要一些时间,所以我想移动它尽量少绕。

我希望这能解释为什么我不能也没有使用通常的“异步回调”方法。

最佳答案

在您澄清您所寻求的行为后,为您的问题添加第三个 (:)) 解决方案我建议使用 Fibers

Fibers 让你在 nodejs 中做 co-routines。协程是允许多个入口/导出点的函数。这意味着您将能够让出控制权并随心所欲地恢复控制权。

这是官方文档中的 sleep 函数,它完全可以做到这一点, sleep 给定的时间并执行操作。

function sleep(ms) {
    var fiber = Fiber.current;
    setTimeout(function() {
        fiber.run();
    }, ms);
    Fiber.yield();
}

Fiber(function() {
    console.log('wait... ' + new Date);
    sleep(1000);
    console.log('ok... ' + new Date);
}).run();
console.log('back in main');

您可以将等待资源的代码放在一个函数中,使其屈服,然后在任务完成后再次运行。

例如,根据问题改编您的示例:
var pausedExecution, importantData;
function getImportantData() {
    while (importantData === undefined) {
        pausedExecution = Fiber.current;
        Fiber.yield();
        pausedExecution = undefined;
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have proper data now
        return importantData;
    }
}

function callback(partialDataMessage) {
    if (partialDataMessage.needsCorrection) {
        var theData = getImportantData();
        // use data to correct message
        process.send(correctedMessage); // send corrected result to main process
    } else {
        process.send(partialDataMessage); // send unmodified result to main process
    }
}

function executeCode(code) {
    // setup child process to calculate the data
    importantDataCalculator = fork("./runtime");
    importantDataCalculator.on("message", function (msg) {
        if (msg.type === "result") {
            importantData = msg.data;
        } else if (msg.type === "error") {
            importantData = null;
        } else {
            throw new Error("Unknown message from dataGenerator!");
        }

        if (pausedExecution) {
            // execution is waiting for the data
            pausedExecution.run();
        }
    });


    // wrap the execution of the code in a Fiber, so it can be paused
    Fiber(function () {
        runCodeWithCallback(code, callback); // the callback will be called from time to time when the code produces new data
        // this callback is synchronous and blocking,
        // but it will yield control to the event loop if it has to wait for the child-process to finish
    }).run();
}

祝你好运!我总是说用 3 种方法解决一个问题比用同样的方法解决 3 个问题要好。我很高兴我们能够找到对你有用的东西。诚然,这是一个非常有趣的问题。

关于node.js - 运行 NodeJS 事件循环/等待子进程完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16378094/

相关文章:

c++ - GLib - C++ 中的主事件循环

assembly - x86 程序集中是否有任何暂停/ sleep 或事件

node.js - Graphql Dataloader 文件结构和上下文

ruby-on-rails - 使用单独的前端/后端服务器将应用程序部署到 Heroku

javascript - axios 如何将 blob 与 arraybuffer 作为响应类型处理?

node.js - Node.JS 和 IIS 的请求机制(事件循环和线程)

javascript - 将数据传递到 Handlebars - node.js

Node.js 异步调用处理和多核扩展

node.js - 使用 rxjs 的集成测试 geteventstore 存在竞争条件

node.js - 如何计算事件循环(NodeJs)的总滴答数?