javascript - 使用 NodeJS 从 AWS S3 流式传输文件

标签 javascript node.js typescript amazon-s3 streaming

我正在尝试将数据从大型 csv 文件流式传输到 readline 中。我尝试将 readStream 从 s3 通过管道传输到 readline 输入,但是我遇到了一个错误,S3 只允许连接在一定时间内保持打开状态。

我正在从 s3 创建流,如下所示:

import * as AWS from 'aws-sdk';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<SmartStream> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            s3.headObject(bucketParams, (error, data) => {
                if (error) {
                    throw error
                };

                const stream = s3.getObject(params).createReadStream();

                resolve(stream);
            })
        } catch (error) {
            reject(error);
        }
    })
}

然后我将其通过管道传输到 readline 中:

import * as readline from 'readline';
import createAWSStream from './createAWSStream';

export const readCSVFile = async function(): Promise<void> {
  const rStream = await createAWSStream();

  const lineReader = readline.createInterface({
    input: rStream
  });

  for await (const line of lineReader) {
    // process line
  }
}

我发现 s3 连接的超时设置为 120000 毫秒(2 分钟)。我尝试简单地提高超时,但是我遇到了更多来自 HTTPS 连接的超时问题。

如何以正确的方式从 AWS S3 流式传输数据,而不需要在某个非常大的时间范围内设置一堆超时?

最佳答案

我能够使用AWS S3 Range来解决这个问题属性并使用 NodeJS Stream API 创建自定义可读流.

通过使用这个“智能流”,我能够在对 S3 实例的单独请求中抓取数据 block 。通过分块抓取数据,我避免了任何超时错误并创建了更高效的流。 NodeJS Readable Super 类处理缓冲区,以免重载 readline 的输入。它还自动处理流的暂停和恢复。

此类使得可以非常轻松地从 AWS S3 流式传输大文件:

import {Readable, ReadableOptions} from "stream";
import type {S3} from "aws-sdk";

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 2048 * 1024; // Amount of bytes to grab (I have jacked this up HD video files)
    _maxContentLength: number; // Total number of bites in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we wont use this, however I left it in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the amount of bytes in the file
            // We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file
            // We adjust the range to grab the remaining bytes of data
            const adjustedRange =
                range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our s3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes
                    // We destroy the stream, NodeJS ReadableStream will emit the 'error' event
                    this.destroy(error);
                } else {
                    // We push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}

为了将其应用到 createAWSStream 函数中,我只需替换创建 readStream 的行:

const stream = s3.getObject(params).createReadStream();

创建我的 SmartStream 类的实例,并传入 s3 params 对象、s3 实例和数据的内容长度。

const stream = new SmartStream(params, s3, data.ContentLength);

关于javascript - 使用 NodeJS 从 AWS S3 流式传输文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70625366/

相关文章:

node.js - 从 docker 容器和 Node 使用 websocket 时出现问题

javascript - 我在使用 findOneAndUpdate 时遇到问题,当其中一个搜索数据未定义时不会返回错误

node.js - Node 和 docker - 如何处理 babel 或 typescript 构建?

typescript - ts 实例化一个作为参数传递的类

javascript - 同时触发两个元素的悬停?

javascript - Node.js + Mocha + Should.js 如何测试事件发射器抛出的错误?

javascript - 三.js反向光线转换

javascript - 你能根据 th 关系设计一个 td 吗?

javascript - 是否可以将 Realm 与 A​​ngular 2 一起使用?

arrays - 在 TypeScript 中,如何声明接受字符串并返回字符串的函数数组?