javascript - 流结束后调用异步请求

标签 javascript node.js csv request

我在使用 NodeJS 流时遇到问题。我遇到的问题是在我的一个流中,我使用请求模块(异步)发送请求,当请求得到响应时,整个流就完成了。

这是代码

var fs = require('fs');
var csv = require('fast-csv');
var path = require('path');
var request = require('request');
var JSONStream = require('JSONStream');
var locations = [];
var urls = [];
var MAP_QUEST_KEY = 'mykey';
var MAP_QUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?key=' + MAP_QUEST_KEY + '&inFormat=json&json=';
var CSV_FILE_NAME = 'smaller_stores.csv';
var inFile = path.join(__dirname, 'input', CSV_FILE_NAME);
var outFile = path.join(__dirname, 'output', CSV_FILE_NAME);
// create a location object
var createLocationJSONStream = function (data) {
    var location = {
        street: data.street_no + ' ' + data.street_name,
        city: data.city_name,
        state: data.state_id
    };
    var url = MAP_QUEST_URL + JSON.stringify(location);
    // this request finishes after stream
    request({
        url: 'url',
        method: 'GET',
        json: location
    }, function (error, response, body) {
        if (error) {
            console.log(error);
        } else {
            data.lat = body.results[0].locations[0].latLng.lat;
            data.lng = body.results[0].locations[0].latLng.lng;
        }
        return data;
    });
};
var readFileStream = fs.createReadStream(inFile);
var writeFileStream = fs.createWriteStream(outFile);
var parseCsvStream = csv.parse({
    trim: true,
    headers: true,
    objectMode: false
});
var deserializeJSONStream = JSONStream.parse();
var streamingEvent = readFileStream.pipe(parseCsvStream).transform(createLocationJSONStream).pipe(deserializeJSONStream);
// eventually want to write out to csv
//.pipe(csv.createWriteStream({headers: true}))
//.pipe(fs.createWriteStream(outFile, {encoding: 'utf8'}));
streamingEvent.on('data', function (data) {
    console.log(JSON.stringify(data));
});
streamingEvent.on('end', function () {
    console.log('end');
});

这是输出:

end
200
application/json; charset=utf-8
200
application/json; charset=utf-8
200
application/json; charset=utf-8

这里是 csv 示例:

 region_id    region_name     street_no   street_name                sitetype_id      country_name    timezone_no     geocode_id      city_name               state_id    zip_code      county_name
3             West  350         ORCHARD AVE              N              USA 4   954824536      UKIAH                      CA            95482-4536    MENDOCINO
1             South 1000           4TH AVE                   N              USA 7   333041903      FORT LAUDERDALE          FL          33304-1903    BROWARD
0             Northeast 1370           HURFVILLE RD                N                USA 7   80963818       DEPTFORD                NJ           08096-3818    GLOUCESTER

我知道我处理流的方式是错误的,但不知道修复它的最简单方法是什么?通过管道发送请求?

最佳答案

如果您希望异步 request() 操作在启动流代码之前完成,那么您需要将流代码放入请求的完成回调中。这样,您就不会在收到 request() 操作的数据后启动流代码。

此外,您无法像尝试return data;那样从异步操作中返回值。从回调返回的数据只是返回到回调基础设施中。外部函数早已返回,并且外部函数之后的后续代码行已被执行。相反,您必须在回调本身内使用结果或从该回调内进行函数调用并将数据传递给该其他函数。

我不完全遵循您尝试对流执行的操作,但如果您想确保在执行操作之前完成 request() 操作,那么它们将执行以下操作:流:

var fs = require('fs');
var csv = require('fast-csv');
var path = require('path');
var request = require('request');
var JSONStream = require('JSONStream');
var locations = [];
var urls = [];
var MAP_QUEST_KEY = 'mykey';
var MAP_QUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?key=' + MAP_QUEST_KEY + '&inFormat=json&json=';
var CSV_FILE_NAME = 'smaller_stores.csv';
var inFile = path.join(__dirname, 'input', CSV_FILE_NAME);
var outFile = path.join(__dirname, 'output', CSV_FILE_NAME);
// create a location object
var createLocationJSONStream = function (data) {
    var location = {
        street: data.street_no + ' ' + data.street_name,
        city: data.city_name,
        state: data.state_id
    };
    var url = MAP_QUEST_URL + JSON.stringify(location);
    // this request finishes after stream
    request({
        url: 'url',
        method: 'GET',
        json: location
    }, function (error, response, body) {
        if (error) {
            console.log(error);
        } else {
            // get the latLng position so we can use it in our streaming
            var position = body.results[0].locations[0].latLng;

            var readFileStream = fs.createReadStream(inFile);
            var writeFileStream = fs.createWriteStream(outFile);
            var parseCsvStream = csv.parse({
                trim: true,
                headers: true,
                objectMode: false
            });
            var deserializeJSONStream = JSONStream.parse();
            var streamingEvent = readFileStream.pipe(parseCsvStream).transform(createLocationJSONStream).pipe(deserializeJSONStream);
            // eventually want to write out to csv
            //.pipe(csv.createWriteStream({headers: true}))
            //.pipe(fs.createWriteStream(outFile, {encoding: 'utf8'}));
            streamingEvent.on('data', function (data) {
                console.log(JSON.stringify(data));
            });
            streamingEvent.on('end', function () {
                console.log('end');
            });            
        }
    });
};

现在,latLng 坐标可在流代码中任意位置的 position 变量中使用,并且在数据准备就绪之前,流操作不会启动。

仅供引用,我发现如果您使用 Promise 而不是普通的回调,那么任何涉及多个异步操作的代码最终都更容易编写、理解和执行强大的错误处理。这需要了解 Promise 的工作原理,并且通常涉及“ promise ”某些操作(将常规异步操作转换为返回 Promise 的操作)。因此,从长远来看,我建议你开始学习 Promise。我使用 Bluebird Promise 库进行 Node.js 开发,因为它有很多有用的功能。

关于javascript - 流结束后调用异步请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32142452/

相关文章:

ruby CSV重复行解析

javascript - 为什么分配给属性会导致错误 "cannot set property of null"

node.js - 在Electron中运行nodegit失败

linux - Bash - 带分隔符的单行回显

python - 使用 python 将 csv 转换为 xls

node.js - 使用nodejs进行依赖注入(inject): some ideas?

javascript - 如何使用 jQuery 对选定的 html 选择元素进行计数

javascript - 多次声明变量

javascript - 如何在 node.js 中使用 sqlite3 - 什么是 smt?

javascript - 如何迭代多个参数数组