javascript - Node.js 中的批处理请求

标签 javascript node.js

我的程序正在与每秒仅接受约 10 个请求的 Web 服务通信。有时,我的程序会向 Web 服务发送 100 多个并发请求,导致我的程序崩溃。

如何将 Node.js 中的并发请求限制为每秒 5 个?我正在使用请求库。

 // IF EVENT AND SENDER
    if(data.sender[0].events && data.sender[0].events.length > 0) {


        // FIND ALL EVENTS
        for(var i = 0; i < data.sender[0].events.length; i++) {

            // IF TYPE IS "ADDED"
            if(data.sender[0].events[i].type == "added") {
                switch (data.sender[0].events[i].link.rel) {
                    case "contact" :
                        batch("added", data.sender[0].events[i].link.href);
                        //_initContacts(data.sender[0].events[i].link.href);
                        break;
                } 
            // IF TYPE IS "UPDATED"
            } else if(data.sender[0].events[i].type == "updated") {

                switch (data.sender[0].events[i].link.rel){                     
                    case "contactPresence" :
                        batch("updated", data.sender[0].events[i].link.href);
                        //_getContactPresence(data.sender[0].events[i].link.href);
                        break;
                    case "contactNote" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactNote(data.sender[0].events[i].link.href);
                        break;
                    case "contactLocation" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactLocation(data.sender[0].events[i].link.href);
                        break;
                    case "presenceSubscription" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _extendPresenceSubscription(data.sender[0].events[i].link.href);
                        break;
                }
            }
        };

然后是自制的批处理方法:

var updated = [];
var added = [];

var batch = function(type, url){
    console.log("batch called");


    if (type === "added"){
        console.log("Added batched");
        added.push(url);
        if (added.length > 5) {
            setTimeout(added.forEach(function(req){
                _initContacts(req);
            }), 2000);
            added = [];
        }
    } 
    else if (type === "updated"){
        console.log("Updated batched");
        updated.push(url);
        console.log("Updated length is : ", updated.length);
        if (updated.length > 5){
            console.log("Over 5 updated events");
            updated.forEach(function(req){
                setTimeout(_getContactLocation(req), 2000);
            });
            updated = [];
        }
    }       
};

以及实际请求的示例:

var _getContactLocation = function(url){
    r.get(baseUrl + url, 
    { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, 
        function(err, res, body){
            if(err)
                console.log(err);
            else {
                var data = JSON.parse(body);
                self.emit("data.contact", data);
            }
        }
    );
};

最佳答案

使用 async库中,mapLimit 函数完全可以满足您的需求。我无法为您的特定用例提供示例,因为您没有提供任何代码。

自述文件:


mapLimit(arr, limit, 迭代器, 回调)

与 map 相同,只有不超过“limit”个迭代器会同时出现 随时运行。

请注意,项目不是批量处理的,因此不能保证 第一个“限制”迭代器函数将在其他函数完成之前完成 开始了。

参数

  • arr - 要迭代的数组。
  • limit - 任何时候运行的迭代器的最大数量。
  • iterator(item, callback) - 应用于数组中每个项目的函数。 迭代器传递了一个必须调用一次的回调(错误,转换) 它已完成,但有一个错误(可以为 null)和一个已转换的项目。
  • callback(err, results) - 在所有迭代器之后调用的回调 功能已完成,或发生错误。结果是一个数组 从原始数组转换项目。

示例

async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){ //结果现在是每个文件的统计数据数组 });


编辑:既然您提供了代码,我发现您的使用与我假设的有点不同。当您知道要预先运行的所有任务时,async 库会更有用。我不知道有哪个图书馆可以轻松为您解决这个问题。上面的注释可能仍然与搜索该主题的人相关,所以我将其保留。

抱歉,我没有时间重组您的代码,但这是一个(未经测试的)函数示例,该函数发出异步请求,同时 self 节流到每秒 5 个请求。我强烈建议您解决这个问题,以提出适合您的代码库的更通用的解决方案。

var throttledRequest = (function () {
    var queue = [], running = 0;

    function sendPossibleRequests() {
        var url;
        while (queue.length > 0 && running < 5) {
            url = queue.shift();
            running++;
            r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
                running--;
                sendPossibleRequests();

                if(err)
                    console.log(err);
                else {
                    var data = JSON.parse(body);
                    self.emit("data.contact", data);
                }
            });
        }
    }

    return function (url) {
        queue.push(url);
        sendPossibleRequests();
    };
})();

基本上,您保留所有要异步处理的数据(例如要请求的 url)的队列,然后在每次回调(来自请求)之后,您尝试启动尽可能多的剩余请求。

关于javascript - Node.js 中的批处理请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18319823/

相关文章:

node.js - 无法在NodeJS中解决: 'TypeError: Converting circular structure to JSON'

javascript - 图片未出现在应用程序的 "release version"中

javascript - 单击时将嵌套数组/对象值显示到单独的元素中

javascript - 检查里面是否有标签

javascript - Heroku 构建失败 [sh : 1: react-scripts: Permission denied]

node.js - 如何为生产关闭 Node.js Express(ejs 模板引擎)错误?

javascript - 使用node或express返回json格式的正确方法

javascript - URLGOTO {{!VAR2}} 不起作用,有什么想法吗?

javascript - 在动态 Angular 下拉列表中获取选定的选项

node.js - 在转换流中使用 Node.js readline