Event listener for when a file has finished streaming to the AWS S3 upload API?

Problem description

I'm building a file backup between Google Drive and AWS S3. I download each file with the Google Drive get API and insert the data into AWS S3, which produces a readable-stream promise. Since I have many files, each promise is added to a queue, and a new promise only starts once the previous one resolves.
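Roughly, the queueing works like this (a simplified sketch; processQueue and fileObjs are stand-in names, not my actual queue code):

const processQueue = async (fileObjs) => {
  // start the next download/upload only after the previous promise resolves
  for (const fileObj of fileObjs) {
    await getGFileContent(fileObj)
  }
}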

I'm struggling to get the promise to resolve only when the file has finished uploading to AWS S3, rather than when the file has finished downloading.

I thought that using .on('finish', () => { resolve() }) would do this, but it doesn't seem to work.

Here is a sample of my code:

// download stream of NON gdocs files and pipe to destination
const getGFileContent = async (fileObj) => {
  let fileExt = fileObj.path.join('/').concat('/', fileObj.name)

  return drive.files.get({fileId: fileObj.id, mimeType: fileObj.mimeType, alt: 'media'}, {responseType: 'stream'})
    .then(res => {
      return new Promise((resolve, reject) => {
        res.data
          .pipe(uploadS3(fileExt))
          .on('end', () => {console.log(`Done downloading file: ${fileExt}`)})
          .on('finish', () => {resolve(console.log(`File Backup Complete: ${fileExt}`))})
          .on('error', err => {reject(console.error(`Error downloading file: ${err}`))})
      })
    })
}

// upload a file to AWS S3 by passing the file stream from getGFileContent into the 'Body' parameter of the upload
const uploadS3 = (filePath) => {
  let pass = new stream.PassThrough()
  let params = {
    Bucket: awsBucketName, // bucket-name
    Key: filePath,         // file will be saved as bucket-name/[uniquekey.csv]
    Body: pass             // file data passed through stream
  }
  new aws.S3().upload(params).promise()
    .then(() => console.log(`Successfully uploaded to S3: ${filePath}`))
    .catch(err => console.log(`Error, unable to upload to S3: ${err}`))
  return pass
}

Solution

The reason .on('finish') resolves too early is that 'finish' fires on the passThrough stream once the download has been fully written into it, not once S3 has finished consuming it; the upload keeps running after that.

The first thing that comes to mind is to make the uploadS3 function async and await the upload before returning the passThrough stream. But that won't work: the function would then return a Promise, and .pipe() only accepts a stream object.
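A sketch of that dead end, reusing the aws, stream, and awsBucketName names from the question:

// Won't work: an async function always returns a Promise, so the caller gets
// a Promise<PassThrough>, and res.data.pipe(...) throws on a non-stream argument.
const uploadS3 = async (filePath) => {
  const pass = new stream.PassThrough()
  const params = { Bucket: awsBucketName, Key: filePath, Body: pass }
  // This also deadlocks: nothing has been piped into `pass` yet,
  // so the upload never receives data and the await never settles.
  await new aws.S3().upload(params).promise()
  return pass
}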

Instead, you can refactor the code so that getGFileContent returns a readable stream promise (renamed getGFileStream in the sample below).

Then, make uploadS3 accept a readable stream as a parameter and return the s3.upload promise.

To tie it together, add an async backupFile function that awaits the GDrive stream and then the upload promise before continuing. This also keeps the functions tidy, each with its own single responsibility.

Sample code:

const AWS = require('aws-sdk');
const fs = require('fs');

AWS.config.update({
    accessKeyId: '----',
    secretAccessKey: '----',
});

const s3 = new AWS.S3();

const backupFile = async (file) => {
    const fileStream = await getGFileStream(file);
    try {
        await uploadStreamToS3(fileStream);
        console.log(`S3 Backup of ${fileStream.path} completed`)
    } catch (err) {
        console.log(`error during file upload ${err}`);
    }
}

const getGFileStream = async (fileObj) => {
    // TODO: logic to find and get the file. Returns a readableStream promise 
    const fileStream = fs.createReadStream('./largeFile.zip');
    console.log(`File ${fileObj.id} read from Google Drive`);
    return fileStream;
}

const uploadStreamToS3 = (fileStream) => {
    const params = { Bucket: 'test-bucket', Key: 'key', Body: fileStream }
    console.log(`Starting to upload ${fileStream.path} to S3`);
    return s3.upload(params).promise();
}

backupFile({id: 'mockTestFile'});
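
With that in place, the queue from the question reduces to a plain sequential loop (backupAll and the files array are illustrative):

const backupAll = async (files) => {
  for (const file of files) {
    // each backup completes fully (download + S3 upload) before the next starts
    await backupFile(file)
  }
}

Because backupFile only resolves after s3.upload(params).promise() settles, awaiting it in a loop gives you exactly the one-at-a-time behavior you wanted.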