node.js - 使用 Node.js 将大量文件上传到 Google Storage 存储桶

标签 node.js google-app-engine google-cloud-storage

我需要将约 300,000 个文件(约 1.5TB)传输到 Google Cloud 存储桶。

挑战:

  • 文件来自远程源,因此上传功能是否支持使用代理?
  • 整个过程可能需要几天时间。那么如何才能最好地运行这么长时间的进程?使用 App Engine 服务、Compute Engine 实例,还是别的方案?

这是我尝试使用的代码:

// Imports the Google Cloud client library.
const Storage = require('@google-cloud/storage');
const db = require('../models');

// Instantiates a client. If you don't specify credentials when constructing
// the client, the client library will look for credentials in the
// environment.
const storage = new Storage();

// Target GCS bucket (placeholder — replace with the real bucket name).
const bucketName = "bucket-name";
// Number of files uploaded concurrently per batch.
const increment = 5;

// Running count of completed batches (one increment per Promise.all batch).
let globalCounter = 0;

/**
 * Builds the list of download links from DB records and kicks off the
 * batched upload pipeline.
 *
 * Fixed: returns the promise chain directly instead of wrapping an
 * already-existing promise in `new Promise` (explicit-construction
 * anti-pattern); behavior toward callers is unchanged.
 *
 * @returns {Promise<string>} resolves with the completion message from
 *   downloadPDFsSlow; rejects with the first unrecoverable error.
 */
function getPDFLinks() {
    const dbRecords = [/*Array of URLs from our db*/];
    const dlLinkArray = dbRecords.map(link => ({
        link: 'https://sample.domain.com' + link.dataValues.downloadLink,
        filename: link.dataValues.contentID
    }));
    console.log("dlLinkArray Length: ", dlLinkArray.length);
    return downloadPDFsSlow(dlLinkArray, 0)
        .then(x => {
            console.log("finished all downloads and updated SQL");
            return x;
        })
        .catch(e => {
            console.error(e);
            throw e;
        });
}

/**
 * Uploads `increment` files at a time from linksArray (each entry is
 * {link: url, filename: contentID}), starting at index `counter`, with a
 * 1-second pause between batches. A failed batch is logged and skipped.
 *
 * Fixed: the original recursed inside setTimeout WITHOUT chaining the
 * recursive promise to the outer one, so the promise returned to the
 * first caller never resolved and getPDFLinks never observed completion.
 * The recursion is now chained so resolution/rejection propagates.
 *
 * @param {{link: string, filename: string}[]} linksArray
 * @param {number} counter - index of the first record in this batch
 * @returns {Promise<string>} resolves once every batch has been processed
 */
function downloadPDFsSlow(linksArray, counter) {
    // Base case: all records at or past the end have been processed.
    if (counter >= linksArray.length) {
        console.log("DONE");
        return Promise.resolve(`downloadPDFsSlow completed ${linksArray.length} records`);
    }

    const batch = linksArray.slice(counter, counter + increment);

    // Throttle: wait 1s, then run the next batch, chaining its promise
    // so completion/failure reaches the original caller.
    const scheduleNext = () => new Promise((resolve, reject) => {
        setTimeout(() => {
            downloadPDFsSlow(linksArray, counter + increment).then(resolve, reject);
        }, 1000);
    });

    return Promise.all(batch.map(x => uploadFile(bucketName, x.link, x.filename)))
        .then(() => {
            globalCounter++;
            console.log('globalCounter: ', globalCounter);
            // Record successful uploads; fire-and-forget as in the original.
            updateRecords(batch.map(x => x.filename)); //ASYNC
            return scheduleNext();
        })
        .catch(e => {
            // Log the error and the failed slice's position, then continue
            // with the next slice instead of aborting the whole run.
            console.error(e);
            console.log("Skipping to next. Counter: ", counter);
            return scheduleNext();
        });
}

/**
 * Uploads `fileURL` to the bucket with gzip compression and long-lived
 * cache headers. If the stored object is suspiciously small (the known
 * sizes of the "PDF not available" placeholder page) it is deleted;
 * otherwise it is renamed to pdf/<reName>.pdf.
 *
 * NOTE(review): `bucket.upload()` expects a *local file path*, not a
 * remote URL — confirm this works in your environment, or download the
 * file to disk (or stream it) before uploading.
 *
 * Fixed: returns the chain directly instead of wrapping it in
 * `new Promise` (explicit-construction anti-pattern).
 *
 * @param {string} bucketName
 * @param {string} fileURL - source path handed to bucket.upload()
 * @param {string} reName - contentID used for the final object name
 * @returns {Promise} resolves with the delete result (placeholder case)
 *   or the original upload response; rejects on any API error.
 */
