node.js - 处理输入文件后让 Node.js 与数据库断开连接的正确调用顺序

标签 node.js

创建一个非常简单的 Node.js 实用程序来单独处理文本文件中的每条记录(逐行),但由于 Node 固有的异步世界,处理以下场景非常困难:

  1. 打开与数据库的连接
  2. 读取文本文件的每一行
  3. 根据该行已处理文本中的条件,在数据库中查找记录
  4. 读完文本文件后,关闭 数据库连接

我面临的挑战是逐行读取文本文件(使用“readline”模块),将监听器附加到模块发出的“line”事件。文件的所有行都会被快速处理,并且对数据库的查询会排队。我尝试了很多方法来本质上创建同步进程,但都无济于事。这是我的最新尝试,其中肯定充满了异步/等待函数。作为一名长期开发人员但对 Node.js 很陌生,我知道我错过了一些简单的东西。任何指导将不胜感激。

const { Pool, Client } = require('pg')

const client = new Client({
  user: '*****',
  host: '****',
  database: '*****',
  password: '******#',
  port: 5432,
})


client.connect()
  .then(() => {

    console.log("Connected");

    console.log("Processing file");

    const fs = require('fs');
    const readline = require('readline');
    const instream = fs.createReadStream("input.txt");
    const outstream = new (require('stream'))();
    const rl = readline.createInterface(instream, outstream);

    rl.on('line', async function (line) {

        var callResult;

        if (line.length > 0) {

            var words = line.replace(/[^0-9a-z ]/gi, '').split(" ");
            var len = words.length;

            for (var i = 0; i < words.length; i++) {
                if (words[i].length === 0) {         
                  words.splice(i, 1);
                  i--;
                } else {
                    words[i] = words[i].toLowerCase();  
                }
              }

            for (var i = 0; i < words.length; i++) {

                if (i <= words.length - 3) {

                    callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim() + " " + words[i + 2].trim());

                    if (!callResult) {

                        callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());

                        if (!callResult) {

                            callResult = await isKeyPhrase(words[i].trim());
                        } 
                    };

                } else if (i <= words.length - 2) {

                    callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());

                    if (!callResult ) {

                        callResult = await isKeyPhrase(words[i].trim());

                    };

                } else if (i < words.length) {

                    callResult = await isKeyPhrase(words[i].trim());
                }
            } 

        }       // (line.length > 0)        

    });

    rl.on('close', function (line) {
        console.log('done reading file.');

        // stubbed out because queries are still running
        //client.end();

    });


  }).catch( (err) => {
    console.error('connection error', err.stack);
});

async function isKeyPhrase(keyPhraseText) {

    var callResult = false;

    return new Promise(async function(resolve, reject) {

        const query = {
          name: 'get-name',
          text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
          values: [keyPhraseText],
          rowMode: 'array'
        }

        // promise
        await client.query(query)
          .then(result => {

            if (result.rowCount == 1) {

                console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);

                calResult = true;

            } 

          }).catch(e => {

            console.error(e.stack)
            console.log(e.stack);
            reject(e);

        });

        resolve(callResult);

    });

}

最佳答案

欢迎来到 StackOverflow。 :)

事实上,在尝试与数据库交互每行数据时,没有(明智的)方法来同步读取文件。如果文件大于内存的 1/8,则没有可行的方法。

但这并不意味着没有办法或为此编写合理的代码。唯一的问题是标准 Node 流(包括 readline)不等待异步代码。

我建议使用 scramjet ,一个函数式流编程框架,几乎是为您的用例而设计的(免责声明:我是作者)。代码如下所示:

const { Pool, Client } = require('pg')
const { StringStream } = require("scramjet");

const client = new Client({
    user: '*****',
    host: '****',
    database: '*****',
    password: '******#',
    port: 5432,
})

client.connect()
    .then(async () => {
        console.log("Connected, processing file");


        return StringStream
            // this creates a "scramjet" stream from input.
            .from(fs.createReadStream("input.txt"))
            // this splits fs line by line
            .lines()
            // the next line is just to show when the file is fully read
            .use(stream => stream.whenEnd.then(() => console.log("done reading file.")))
            // this splits the words like the first "for" loop in your code
            .map(line => line.toLowerCase().replace(/[^0-9a-z ]+/g, '').split(" "))
            // this one gets rid of empty lines (i.e. no words)
            .filter(line => line.length > 0)
            // this splits the words like the first "for" loop in your code
            .map(async words => {
                for (var i = 0; i < words.length; i++) {
                    const callResult = await isKeyPhrase(words.slice(i, i + 3).join(" "));
                    if (callResult) return callResult;
                }
            })
            // this runs the above list of operations to the end and returns a promise.
            .run();
    })
    .then(() => {
        console.log("done processing file.");
        client.end();
    })
    .catch((e) => {
        console.error(e.stack);
    });


async function isKeyPhrase(keyPhraseText) {

    const query = {
        name: 'get-name',
        text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
        values: [keyPhraseText],
        rowMode: 'array'
    };

    const result = await client.query(query);

    if (result.rowCount > 0) {
        console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
        return true;
    }

    return false;
}

我在某些地方压缩和优化了您的代码,但总的来说,这应该可以满足您的需求 - scramjet 为每个操作添加异步模式,并将等待所有操作结束。

关于node.js - 处理输入文件后让 Node.js 与数据库断开连接的正确调用顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52609647/

相关文章:

javascript - Node.js 同时导出路由和函数

javascript - Node.js - 下载并本地保存 PDF 文件以供离线使用 - 文件损坏

node.js - Selenium Webdriver JS 并行抓取 [nodejs]

css - 通过 Node 构建工具删除动态站点中未使用的 CSS

javascript - 完全合并两个多维数组并生成一个新数组

node.js - 如何在 sequelize 中使用非列变量

node.js - 在 Firebase 中插入 JSON 显示为字符串而不是显示为树

node.js - GatsbyJS/ReactJS 未正确代理请求

javascript - 可以在 meteor 应用程序中使用 nodejs 包吗?

javascript - 正确处理 promise 拒绝