我有一个用 Nodejs 编写的系统,它首先必须将非常大的 csv 文件中的记录导入到数据库中。使用Sequelize作为我的 ORM,我创建了一个简单的模型,如下所示:
"use strict";
const Sequelize = require('sequelize');
const sequelize = new Sequelize('mm', 'root', 'password', {
host: 'localhost',
dialect: 'mysql',
logging: true,
pool: {max: 5, min: 0, idle: 100000},
});
const Index = sequelize.define('index', {
value: {type: Sequelize.FLOAT}
});
然后,我编写了以下代码来循环文件中的行,解释这些行,并将它们写入数据库:
let readline = require('readline');
let moment = require('moment');
let lineReader = readline.createInterface({
input: require('fs').createReadStream('files/price_index.csv')
});
lineReader.on('line', function (line) {
let splitted = line.split(',');
let dt = moment(parseInt(splitted[0]));
let value = parseFloat(splitted[1]);
console.log(dt.format(), value);
Index.create({value: value, createdAt: dt});
});
这工作正常,但每 3120 条记录后会暂停大约 3 秒。我尝试了sqlite和mysql,但它总是在恰好3120条记录后暂停。
看到 Sequelize 也在这 3120 条记录之后开始记录插入查询,我推测这种行为的原因是某种缓存机制,它将所有查询放入队列中,直到它没有任何事情可做,或者如果它达到了神奇的查询缓存限制,即 3120 条记录。
我尝试在 Sequelize 的初始化中增加 pool.max
数量,但这似乎没有任何区别。
任何人都可以确认我的缓存想法,或者向我解释这种行为的真正原因是什么吗?我可以以某种方式改变这种行为,使其具有一致的吞吐量吗?欢迎所有提示!
最佳答案
我认为 3120 行将是 the high water mark for the createReadStream
buffer which is 64KiB 。当缓冲区已满时, Node 将停止读取。
看起来 3120 line
事件都在同一个 Node 事件刻度上运行,因此您会处理 3120 行,并为下一个刻度安排 3120 个异步 Index.create
调用。因此,最终双方都会进行大量的处理。要么读取并调度查询,要么处理大量的调度查询。
当 3120 line
事件函数完成时,会发生一些垃圾收集,并且计划的 3120 Sequelize create
调用有机会执行其操作。这是数据中的“暂停”,但 Node 仍在处理。所有 create
调用都需要几秒钟才能完成,然后进行更多垃圾收集并返回到下一个 csv 数据 block 及其所有 line
事件。这个过程就这样来回进行。
在包含 10000 行的 csv 文件中,我看到大约 3 个查询能够在读取所有 10000 行 csv 数据并计划插入之前运行。
一致的吞吐量
您可能想使用 Readable Stream与较小的 block 。然后根据后续插入完成来阻止读取。您可能需要自己进行线路处理,而不是使用 readline
。如果 csv 文件适合内存,只需将整个文件读入即可,因为安排会更容易。
也许使用类似queue
的东西管理插入,允许最大并发池 max
作为并发
。然后,一旦队列的长度足够低,就允许再次进行读取。
我不知道最终结果是否会更快,但最终结果可能非常相似。
关于mysql - 为什么 Sequelize 在 3120 条记录后暂停?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47052410/