function uploadFile(bucketName, fileURL, reName) {
    return storage
        .bucket(bucketName)
        .upload(fileURL, {
            // Support for HTTP requests made with `Accept-Encoding: gzip`
            gzip: true,
            metadata: {
                // Long-lived caching header; use 'no-cache' instead if the
                // file contents may change.
                cacheControl: 'public, max-age=31536000',
            },
        })
        .then((x) => {
            console.log("SIZE: ", x[1].size);
            // Known placeholder sizes: <=202 bytes, or 13234-13236 bytes,
            // mean the PDF was not actually available — delete the object.
            if (x[1].size <= 202 || (x[1].size <= 13236 && x[1].size >= 13234)) {
                return deleteFile(bucketName, x[1].name);
            }
            // Real PDF: move it under the pdf/ prefix named by contentID.
            return renameFile(bucketName, x[1].name, "pdf/" + reName + ".pdf")
                .then(() => x);
        })
        .catch(err => {
            console.error('ERROR:', err);
            throw err;
        });
}
  
/**
 * Logs the name of every object in the bucket.
 *
 * Fixed: the promise chain is now returned (previously it was a floating
 * promise, so callers could not await completion); errors are still
 * logged and swallowed here, as in the original.
 *
 * @param {string} bucketName
 * @returns {Promise} resolves after all object names have been logged
 */
function listFiles(bucketName) {
    return storage
        .bucket(bucketName)
        .getFiles()
        .then(results => {
            const files = results[0];
            console.log('Files:');
            files.forEach(file => {
                console.log(file.name);
            });
        })
        .catch(err => {
            console.error('ERROR:', err);
        });
}

/**
 * Moves (renames) srcFilename to destFilename within the bucket.
 *
 * Fixed: returns the chain directly instead of wrapping it in
 * `new Promise` (explicit-construction anti-pattern).
 *
 * @param {string} bucketName
 * @param {string} srcFilename - existing object name
 * @param {string} destFilename - destination object name
 * @returns {Promise} resolves with the API move response
 */
function renameFile(bucketName, srcFilename, destFilename) {
    return storage
        .bucket(bucketName)
        .file(srcFilename)
        .move(destFilename)
        .then((x) => {
            console.log(
                `gs://${bucketName}/${srcFilename} moved to gs://${bucketName}/${destFilename}.`
            );
            return x;
        })
        .catch(err => {
            console.error('ERROR:', err);
            throw err;
        });
}

/**
 * Deletes `filename` from the bucket.
 *
 * Fixed: the success log used the broken interpolation `$(unknown)`
 * (shell syntax, wrong variable) — it now interpolates `filename`.
 * Also returns the chain directly instead of wrapping it in
 * `new Promise` (explicit-construction anti-pattern).
 *
 * @param {string} bucketName
 * @param {string} filename - object name to delete
 * @returns {Promise} resolves with the API delete response
 */
function deleteFile(bucketName, filename) {
    return storage
        .bucket(bucketName)
        .file(filename)
        .delete()
        .then((x) => {
            console.log(`gs://${bucketName}/${filename} deleted.`);
            return x;
        })
        .catch(err => {
            console.error('ERROR:', err);
            throw err;
        });
}

/**
 * Marks the given contentIDs as downloaded by setting
 * localFile = "<contentID>.pdf" on each matching Record row.
 *
 * Fixed: the promise chain is now returned so callers CAN await it or
 * observe failures (previously it was a floating promise). Existing
 * fire-and-forget callers are unaffected; errors are still logged and
 * swallowed inside, as in the original.
 *
 * @param {string[]} recordsToUpdate - contentIDs to update
 * @returns {Promise} resolves when all row updates have settled
 */
function updateRecords(recordsToUpdate) {
    return db.sequelize.sync({force: false}).then(function () {
        return Promise.all(
            recordsToUpdate.map(x =>
                db.Record.update({localFile: x + '.pdf'}, {where: { contentID: x }})
            )
        )
        .then(() => {
            console.log("Updated filename");
        })
        .catch(e => console.error(e));
    });
}

//EXECUTE
// Entry point: build the link list from the DB and run the batched
// upload pipeline; log overall success or failure.
getPDFLinks()
    .then(x => {
        console.log("getPDFLinks COMPLETE");
        console.log(x);
    })
    .catch(e => {
        console.error("getPDFLinks FAILED");
        console.error(e);
    });

最佳答案

我建议您研究一下任务队列 ( Cloud Tasks )。

一个好的方法是为每个(可能是批处理)dbRecords 创建任务。然后,工作进程会提取每个文件并应用您的转换,然后将结果保存到 GCS。这种方法可以为您提供并行性、更短的请求、异步和重试。

https://www.npmjs.com/package/@google-cloud/tasks

关于node.js - 使用 Node.js 将大量文件上传到 Google Storage 存储桶,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52546504/

相关文章:

node.js - 何时在 Node Express 应用程序中使用 app.use()

node.js - Eureka与Node-Js客户端注册成功但心跳失败(总是重新注册)

node.js - 如何在 JetBrains PhpStorm 上安装 Node.js?

javascript - Python 中的 Pubnub 存在功能 (GAE)

java - 帮助创建泛型类以避免代码重复

python - Beam/Dataflow 自定义 Python 作业 - Cloud Storage 到 PubSub

javascript - 在 Node.js 中混合使用 JavaScript 和 TypeScript

google-app-engine - 限制对 SyncGateway 的 Admin REST API 的访问

python - 如何从本地 python 应用程序访问谷歌云存储?

python-3.x - python 使用服务帐户 json 文件推送 GCP 云存储文件