我的程序正在与每秒仅接受约 10 个请求的 Web 服务通信。有时,我的程序会向 Web 服务发送 100 多个并发请求,导致我的程序崩溃。
如何将 Node.js 中的并发请求限制为每秒 5 个?我正在使用请求库。
// IF EVENT AND SENDER
if(data.sender[0].events && data.sender[0].events.length > 0) {
// FIND ALL EVENTS
for(var i = 0; i < data.sender[0].events.length; i++) {
// IF TYPE IS "ADDED"
if(data.sender[0].events[i].type == "added") {
switch (data.sender[0].events[i].link.rel) {
case "contact" :
batch("added", data.sender[0].events[i].link.href);
//_initContacts(data.sender[0].events[i].link.href);
break;
}
// IF TYPE IS "UPDATED"
} else if(data.sender[0].events[i].type == "updated") {
switch (data.sender[0].events[i].link.rel){
case "contactPresence" :
batch("updated", data.sender[0].events[i].link.href);
//_getContactPresence(data.sender[0].events[i].link.href);
break;
case "contactNote" :
batch("updated", data.sender[0].events[i].link.href);
// _getContactNote(data.sender[0].events[i].link.href);
break;
case "contactLocation" :
batch("updated", data.sender[0].events[i].link.href);
// _getContactLocation(data.sender[0].events[i].link.href);
break;
case "presenceSubscription" :
batch("updated", data.sender[0].events[i].link.href);
// _extendPresenceSubscription(data.sender[0].events[i].link.href);
break;
}
}
};
然后是自制的批处理方法:
var updated = [];
var added = [];
var batch = function(type, url){
console.log("batch called");
if (type === "added"){
console.log("Added batched");
added.push(url);
if (added.length > 5) {
setTimeout(added.forEach(function(req){
_initContacts(req);
}), 2000);
added = [];
}
}
else if (type === "updated"){
console.log("Updated batched");
updated.push(url);
console.log("Updated length is : ", updated.length);
if (updated.length > 5){
console.log("Over 5 updated events");
updated.forEach(function(req){
setTimeout(_getContactLocation(req), 2000);
});
updated = [];
}
}
};
以及实际请求的示例:
var _getContactLocation = function(url){
r.get(baseUrl + url,
{ "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }},
function(err, res, body){
if(err)
console.log(err);
else {
var data = JSON.parse(body);
self.emit("data.contact", data);
}
}
);
};
最佳答案
使用 async库中,mapLimit
函数完全可以满足您的需求。我无法为您的特定用例提供示例,因为您没有提供任何代码。
自述文件:
mapLimit(arr, limit, 迭代器, 回调)
与 map 相同,只有不超过“limit”个迭代器会同时出现 随时运行。
请注意,项目不是批量处理的,因此不能保证 第一个“限制”迭代器函数将在其他函数完成之前完成 开始了。
参数
- arr - 要迭代的数组。
- limit - 任何时候运行的迭代器的最大数量。
- iterator(item, callback) - 应用于数组中每个项目的函数。 迭代器传递了一个必须调用一次的回调(错误,转换) 它已完成,但有一个错误(可以为 null)和一个已转换的项目。
- callback(err, results) - 在所有迭代器之后调用的回调 功能已完成,或发生错误。结果是一个数组 从原始数组转换项目。
示例
async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){
//结果现在是每个文件的统计数据数组
});
编辑:既然您提供了代码,我发现您的使用与我假设的有点不同。当您知道要预先运行的所有任务时,async
库会更有用。我不知道有哪个图书馆可以轻松为您解决这个问题。上面的注释可能仍然与搜索该主题的人相关,所以我将其保留。
抱歉,我没有时间重组您的代码,但这是一个(未经测试的)函数示例,该函数发出异步请求,同时 self 节流到每秒 5 个请求。我强烈建议您解决这个问题,以提出适合您的代码库的更通用的解决方案。
var throttledRequest = (function () {
var queue = [], running = 0;
function sendPossibleRequests() {
var url;
while (queue.length > 0 && running < 5) {
url = queue.shift();
running++;
r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
running--;
sendPossibleRequests();
if(err)
console.log(err);
else {
var data = JSON.parse(body);
self.emit("data.contact", data);
}
});
}
}
return function (url) {
queue.push(url);
sendPossibleRequests();
};
})();
基本上,您保留所有要异步处理的数据(例如要请求的 url)的队列,然后在每次回调(来自请求)之后,您尝试启动尽可能多的剩余请求。
关于javascript - Node.js 中的批处理请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18319823/