问题描述
我处于 Node.js 的初级水平,请见谅。以下 Lambda 函数的作用是把 S3 中的一组文件打包(zip)后再上传回 S3。listofKeys
保存了待打包对象的键(Key)列表。请注意 for (const file in listofKeys)
这个循环:当数据集很大、即 listofKeys
中的键很多时,循环会逐个顺序执行,导致 Lambda 超时。请问有没有办法让循环异步或并行执行,从而及时完成文件压缩?
代码:
const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');
// AWS client configuration; declared at module scope so a single S3
// client instance is reused across warm Lambda invocations.
const awsOptions = {
region: 'us-east-1'
};
const s3 = new AWS.S3(awsOptions);
/**
 * Starts a managed S3 upload that drains a PassThrough stream and
 * returns the write end of that stream, so callers can pipe zip data
 * into S3 incrementally.
 *
 * Fixes: the Node constructor is stream.PassThrough (capital T) —
 * stream.Passthrough is undefined and threw a TypeError.
 *
 * @param {string} bucket - Destination S3 bucket name.
 * @param {string} key - Destination S3 object key.
 * @returns {stream.PassThrough} writable stream feeding the upload.
 */
const streamTo = (bucket, key) => {
  const passthrough = new stream.PassThrough();
  s3.upload(
    { Bucket: bucket, Key: key, Body: passthrough, ContentType: "application/zip" },
    (err, data) => {
      // Surface upload failures on the stream instead of throwing from
      // an async callback, which would crash the process uncaught.
      if (err) passthrough.emit("error", err);
    }
  );
  return passthrough;
};
/**
 * Returns a lazy read stream for an S3 object: the actual S3 request is
 * only issued the first time a "data" listener is attached, so archiver
 * can queue many entries without opening every S3 connection up front.
 *
 * Fixes: the AWS SDK method is getObject (camelCase) — s3.getobject is
 * undefined and threw a TypeError at runtime.
 *
 * @param {string} bucket - Source S3 bucket name.
 * @param {string} key - Source S3 object key.
 * @returns {stream.PassThrough} readable stream of the object's bytes.
 */
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passthroughStream = new stream.PassThrough();
  passthroughStream.on("newListener", event => {
    if (!streamCreated && event === "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      // Forward read errors so the consumer (archiver) sees them.
      s3Stream
        .on("error", err => passthroughStream.emit("error", err))
        .pipe(passthroughStream);
      streamCreated = true;
    }
  });
  return passthroughStream;
};
exports.handler = async (event,context,callback) => {
let totalKeys = 0;
const listofKeys = [];
const SrcBucket = event.Records[0].s3.bucket.name;
const trigger_file = event.Records[0].s3.object.key;
const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
const dirToZip = trigger_file.split('/')[2].substr(0,trigger_file.split('/')[2].length - '.renamed'.length);
const s3ListFilter = prefix + dirToZip;
const destinationKey = prefix + `${dirToZip}.zip`;
const bucketParams = {
Bucket: SrcBucket,Delimiter: '/',Prefix: s3ListFilter + '/'
};
let data;
do {
bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
data = await s3.listObjects(bucketParams).promise();
const contents = data.Contents;
totalKeys = totalKeys + contents.length;
listofKeys.push(...contents.map(x => x.Key));
} while (data.IsTruncated);
console.log(`Total keys: ${listofKeys.length}`);
await new Promise(async (resolve,reject) => {
var zipStream = streamTo(SrcBucket,destinationKey);
zipStream.on("close",resolve);
zipStream.on("end",resolve);
zipStream.on("error",reject);
var archive = archiver("zip");
archive.on("error",err => {
throw new Error(err);
});
archive.pipe(zipStream);
var keysCounter = 0;
listofKeys.forEach(file => {
archive.append(getStream(SrcBucket,file),{ name: file.split('/')[3] })
keysCounter++
if (keysCounter >= Object.keys(listofKeys).length) {
// Called at the end of the loop
archive.finalize();
}
});
//archive.finalize();
}).catch(err => {
throw new Error(err);
});
callback(null,{
body: { final_destination: destinationKey }
});
};
解决方法
Array.prototype.forEach()
// Verbatim MDN example: forEach invokes the callback once per element, in order.
const array1 = ['a','b','c'];
array1.forEach(element => console.log(element));
// expected output: "a"
// expected output: "b"
// expected output: "c"
所以你的代码应该是:
// In forEach the callback's first argument IS the element, so use `file`
// directly; `listOfKeys[file]` would index the array with a string value
// and yield undefined.
listOfKeys.forEach(file => {
  archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
});
(不确定它是否有效,请告诉我)
来源:Array.prototype.forEach() | MDN
编辑:
所以 archive.finalize()
应该在循环结束后调用,有几种方法可以做到,但我认为这个应该可以正常工作。见:Callback after all asynchronous forEach callbacks are completed
// Count queued entries and finalize after the last one. Fixes from the
// original snippet: getStream was missing the key argument and a closing
// paren; the counter name was inconsistent (keysCounter vs keyCounter);
// an array's length is read directly, not via Object.keys().
let keysCounter = 0;
listOfKeys.forEach(file => {
  archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
  keysCounter++;
  if (keysCounter >= listOfKeys.length) {
    // Called at the end of the loop
    archive.finalize();
  }
});
,
我可能会更积极地重写整个内容,但要回答您的具体问题:将您的 listOfKeys.forEach
语句替换为:
// archiver's append() queues the entry synchronously and returns the
// archiver instance, NOT a Promise — so Promise.all() over its results
// resolves immediately and waits for nothing. Queue every entry, then
// finalize; actual completion is signalled by the zip stream's
// "close"/"end" events, which the surrounding Promise already awaits.
listOfKeys.forEach(key =>
  archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] })
);
archive.finalize();
,
不要尝试在一个 lambda 函数中处理全部压缩任务,而是借助 SQS 把任务分发出去(offload),由单独的 lambda 处理每个 zip。
这样,您可以隔离以下内容:
- 每个 zip 存档之间的失败
- 并行运行每个 zip 进程
- 在单个 lambda 函数中隔离每个 zip 的处理
- 为无法处理的消息(或 zip)实施死信队列
- 在您的应用程序中创建 SRP(单一职责),即一个 lambda 使用 zip 文件,另一个来处理它