Async foreach or for loop in NodeJS

Problem Description

I'm at a beginner level with Node.js, so please bear with me. The Lambda function below zips/compresses files in S3 and uploads the zipped file back to S3. listofKeys contains the list of keys to be zipped. If you look at for (const file in listofKeys): when the dataset is large, i.e. when listofKeys holds a long list of keys, it runs synchronously and causes the Lambda to time out. The question is: is there a way to run the loop asynchronously or in parallel, so that the files get zipped in time?

Code

const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');

const awsOptions = {
    region: 'us-east-1'
};
const s3 = new AWS.S3(awsOptions);

const streamTo = (bucket, key) => {
    // Note: the class is stream.PassThrough (capital T)
    const passthrough = new stream.PassThrough();
    s3.upload(
        { Bucket: bucket, Key: key, Body: passthrough, ContentType: "application/zip" },
        (err, data) => {
            if (err) throw err;
        }
    );
    return passthrough;
};

const getStream = (bucket, key) => {
    let streamCreated = false;
    const passthroughStream = new stream.PassThrough();

    passthroughStream.on("newListener", event => {
        if (!streamCreated && event == "data") {
            // Note: the SDK method is getObject (capital O)
            const s3Stream = s3
                .getObject({ Bucket: bucket, Key: key })
                .createReadStream();
            s3Stream
                .on("error", err => passthroughStream.emit("error", err))
                .pipe(passthroughStream);

            streamCreated = true;
        }
    });
    return passthroughStream;
};

exports.handler = async (event, context, callback) => {

    let totalKeys = 0;
    const listofKeys = [];
    const SrcBucket = event.Records[0].s3.bucket.name;
    const trigger_file = event.Records[0].s3.object.key;
    const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
    const dirToZip = trigger_file.split('/')[2].substr(0, trigger_file.split('/')[2].length - '.renamed'.length);
    const s3ListFilter = prefix + dirToZip;
    const destinationKey = prefix + `${dirToZip}.zip`;
    const bucketParams = {
        Bucket: SrcBucket, Delimiter: '/', Prefix: s3ListFilter + '/'
    };

    let data;
    do {
        bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
        data = await s3.listObjects(bucketParams).promise();
        const contents = data.Contents;
        totalKeys = totalKeys + contents.length;
        listofKeys.push(...contents.map(x => x.Key));
    } while (data.IsTruncated);

    console.log(`Total keys: ${listofKeys.length}`);
    
    await new Promise((resolve, reject) => {
        const zipStream = streamTo(SrcBucket, destinationKey);
        zipStream.on("close", resolve);
        zipStream.on("end", resolve);
        zipStream.on("error", reject);
        const archive = archiver("zip");
        archive.on("error", err => {
            throw new Error(err);
        });
        archive.pipe(zipStream);

        let keysCounter = 0;
        listofKeys.forEach(file => {
            archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
            keysCounter++;
            if (keysCounter >= listofKeys.length) {
                // Called at the end of the loop
                archive.finalize();
            }
        });

        //archive.finalize();
    }).catch(err => {
        throw new Error(err);
    });

    callback(null, {
        body: { final_destination: destinationKey }
    });
};

Solutions

Array.prototype.forEach()

const array1 = ['a', 'b', 'c'];

array1.forEach(element => console.log(element));

// expected output: "a"
// expected output: "b"
// expected output: "c"

So your code should be:

listOfKeys.forEach(file => {
    // forEach passes the element itself, not an index
    archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
})

(Not sure whether it works, let me know)

Source: Array.prototype.forEach() | MDN
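One thing worth knowing here: Array.prototype.forEach never waits for asynchronous work started inside its callbacks; the loop itself always completes synchronously, and any awaits inside it finish later. A minimal sketch demonstrating this with plain timers standing in for S3 I/O (no AWS involved):

```javascript
// forEach fires every callback synchronously; the awaited work lands later.
const order = [];

async function demo() {
    const items = ['a', 'b', 'c'];

    // Each async callback starts, hits its await, and immediately returns.
    items.forEach(async item => {
        await new Promise(res => setTimeout(res, 10)); // simulated I/O
        order.push(item);
    });

    order.push('loop done'); // runs BEFORE any of the async callbacks finish

    // To really wait for all of them, collect the promises and await them:
    await Promise.all(items.map(async item => {
        await new Promise(res => setTimeout(res, 10));
        order.push(item + '!');
    }));
}

const finished = demo();
```

After finished resolves, order starts with 'loop done', followed by the timer-driven pushes; this is why a bare forEach cannot, by itself, make the zipping parallel or make the Lambda wait for it.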

Edit:

So archive.finalize() should be called after the loop has finished. There are several ways to do it, but I think this one should work fine. See: Callback after all asynchronous forEach callbacks are completed

// There's probably a better way to do it, but it works:
let keysCounter = 0;
listOfKeys.forEach(file => {
    archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
    keysCounter++;
    if (keysCounter >= listOfKeys.length) {
        // Called at the end of the loop
        archive.finalize();
    }
});

I would probably rewrite the whole thing more aggressively, but to answer your specific question: replace your listOfKeys.forEach statement with:

await Promise
  .all(
    listOfKeys.map(key => archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] }))
  );

Rather than trying to do them all in one Lambda function, offload them to SQS and use a separate Lambda to process each zip.

That way, you can:

  • Isolate failures between the individual zip archives
  • Run each zip process in parallel
  • Isolate the processing of each zip in a single Lambda function
  • Implement a dead-letter queue for messages (or zips) that cannot be processed
  • Build SRP (single responsibility) into your application, i.e. one Lambda enqueues the zip work and another processes it
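A sketch of the fan-out side of that design, assuming the v2 aws-sdk: the trigger Lambda turns each directory to zip into an SQS message, and a separate consumer Lambda (not shown) reads the queue and builds one archive per message. The function names, job shape, and queue URL here are all hypothetical; the message-building helper is kept pure so it works without AWS:

```javascript
// Pure helper: turn up to 10 zip jobs into SQS batch entries
// (sendMessageBatch accepts at most 10 messages per call).
function buildEntries(jobs) {
    return jobs.map((job, i) => ({
        Id: String(i), // must only be unique within one batch
        MessageBody: JSON.stringify(job),
    }));
}

// Fan-out sketch: one message per directory to zip. Requires the
// aws-sdk package and a real queue URL at runtime.
async function enqueueZipJobs(jobs, queueUrl) {
    const AWS = require('aws-sdk'); // loaded here so buildEntries stays AWS-free
    const sqs = new AWS.SQS({ region: 'us-east-1' });
    for (let i = 0; i < jobs.length; i += 10) {
        await sqs.sendMessageBatch({
            QueueUrl: queueUrl,
            Entries: buildEntries(jobs.slice(i, i + 10)),
        }).promise();
    }
}
```

The consumer Lambda would then run essentially the existing archive code, but for a single prefix per invocation, which keeps each run well inside the timeout.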
