node.js - 选择适当的异步方法来进行最大请求数/秒的批处理

标签 node.js async.js node-async

我需要对某些外部 API 执行循环调用,并有一定的延迟,以防止“超出用户速率限制”限制。

Google map 地理编码 API 对“请求/秒”敏感,允许 10 个请求/秒。我应该对数百个联系人进行地理编码,而这样的延迟是必需的。因此,我需要 10 个异步地理编码函数,每个函数的后延迟为 1 秒。因此,我收集数组中的所有联系人,然后以异步方式循环遍历数组。

通常,我需要有 N 个并发线程,每个线程结束时有 D 毫秒的延迟。整个循环迭代用户实体数组。像往常一样,每个线程处理单个实体。

我想有这样的代码:

const N = 10;   # threads count
const D = 1000; # delay after each execution

var processUser = function(user, callback){ 
  someBusinessLogicProc(user, function(err) {
    setTimeout(function() {
      return callback(err);
    }, D);
  });      
 }

 var async = require('async') ;
 var people = new Array(900);

 async.batchMethod(people, processUser, N, finalCallback);
<小时/>

在此伪代码中,batchMethod 是我要求的方法。

最佳答案

延迟结果并不是您真正想要的。相反,您希望跟踪已发送的内容以及发送时间,以便一旦低于每秒请求数边界,您就可以发送另一个请求。

<小时/>

这是一个函数的一般概念,它将为您控制每秒固定请求数的速率。这使用了 Promise,并要求您提供一个返回 Promise 的请求函数(如果您现在不使用 Promise,则只需将请求函数包装在 Promise 中)。

// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolves values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;            
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length >= requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta >= 0) {
                    setTimeout(runMore, ++delta);
                }

            }
        }
        runMore();
    });
}

用法示例:

rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});

此函数的更高级版本称为 rateMap()here on Github .

<小时/>

这段代码背后的总体思路是这样的:

  1. 传入一个数组进行迭代
  2. 它返回一个 Promise,其解析值是一个结果数组(按顺序)
  3. 您传递了最大数量的 requestsPerSec 来满足
  4. 您同时传递的请求数量达到上限
  5. 您传递一个函数,该函数将传递正在迭代的数组中的一个元素,并且必须返回一个 Promise
  6. 它保留上次发送请求时的时间戳数组。
  7. 要查看是否可以发送另一个请求,它会在数组中向后查找并计算上一秒发送的请求数。
  8. 如果该数字低于阈值,则会发送另一个数字。
  9. 如果该数字达到阈值,则会计算您需要等待多长时间才能发送另一个数字,并为该时间设置计时器。
  10. 完成每个请求后,它会检查是否可以发送更多请求
  11. 如果任何请求拒绝其 Promise,则返回的 Promise 会立即拒绝。如果您不希望它在第一次出现错误时停止,请修改传入的函数以不拒绝,而是使用某个值进行解析,稍后在处理结果时可以将该值识别为失败的请求。

这是一个有效的模拟:https://jsfiddle.net/jfriend00/3gr0tq7k/

注意:如果maxInFlight您传入的值高于 requestsPerSec值,那么这个函数基本上只会发送 requestsPerSec 请求,然后一秒钟后,发送另一个 requestsPerSec 请求,因为这是保持在 requestsPerSec 之下的最快方法。边界。如果maxInFlight值等于或低于requestsPerSec然后它会发送requestsPerSec然后当每个请求完成时,它将查看是否可以发送另一个请求。

关于node.js - 选择适当的异步方法来进行最大请求数/秒的批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48258097/

相关文章:

javascript - 使用 async.parallel 时出错

javascript - async each 中的异步任务 : Callback to be run after all task and sub tasks completed

node.js - Node-Webkit 中的 NodeJs Async.waterfall

javascript - 具有更高级别值的复杂嵌套 sequelize 循环查询

node.js - 如何停止AWS Lambda [Nodejs]

javascript - 使用 async.js 系列识别回调数据

javascript - Node.JS async.parallel 不会等到所有任务都完成

javascript - Node JS 与 C++ 集成

javascript - Firebase 的云函数 : How to access an array I have just got from firebase?

javascript - Node js eachOf循环中的mongodb查询