我目前正在使用以下函数根据调用 request.get
的结果创建一个 Promise
:
function dlPromiseForMeta(meta) {
return new Promise(function (resolve, reject) {
meta.error = false;
var fileStream = fs.createWriteStream(meta.filePath);
fileStream.on('error', function (error) {
meta.error = true;
console.log('filesystem ' + meta.localFileName + ' ERROR: ' + error);
console.log('record: ' + JSON.stringify(meta));
reject(meta);
});
fileStream.on('close', function () {
resolve(meta);
});
request.get({
uri: meta.url,
rejectUnauthorized: false,
followAllRedirects: true,
pool: {
maxSockets: 1000
},
timeout: 10000,
agent: false
})
.on('socket', function () {
console.log('request ' + meta.localFileName + ' made');
})
.on('error', function (error) {
meta.error = true;
console.log('request ' + meta.localFileName + ' ERROR: ' + error);
console.log('record: ' + JSON.stringify(meta));
reject(meta);
})
.on('end', function () {
console.log('request ' + meta.localFileName + ' finished');
fileStream.close();
})
.pipe(fileStream);
});
}
除非我尝试调用它太多次,否则效果很好,如下例所示,其中 imagesForKeywords
返回一个 rxjs
Observable
:
imagesForKeywords(keywords, numberOfResults)
.mergeMap(function (meta) {
meta.fileName = path.basename(url.parse(meta.url).pathname);
meta.localFileName = timestamp + '_' + count++ + '_' + meta.keyword + '_' + meta.source + path.extname(meta.fileName);
meta.filePath = path.join(imagesFolder, meta.localFileName);
return rxjs.Observable.fromPromise(dlPromiseForMeta(meta))(meta);
});
当源可观察值变得足够大时,我开始收到 ESOCKETTIMEDOUT
错误。
所以我想做的是以某种方式对 mergeMap
中发生的每一个,比如说,100
条目进行批处理...所以我并行执行这 100 个条目,每个批处理串行执行,然后在最后合并它们。
如何使用 rxjs
完成此任务?
最佳答案
我认为最简单的方法是使用 bufferTime() ,它会在一定的毫秒数后触发,但最后还有一个用于计数的参数。
使用超时似乎很有用,以防存在流模式在合理的时间内未达到批量限制的情况。
如果这不适合您的用例,请在评论中提供更多详细信息,我将进行相应调整。
您的代码将如下所示,
- bufferTime如上所述
- forkjoin - 并行运行缓冲区内容并在全部返回时发出
- mergeMap - 合并结果
imagesForKeywords(keywords, numberOfResults)
.mergeMap(function (meta) {
meta.fileName = path.basename(url.parse(meta.url).pathname);
meta.localFileName = timestamp + '_' + count++ + '_' + meta.keyword + '_' + meta.source + path.extname(meta.fileName);
meta.filePath = path.join(imagesFolder, meta.localFileName);
return meta;
})
.bufferTime(maxTimeout, null, maxBatch)
.mergeMap(items => rxjs.Observable.forkJoin(items.map(dlPromiseForMeta)))
.mergeMap(arr => rxjs.Observable.from(arr))
<小时/>
这是一个可运行的模型来展示它的工作原理。已注释掉最后一个 mergeMap
以显示缓冲。
我假设了几件事,
- imagesForKeywords 将关键字分解为可观察的关键字流
- 每次 dlPromiseForMeta 调用有一个关键字
// Some mocking
const imagesForKeywords = (keywords, numberOfResults) => {
return Rx.Observable.from(keywords.map(keyword => { return {keyword} }))
}
const dlPromiseForMeta = (meta) => {
return Promise.resolve(meta.keyword + '_image')
}
// Compose meta - looks like it can run at scale, since is just string manipulations.
const composeMeta = meta => {
// meta.fileName = path.basename(url.parse(meta.url).pathname);
// meta.localFileName = timestamp + '_' + count++ + '_' + meta.keyword + '_' + meta.source + path.extname(meta.fileName);
// meta.filePath = path.join(imagesFolder, meta.localFileName);
return meta;
}
const maxBatch = 3
const maxTimeout = 50 //ms
const bufferedPromises = (keywords, numberOfResults) =>
imagesForKeywords(keywords, numberOfResults)
.map(composeMeta)
.bufferTime(maxTimeout, null, maxBatch)
.mergeMap(items => Rx.Observable.forkJoin(items.map(dlPromiseForMeta)))
//.mergeMap(arr => Rx.Observable.from(arr))
const keywords = ['keyw1', 'keyw2', 'keyw3', 'keyw4', 'keyw5', 'keyw6', 'keyw7'];
const numberOfResults = 1;
bufferedPromises(keywords, numberOfResults)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
关于node.js - 使用 rxjs 通过 nodejs request.get 发出批量请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48216712/