javascript - Nodejs 在异步上创建简单队列

标签 javascript node.js

在下面的示例代码中,我可以同时运行多个函数

Promise.all([sendMoneyToRequestedUser(_data), saveTransferMoneyTransaction(_data)])
    .then(function (results) {
        log.info("OOOOOOOOOOOOOo");
    }).catch(function (error) {
    log.info(error)
});

但是我想为他们创建简单的队列,例如,我可以使用此解决方案使用 Promise 实现它吗?我有 3 个函数 checksendpost,我想连续运行并将每个步骤的结果传递给另一个。

第一步check()函数,第二步send()函数,完成后它们执行函数,例如 post()

对于此实现,我需要获取每个步骤的结果,例如从第 1 步 获取结果,如果它返回 true 那么在第 2 步 我需要使用结果第 1 步

async.parallel 是这个实现的解决方案吗?或者 Promise 可以做到这一点

最佳答案

更新:

有一个模块:https://www.npmjs.com/package/promise-queue

旧答案:

我在尝试使用各种模块时遇到了问题,最后我为此类工作编写了我能想到的最简单的实现。

看看我写的这个简单的类(Plain JS):

class Queue {
    constructor(maxSimultaneously = 1) {
        this.maxSimultaneously = maxSimultaneously;
        this.__active = 0;
        this.__queue = [];
    }
    
    /** @param { () => Promise<T> } func 
     * @template T
     * @returns {Promise<T>}
    */
    async enqueue(func) {
        if(++this.__active > this.maxSimultaneously) {
            await new Promise(resolve => this.__queue.push(resolve));
        }

        try {
            return await func();
        } catch(err) {
            throw err;
        } finally {
            this.__active--;
            if(this.__queue.length) {
                this.__queue.shift()();
            }
        }
    }
}

像这样使用它:

假设你有这个异步函数:

const printNumber = async (n) => {
    await new Promise(res => setTimeout(res, 2000)); // wait 2 sec
    console.log(n);
}

所以,而不是:

await printNumber(1);
await printNumber(2);
await printNumber(3);
await printNumber(4);

使用:

const q = new Queue();

q.enqueue(() => printNumber(1));
q.enqueue(() => printNumber(2));
q.enqueue(() => printNumber(3));
q.enqueue(() => printNumber(4));

每个功能将在其他功能完成后执行。

输出:

1 // after 2 sec
2 // after 4 sec
3 // after 6 sec
4 // after 8 sec

或者您可以限制队列同时运行一定数量的函数:

const q = new Queue(3);

q.enqueue(() => printNumber(1));
q.enqueue(() => printNumber(2));
q.enqueue(() => printNumber(3));
q.enqueue(() => printNumber(4));

输出:

1 // after 2 sec
2 // after 2 sec
3 // after 2 sec
4 // after 4 sec

此外,enqueue 方法将从您的 promise 中返回/抛出原始数据!

假设您编写了一个 API 来上传文件,并且您希望将同时上传的文件限制为最多 5 个。你希望一切都保持原样,而不改变你的流程。以下是您可以如何做到这一点:

async function upload(data) {
    // upload...
    if(something) {
        return 200;
    } else {
        throw 400;
    }
}

所以,与其这样做:

async function work(data) { 
    // do something...
    return await upload(data);
}

这样做:

const q = new Queue(5); // up to 5 at the same time
async function work(data) { 
    // do something...
    return await q.enqueue(() => upload(data));
}

class Queue {
    constructor(maxSimultaneously = 1) {
        this.maxSimultaneously = maxSimultaneously;
        this.__active = 0;
        this.__queue = [];
    }

    /** @param { () => Promise<T> } func 
     * @template T
     * @returns {Promise<T>}
    */
    async enqueue(func) {
        if(++this.__active > this.maxSimultaneously) {
            await new Promise(resolve => this.__queue.push(resolve));
        }

        try {
            return await func();
        } catch(err) {
            throw err;
        } finally {
            this.__active--;
            if(this.__queue.length) {
                this.__queue.shift()();
            }
        }
    }
}

const printNumber = async (n) => {
    await new Promise(res => setTimeout(res, 2000)); // wait 2 sec
    console.log(n);
}

async function start() {
    console.log('starting...');
    const q = new Queue();
    
    q.enqueue(() => printNumber(1));
    q.enqueue(() => printNumber(2));
    q.enqueue(() => printNumber(3));
    q.enqueue(() => printNumber(4));
}
Click this to run 1 log per 2 sec: <button onclick="start();">Start</button>

class Queue {
    constructor(maxSimultaneously = 1) {
        this.maxSimultaneously = maxSimultaneously;
        this.__active = 0;
        this.__queue = [];
    }

    /** @param { () => Promise<T> } func 
     * @template T
     * @returns {Promise<T>}
    */
    async enqueue(func) {
        if(++this.__active > this.maxSimultaneously) {
            await new Promise(resolve => this.__queue.push(resolve));
        }

        try {
            return await func();
        } catch(err) {
            throw err;
        } finally {
            this.__active--;
            if(this.__queue.length) {
                this.__queue.shift()();
            }
        }
    }
}

const printNumber = async (n) => {
    await new Promise(res => setTimeout(res, 2000)); // wait 2 sec
    console.log(n);
}

async function start() {
    console.log('starting...');
    const q = new Queue(3);
    
    q.enqueue(() => printNumber(1));
    q.enqueue(() => printNumber(2));
    q.enqueue(() => printNumber(3));
    q.enqueue(() => printNumber(4));
}
Click this to run up to 3 logs every 2 sec: <button onclick="start();">Start</button>

关于javascript - Nodejs 在异步上创建简单队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38903707/

相关文章:

javascript - 在 Node.js 中使用 axios 发布表单数据

javascript - 如何在javascript中反序列化JSON时间跨度

javascript待办事项列表不向待办事项列表添加新条目

node.js - Mongoose 通过id方法删除子文档

node.js - 无法使用 Node.js 连接到我的 Docker RabbitMQ

node.js - 图片未通过node.jsexpress转发

javascript - 有没有办法从 JavaScript 中的 map 包含 block 返回?

javascript - 如何在列中的div中设置div

node.js - Openshift 上部署的 Sails.js 项目的可视化错误

javascript - 当我指定 JSON 时, Node GET 请求返回 html?