javascript - 如何在node.js中创建后台单线程FIFO作业队列

标签 javascript node.js concurrency

我是一名 JS/node 初学者,我正在尝试了解 Javascript 中的“并发”主题。我认为我现在对回调相当满意,但我不认为这是我的场景中要走的路。基本上,我有重复执行的昂贵任务(worker),我需要在主进程继续工作的同时逐一处理这些任务。这是一个最小的测试。

/*
 * We have a worker function  that does some expensive task, e.g., an
 * I/O task or something else.
 */
worker = function ( jobId ) {

    // It will take something between 1 and 2 seconds.
    var runtime = Math.floor((Math.random() * 1000) + 1000);
    console.log("started job #" + jobId + " (" + runtime + " ms)");

    // Then the worker will do something for a while ...
    setTimeout(
        function() {
            // .. and at some point it'll be finished.
            console.log("finished job #" + jobId);
        }, runtime
    );

};

/*
 * We obviously have a main process that meanwhile does other stuff
 * like processed user interactions. In this case we call this until
 * some artificial tickets are used.
 */
mainprocess = function (tickets) {

    // Simulate some processing time ..
    var runtime = Math.floor((Math.random() * 500));
    setTimeout(
        function() {
            console.log("main process #" + tickets + " (" + runtime + " ms)");
            if (tickets > 0) {
                tickets--;
                mainprocess(tickets);
            }
        }, runtime
    );
}

// At some point in the code we create workers and we want to make sure
// they're processed in the *order of creation* and *one after another* 
// without blocking the main process ...
for ( var i = 1; i <= 10; i++) {
    worker(i);
};

// ... and the some other stuff will happen for a while!
mainprocess(10);

// ..

代码当前输出类似..

started job #1 (1751 ms)
started job #2 (1417 ms)
...
started job #9 (1050 ms)
started job #10 (1864 ms)
main process #10 (142 ms)
main process #9 (228 ms)
main process #8 (149 ms)
main process #7 (88 ms)
main process #6 (410 ms)
finished job #9
finished job #5
main process #5 (265 ms)
finished job #2
main process #4 (270 ms)
finished job #7
finished job #3
finished job #1
...
main process #1 (486 ms)
main process #0 (365 ms)

我真的不知道如何更改代码,以便主进程继续运行,而工作线程按创建顺序执行(当前仅按正确顺序启动)和一个接一个(目前都是并行的)。所需的输出将是..

started job #1 (1384 ms)
main process #10 (268 ms)
main process #9 (260 ms)
main process #8 (216 ms)
main process #7 (93 ms)
main process #6 (160 ms)
main process #5 (269 ms)
main process #4 (44 ms)
finished job #1
started job #2 (1121 ms)
main process #3 (172 ms)
main process #2 (170 ms)
main process #1 (437 ms)
finished job #2
started job #3 (1585 ms)
main process #0 (460 ms)
finished job #3
started job #4 (1225 ms)
finished job #4
started job #5 (1300 ms)
finished job #5

如有任何帮助,我们将不胜感激。

最佳答案

好的,我已经弄清楚了。扩展和注释代码使用 javascript 的内置 Promise,但您可以使用 Q 实现相同的效果。或Bluebird或任何其他 Node 兼容的 promise 库。请注意,jquery $.Deferred 对象在 Node 环境中不可用。

/*
 * We have a worker function  that does some expensive task, e.g., an
 * I/O task or something else.
 */
worker = function ( jobId ) {

    // we need to put it into a new promise object
    return new Promise(function(resolve, reject) {

        // It will take something between 1 and 2 seconds.
        var runtime = Math.floor((Math.random() * 1000) + 1000);
        console.log("started job #" + jobId + " (" + runtime + " ms)");

        // Then the worker will do something for a while ...
        setTimeout(
            function() {
                // .. and at some point it'll be finished.
                console.log("finished job #" + jobId);
                 // .. now we have to resolve the promise!!
                resolve("resolved job #" + jobId);
            }, runtime
        );

    });
};


/*
 * We obviously have a main process that meanwhile does other stuff
 * like processed user interactions. In this case we call this until
 * some artificial tickets are used.
 */
mainprocess = function (tickets) {

    // Simulate some processing time ..
    var runtime = Math.floor((Math.random() * 500));
    setTimeout(
        function() {
            console.log("main process #" + tickets + " (" + runtime + " ms)");
            if (tickets > 0) {
                tickets--;
                mainprocess(tickets);
            }
        }, runtime
    );
}

// create a sequence with a resolved promise
var sequence = Promise.resolve();

// At some point in the code we create workers and we want to make sure
// they're processed in the *order of creation* and *one after another*
// without blocking the main process ...
for ( var i = 1; i <= 10; i++) {
    // create an IIFE so that the current "i" gets its own
    // closure when it will be used later (otherwise all job ids
    // would be "11" on invokation of the worker).
    (function() {
        var jobId = i;
        // add a new promise after the previous promise resolved
        sequence = sequence.then(
            function(result) {
                // handle result later
                return worker(jobId);
                // return just added the next promise to the chain!
            },
            function(err) {
                // handle error later
            }
        );
    })(); // END IIFE
};

// ... and the some other stuff will happen for a while!
mainprocess(10);

// ..

输出符合要求:

started job #1 (1384 ms)
main process #10 (268 ms)
main process #9 (260 ms)
main process #8 (216 ms)
main process #7 (93 ms)
main process #6 (160 ms)
main process #5 (269 ms)
main process #4 (44 ms)
finished job #1
started job #2 (1121 ms)
main process #3 (172 ms)
main process #2 (170 ms)
main process #1 (437 ms)
finished job #2
started job #3 (1585 ms)
main process #0 (460 ms)
finished job #3
started job #4 (1225 ms)
finished job #4
started job #5 (1300 ms)
finished job #5
...

我必须对以下文章给予高度赞扬,它们提供了见解:

关于javascript - 如何在node.js中创建后台单线程FIFO作业队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37070813/

相关文章:

javascript - RequireJS 搜索 app.app 而不是 app.js(其中 app.js 是 data-main 中指定的入口点)

node.js - 在 Heroku 的 Express Node 中存储 api key 的位置

javascript - forEach 循环中未定义的值

java - 这个字典函数是线程安全的(ConcurrentHashMap+AtomicInteger)吗?

javascript - onClick 在 React js 中没有触发

javascript - 如何将 Websockets 与 Pyramid 和 socket.io 一起使用?

javascript - 在html中调用js方法

javascript - 在nodejs中运行cron作业

c++ - 并发的 C++03 内存模型是什么?

java - 当键已经存在时,映射允许 putIfAbsent 而不创建值