node.js - 使用 rxjs 通过 nodejs request.get 发出批量请求

标签 node.js request rxjs

我目前正在使用以下函数根据调用 request.get 的结果创建一个 Promise:

function dlPromiseForMeta(meta) {
    return new Promise(function (resolve, reject) {

        meta.error = false;

        var fileStream = fs.createWriteStream(meta.filePath);

        fileStream.on('error', function (error) {
            meta.error = true;
            console.log('filesystem ' + meta.localFileName + ' ERROR: ' + error);
            console.log('record: ' + JSON.stringify(meta));
            reject(meta);
        });

        fileStream.on('close', function () {
            resolve(meta);
        });

        request.get({
            uri: meta.url,
            rejectUnauthorized: false,
            followAllRedirects: true,
            pool: {
                maxSockets: 1000
            },
            timeout: 10000,
            agent: false
        })
            .on('socket', function () {
                console.log('request ' + meta.localFileName + ' made');
            })
            .on('error', function (error) {
                meta.error = true;
                console.log('request ' + meta.localFileName + ' ERROR: ' + error);
                console.log('record: ' + JSON.stringify(meta));
                reject(meta);
            })
            .on('end', function () {
                console.log('request ' + meta.localFileName + ' finished');
                fileStream.close();
            })
            .pipe(fileStream);
    });
}

除非我尝试调用它太多次,否则效果很好,如下例所示,其中 imagesForKeywords 返回一个 rxjs Observable:

imagesForKeywords(keywords, numberOfResults)
    .mergeMap(function (meta) {
        meta.fileName = path.basename(url.parse(meta.url).pathname);
        meta.localFileName = timestamp + '_' + count++ + '_' + meta.keyword + '_' + meta.source + path.extname(meta.fileName);
        meta.filePath = path.join(imagesFolder, meta.localFileName);

        return rxjs.Observable.fromPromise(dlPromiseForMeta(meta))(meta);
    });

当源可观察值变得足够大时,我开始收到 ESOCKETTIMEDOUT 错误。

所以我想做的是以某种方式对 mergeMap 中发生的每一个,比如说,100 条目进行批处理...所以我并行执行这 100 个条目,每个批处理串行执行,然后在最后合并它们。

如何使用 rxjs 完成此任务?

最佳答案

我认为最简单的方法是使用 bufferTime() ,它会在一定的毫秒数后触发,但最后还有一个用于计数的参数。

使用超时似乎很有用,以防存在流模式在合理的时间内未达到批量限制的情况。

如果这不适合您的用例,请在评论中提供更多详细信息,我将进行相应调整。

您的代码将如下所示,

  • bufferTime如上所述
  • forkjoin - 并行运行缓冲区内容并在全部返回时发出
  • mergeMap - 合并结果
imagesForKeywords(keywords, numberOfResults)
  .mergeMap(function (meta) {
    meta.fileName = path.basename(url.parse(meta.url).pathname);
    meta.localFileName = timestamp + '_' + count++ + '_' + meta.keyword + '_' + meta.source + path.extname(meta.fileName);
    meta.filePath = path.join(imagesFolder, meta.localFileName);
    return meta;
  })
  .bufferTime(maxTimeout, null, maxBatch)
  .mergeMap(items => rxjs.Observable.forkJoin(items.map(dlPromiseForMeta)))
  .mergeMap(arr => rxjs.Observable.from(arr))
<小时/>

这是一个可运行的模型来展示它的工作原理。已注释掉最后一个 mergeMap 以显示缓冲。

我假设了几件事,

  • imagesForKeywords 将关键字分解为可观察的关键字流
  • 每次 dlPromiseForMeta 调用有一个关键字

// Some mocking
const imagesForKeywords = (keywords, numberOfResults) => {
  return Rx.Observable.from(keywords.map(keyword => { return {keyword} }))
}
const dlPromiseForMeta = (meta) => {
  return Promise.resolve(meta.keyword + '_image')
}

// Compose meta - looks like it can run at scale, since is just string manipulations.
const composeMeta = meta => {
  // meta.fileName = path.basename(url.parse(meta.url).pathname);
  // meta.localFileName = timestamp + '_' + count++ + '_' + meta.keyword + '_' + meta.source + path.extname(meta.fileName);
  // meta.filePath = path.join(imagesFolder, meta.localFileName);
  return meta;
}

const maxBatch = 3
const maxTimeout = 50 //ms
const bufferedPromises = (keywords, numberOfResults) =>
  imagesForKeywords(keywords, numberOfResults)
    .map(composeMeta)
    .bufferTime(maxTimeout, null, maxBatch)
    .mergeMap(items => Rx.Observable.forkJoin(items.map(dlPromiseForMeta)))
    //.mergeMap(arr => Rx.Observable.from(arr))

const keywords = ['keyw1', 'keyw2', 'keyw3', 'keyw4', 'keyw5', 'keyw6', 'keyw7'];
const numberOfResults = 1;
bufferedPromises(keywords, numberOfResults)
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

关于node.js - 使用 rxjs 通过 nodejs request.get 发出批量请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48216712/

相关文章:

javascript - 中间件代码没有被调用

javascript - Nodejs 请求返回行为不当

angular - 从服务类进行嵌套 HTTP 调用并返回 Observable

angular - RXJS:跳过由其他可观察对象触发的可观察对象的值

javascript - 如何在 Node.js 中正确构建类

javascript - 如果我在 `module.exports` 上定义模块,那么 `global` 有什么好处?

javascript - 有没有更好的方法来处理 express 中的错误?

html - 新行字符的提交方式不同

parameters - 用于查询的 iOS RestKit 参数

angular - 在响应中获取状态零而不是 401