I'm trying to stream data from a large CSV file into readline. I tried piping a readStream from S3 into readline's input, but I hit an error because S3 only allows a connection to stay open for a limited time.
I'm creating the stream from S3 like this:
import * as AWS from 'aws-sdk';
import {Readable} from 'stream';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<Readable> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };
        const s3 = new AWS.S3({
            accessKeyId: s3Env.accessKey,
            secretAccessKey: s3Env.secret
        });
        s3.headObject(params, (error, data) => {
            if (error) {
                // Reject instead of throwing: a throw inside this async
                // callback would not be caught by a surrounding try/catch
                reject(error);
                return;
            }
            const stream = s3.getObject(params).createReadStream();
            resolve(stream);
        });
    });
}
Then I pipe it into readline:
import * as readline from 'readline';
import createAWSStream from './createAWSStream';

export const readCSVFile = async function(): Promise<void> {
    const rStream = await createAWSStream();
    const lineReader = readline.createInterface({
        input: rStream
    });
    for await (const line of lineReader) {
        // process line
    }
}
I found that the S3 connection timeout was set to 120000 ms (2 minutes). I tried simply raising the timeout, but then I ran into more timeout problems from the HTTPS connection.
How do I stream data from AWS S3 the right way, without setting a bunch of timeouts to some very large value?
Best answer
I was able to solve this by using the AWS S3 Range property together with the NodeJS Stream API to create a custom readable stream.
By using this "smart stream", I can grab chunks of data in separate requests to the S3 instance. By grabbing the data in chunks, I avoid any timeout errors and get a more efficient stream. The NodeJS Readable super class handles the buffer so as not to overload readline's input, and it automatically handles pausing and resuming the stream.
This class makes it very easy to stream large files from AWS S3:
import {Readable, ReadableOptions} from "stream";
import type {S3} from "aws-sdk";

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 2048 * 1024; // Amount of bytes to grab per request (I have jacked this up for HD video files)
    _maxContentLength: number; // Total number of bytes in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into the s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we won't use this, however I left it in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the number of bytes in the file,
            // we push null into the buffer; NodeJS ReadableStream sees this as the
            // end of file (EOF) and emits the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file,
            // adjust it to grab the remaining bytes of data
            const adjustedRange =
                range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our S3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes,
                    // destroy the stream; NodeJS ReadableStream emits the 'error' event
                    this.destroy(error);
                } else {
                    // Push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}
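The cursor arithmetic in `_read` partitions the object into consecutive byte ranges. As a sanity check, this small standalone helper (hypothetical, not part of the answer) reproduces that arithmetic and shows the sequence of Range headers the class would send for a given content length and chunk size:

```typescript
// Reproduces SmartStream's cursor arithmetic: one inclusive byte range
// per request, advancing the cursor one past the previous range's end.
function byteRanges(contentLength: number, chunkSize: number): string[] {
    const ranges: string[] = [];
    let cursor = 0;
    while (cursor <= contentLength) {
        const range = cursor + chunkSize;
        const adjusted = range < contentLength ? range : contentLength;
        ranges.push(`bytes=${cursor}-${adjusted}`);
        cursor = adjusted + 1;
    }
    return ranges;
}

// For a 10-byte object read in 4-byte chunks:
// byteRanges(10, 4) → ["bytes=0-4", "bytes=5-9", "bytes=10-10"]
```

Note that because the class uses ContentLength (rather than ContentLength - 1) as the upper bound, the final request asks for one byte past the last valid offset; HTTP range semantics clamp an end position beyond the object length, so the request still succeeds.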
To apply this in the createAWSStream function, I simply replaced the line that created the readStream:

const stream = s3.getObject(params).createReadStream();

with a line that creates an instance of my SmartStream class, passing in the s3 params object, the s3 instance, and the content length of the data:

const stream = new SmartStream(params, s3, data.ContentLength);
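With that change, the readCSVFile function from the question works unchanged. Since exercising SmartStream itself needs live S3 credentials, the chunked-Readable-into-readline pattern can be sanity-checked against an in-memory buffer instead; the ChunkedBufferStream below is a hypothetical stand-in for S3, not part of the original answer:

```typescript
import { Readable } from "stream";
import * as readline from "readline";

// Hypothetical stand-in for S3: serves fixed-size chunks out of an
// in-memory Buffer, mimicking how SmartStream pushes one byte range
// per _read() call.
class ChunkedBufferStream extends Readable {
    private cursor = 0;
    constructor(
        private readonly source: Buffer,
        private readonly chunkSize: number
    ) {
        super();
    }
    _read() {
        if (this.cursor >= this.source.length) {
            this.push(null); // EOF: readline will emit 'close'
        } else {
            const end = Math.min(this.cursor + this.chunkSize, this.source.length);
            this.push(this.source.subarray(this.cursor, end));
            this.cursor = end;
        }
    }
}

// Same consumption pattern as readCSVFile in the question
async function collectLines(stream: Readable): Promise<string[]> {
    const lines: string[] = [];
    const lineReader = readline.createInterface({ input: stream });
    for await (const line of lineReader) {
        lines.push(line);
    }
    return lines;
}
```

Even when a chunk boundary falls in the middle of a CSV row, readline reassembles complete lines from the buffered chunks, which is why the Range-based chunking is transparent to the consumer.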
Regarding javascript - Stream File from AWS S3 using NodeJS, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/70625366/