创建一个非常简单的 Node.js 实用程序来单独处理文本文件中的每条记录(逐行),但由于 Node 固有的异步世界,处理以下场景非常困难:
- 打开与数据库的连接
- 读取文本文件的每一行
- 根据该行已处理文本中的条件,在数据库中查找记录
- 读完文本文件后,关闭 数据库连接
我面临的挑战是逐行读取文本文件(使用“readline”模块),将监听器附加到模块发出的“line”事件。文件的所有行都会被快速处理,并且对数据库的查询会排队。我尝试了很多方法来本质上创建同步进程,但都无济于事。这是我的最新尝试,其中肯定充满了异步/等待函数。作为一名长期开发人员但对 Node.js 很陌生,我知道我错过了一些简单的东西。任何指导将不胜感激。
const { Pool, Client } = require('pg')
const client = new Client({
user: '*****',
host: '****',
database: '*****',
password: '******#',
port: 5432,
})
client.connect()
.then(() => {
console.log("Connected");
console.log("Processing file");
const fs = require('fs');
const readline = require('readline');
const instream = fs.createReadStream("input.txt");
const outstream = new (require('stream'))();
const rl = readline.createInterface(instream, outstream);
rl.on('line', async function (line) {
var callResult;
if (line.length > 0) {
var words = line.replace(/[^0-9a-z ]/gi, '').split(" ");
var len = words.length;
for (var i = 0; i < words.length; i++) {
if (words[i].length === 0) {
words.splice(i, 1);
i--;
} else {
words[i] = words[i].toLowerCase();
}
}
for (var i = 0; i < words.length; i++) {
if (i <= words.length - 3) {
callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim() + " " + words[i + 2].trim());
if (!callResult) {
callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());
if (!callResult) {
callResult = await isKeyPhrase(words[i].trim());
}
};
} else if (i <= words.length - 2) {
callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());
if (!callResult ) {
callResult = await isKeyPhrase(words[i].trim());
};
} else if (i < words.length) {
callResult = await isKeyPhrase(words[i].trim());
}
}
} // (line.length > 0)
});
rl.on('close', function (line) {
console.log('done reading file.');
// stubbed out because queries are still running
//client.end();
});
}).catch( (err) => {
console.error('connection error', err.stack);
});
async function isKeyPhrase(keyPhraseText) {
var callResult = false;
return new Promise(async function(resolve, reject) {
const query = {
name: 'get-name',
text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
values: [keyPhraseText],
rowMode: 'array'
}
// promise
await client.query(query)
.then(result => {
if (result.rowCount == 1) {
console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
calResult = true;
}
}).catch(e => {
console.error(e.stack)
console.log(e.stack);
reject(e);
});
resolve(callResult);
});
}
最佳答案
欢迎来到 StackOverflow。 :)
事实上,在尝试与数据库交互每行数据时,没有(明智的)方法来同步读取文件。如果文件大于内存的 1/8,则没有可行的方法。
但这并不意味着没有办法或为此编写合理的代码。唯一的问题是标准 Node 流(包括 readline)不等待异步代码。
我建议使用 scramjet
,一个函数式流编程框架,几乎是为您的用例而设计的(免责声明:我是作者)。代码如下所示:
const { Pool, Client } = require('pg')
const { StringStream } = require("scramjet");
const client = new Client({
user: '*****',
host: '****',
database: '*****',
password: '******#',
port: 5432,
})
client.connect()
.then(async () => {
console.log("Connected, processing file");
return StringStream
// this creates a "scramjet" stream from input.
.from(fs.createReadStream("input.txt"))
// this splits fs line by line
.lines()
// the next line is just to show when the file is fully read
.use(stream => stream.whenEnd.then(() => console.log("done reading file.")))
// this splits the words like the first "for" loop in your code
.map(line => line.toLowerCase().replace(/[^0-9a-z ]+/g, '').split(" "))
// this one gets rid of empty lines (i.e. no words)
.filter(line => line.length > 0)
// this splits the words like the first "for" loop in your code
.map(async words => {
for (var i = 0; i < words.length; i++) {
const callResult = await isKeyPhrase(words.slice(i, i + 3).join(" "));
if (callResult) return callResult;
}
})
// this runs the above list of operations to the end and returns a promise.
.run();
})
.then(() => {
console.log("done processing file.");
client.end();
})
.catch((e) => {
console.error(e.stack);
});
async function isKeyPhrase(keyPhraseText) {
const query = {
name: 'get-name',
text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
values: [keyPhraseText],
rowMode: 'array'
};
const result = await client.query(query);
if (result.rowCount > 0) {
console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
return true;
}
return false;
}
我在某些地方压缩和优化了您的代码,但总的来说,这应该可以满足您的需求 - scramjet
为每个操作添加异步模式,并将等待所有操作结束。
关于node.js - 处理输入文件后让 Node.js 与数据库断开连接的正确调用顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52609647/