我在使用 NodeJS 流时遇到问题。我遇到的问题是在我的一个流中,我使用请求模块(异步)发送请求,当请求得到响应时,整个流就完成了。
这是代码
var fs = require('fs');
var csv = require('fast-csv');
var path = require('path');
var request = require('request');
var JSONStream = require('JSONStream');
var locations = [];
var urls = [];
var MAP_QUEST_KEY = 'mykey';
var MAP_QUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?key=' + MAP_QUEST_KEY + '&inFormat=json&json=';
var CSV_FILE_NAME = 'smaller_stores.csv';
var inFile = path.join(__dirname, 'input', CSV_FILE_NAME);
var outFile = path.join(__dirname, 'output', CSV_FILE_NAME);
// create a location object
var createLocationJSONStream = function (data) {
var location = {
street: data.street_no + ' ' + data.street_name,
city: data.city_name,
state: data.state_id
};
var url = MAP_QUEST_URL + JSON.stringify(location);
// this request finishes after stream
request({
url: 'url',
method: 'GET',
json: location
}, function (error, response, body) {
if (error) {
console.log(error);
} else {
data.lat = body.results[0].locations[0].latLng.lat;
data.lng = body.results[0].locations[0].latLng.lng;
}
return data;
});
};
var readFileStream = fs.createReadStream(inFile);
var writeFileStream = fs.createWriteStream(outFile);
var parseCsvStream = csv.parse({
trim: true,
headers: true,
objectMode: false
});
var deserializeJSONStream = JSONStream.parse();
var streamingEvent = readFileStream.pipe(parseCsvStream).transform(createLocationJSONStream).pipe(deserializeJSONStream);
// eventually want to write out to csv
//.pipe(csv.createWriteStream({headers: true}))
//.pipe(fs.createWriteStream(outFile, {encoding: 'utf8'}));
streamingEvent.on('data', function (data) {
console.log(JSON.stringify(data));
});
streamingEvent.on('end', function () {
console.log('end');
});
这是输出:
end
200
application/json; charset=utf-8
200
application/json; charset=utf-8
200
application/json; charset=utf-8
这里是 csv 示例:
region_id region_name street_no street_name sitetype_id country_name timezone_no geocode_id city_name state_id zip_code county_name
3 West 350 ORCHARD AVE N USA 4 954824536 UKIAH CA 95482-4536 MENDOCINO
1 South 1000 4TH AVE N USA 7 333041903 FORT LAUDERDALE FL 33304-1903 BROWARD
0 Northeast 1370 HURFVILLE RD N USA 7 80963818 DEPTFORD NJ 08096-3818 GLOUCESTER
我知道我处理流的方式是错误的,但不知道修复它的最简单方法是什么?通过管道发送请求?
最佳答案
如果您希望异步 request()
操作在启动流代码之前完成,那么您需要将流代码放入请求的完成回调中。这样,您就不会在收到 request()
操作的数据后启动流代码。
此外,您无法像尝试return data;
那样从异步操作中返回值。从回调返回的数据只是返回到回调基础设施中。外部函数早已返回,并且外部函数之后的后续代码行已被执行。相反,您必须在回调本身内使用结果或从该回调内进行函数调用并将数据传递给该其他函数。
我不完全遵循您尝试对流执行的操作,但如果您想确保在执行操作之前完成 request()
操作,那么它们将执行以下操作:流:
var fs = require('fs');
var csv = require('fast-csv');
var path = require('path');
var request = require('request');
var JSONStream = require('JSONStream');
var locations = [];
var urls = [];
var MAP_QUEST_KEY = 'mykey';
var MAP_QUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?key=' + MAP_QUEST_KEY + '&inFormat=json&json=';
var CSV_FILE_NAME = 'smaller_stores.csv';
var inFile = path.join(__dirname, 'input', CSV_FILE_NAME);
var outFile = path.join(__dirname, 'output', CSV_FILE_NAME);
// create a location object
var createLocationJSONStream = function (data) {
var location = {
street: data.street_no + ' ' + data.street_name,
city: data.city_name,
state: data.state_id
};
var url = MAP_QUEST_URL + JSON.stringify(location);
// this request finishes after stream
request({
url: 'url',
method: 'GET',
json: location
}, function (error, response, body) {
if (error) {
console.log(error);
} else {
// get the latLng position so we can use it in our streaming
var position = body.results[0].locations[0].latLng;
var readFileStream = fs.createReadStream(inFile);
var writeFileStream = fs.createWriteStream(outFile);
var parseCsvStream = csv.parse({
trim: true,
headers: true,
objectMode: false
});
var deserializeJSONStream = JSONStream.parse();
var streamingEvent = readFileStream.pipe(parseCsvStream).transform(createLocationJSONStream).pipe(deserializeJSONStream);
// eventually want to write out to csv
//.pipe(csv.createWriteStream({headers: true}))
//.pipe(fs.createWriteStream(outFile, {encoding: 'utf8'}));
streamingEvent.on('data', function (data) {
console.log(JSON.stringify(data));
});
streamingEvent.on('end', function () {
console.log('end');
});
}
});
};
现在,latLng 坐标可在流代码中任意位置的 position
变量中使用,并且在数据准备就绪之前,流操作不会启动。
仅供引用,我发现如果您使用 Promise 而不是普通的回调,那么任何涉及多个异步操作的代码最终都更容易编写、理解和执行强大的错误处理。这需要了解 Promise 的工作原理,并且通常涉及“ promise ”某些操作(将常规异步操作转换为返回 Promise 的操作)。因此,从长远来看,我建议你开始学习 Promise。我使用 Bluebird Promise 库进行 Node.js 开发,因为它有很多有用的功能。
关于javascript - 流结束后调用异步请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32142452/