问题描述
我正在尝试编写一个 lambda 函数,该函数可以将一个巨大的 csv 文件从 s3 存储桶转换为多个小的 json 文件(比如 2000 行的 json 文件)。我虽然有一些限制,例如在 256 MB 的有限 RAM 内存中运行。
我可以通过将文件作为文件而不是像下面这样的流来做同样的事情。
但是由于内存限制,我需要在流中处理这个问题。有没有办法使用流来做同样的事情?
// transformationClass.js
const csv = require('csvtojson');
const extension = '.json';
class S3CsvToJson {
static async perform(input,output,headers) {
let jsonArray = null;
const s3Object = await s3.getobject(); // getting the s3 object
const csvString = s3Object.Body.toString('utf8');
await csv({
noheader: false,})
.fromString(csvString)
.then((csvRow) => {
jsonArray = csvRow;
});
const fileNames = await S3CsvToJson.writetoFile(jsonArray,output);
return { files: fileNames };
}
static async writetoFile(jsonArray,output) {
const minNumber = 0;
const maxnumber = 1999;
const fileNames = [];
let outFile;
if (jsonArray && Array.isArray(jsonArray)) {
let fileIterator = 1;
while (jsonArray.length) {
outFile = `${output.key}-${fileIterator}${extension}`;
await // s3.putObject(). writing to s3
.putObject(
outFile,output.bucketName,JSON.stringify(jsonArray.splice(minNumber,maxnumber)),);
console.log('rows left :',jsonArray.length);
fileNames.push(outFile);
fileIterator += 1;
}
}
return fileNames;
}
}
module.exports = S3CsvToJson;
这里是处理函数
// handler.js
module.exports.perform = async (event,context,callback) => {
context.callbackWaitsForEmptyEventLoop = false;
await s3CsvToJson.perform(event.input,event.output,event.headerMapping)
.then((result) => callback(null,result));
console.log('leaving - ',Date.Now());
};
提前致谢!!
解决方法
在经历了很多事情之后,我终于找到了一种方法来完成它。
我要做的是,将整个过程包装成一个promise并返回。我从 s3 创建了一个读取流,将它转发到解析器,然后转发到写入流。 我想在这里分享它,所以它可能对其他人有用。也欢迎任何更好的优化解决方案。
// transformationClass.js
const csv = require('fast-csv');
const { Transform,pipeline } = require('stream');
const extension = '.json';
class S3CsvToJson {
static async perform(input,output,headers) {
console.log(input,headers);
const threshold = 2000;
try {
const promiseTransformData = () => new Promise((resolve,reject) => {
try {
let jsonOutput = [];
let fileCounter = 0;
const fileNames = [];
const writableStream = new Transform({
objectMode: true,autoDestroy: true,async transform(data,_,next) {
if (jsonOutput.length === threshold) {
fileCounter += 1;
const fileUpload = new Promise((resolveWriter) => {
s3
.putObject(
`${output.key}-${fileCounter}${extension}`,output.bucketName,JSON.stringify(jsonOutput),)
.then(() => resolveWriter());
});
await fileUpload;
fileNames.push(`${output.key}-${fileCounter}${extension}`);
jsonOutput = [];
}
jsonOutput.push(data);
next();
},});
const readFileStream = s3.getReadStream(input.key,input.bucketName);
pipeline(
readFileStream,csv.parse({ headers: true }),writableStream,(error) => {
// if (err) throw new Error('Pipeline error');
if (error) {
console.error(`Error occurred in pipeline - ${error}`);
resolve({ errorMessage: error.message });
}
},);
writableStream.on('finish',async () => {
if (jsonOutput.length) {
fileCounter += 1;
const fileUpload = new Promise((resolveWriter) => {
s3
.putObject(
`${output.key}-${fileCounter}${extension}`,)
.then(() => resolveWriter());
});
await fileUpload;
fileNames.push(`${output.key}-${fileCounter}${extension}`);
jsonOutput = [];
}
console.log({ status: 'Success',files: fileNames });
resolve({ status: 'Success',files: fileNames });
});
} catch (error) {
console.error(`Error occurred while transformation - ${error}`);
resolve({ errorMessage: error ? error.message : null });
}
});
return await promiseTransformData();
} catch (error) {
return error.message || error;
}
}
}
module.exports = S3CsvToJson;
对于处理程序,我像这样调用 S3CsvToJson
// handler.js
module.exports.perform = async (event,context,callback) => {
context.callbackWaitsForEmptyEventLoop = false;
await s3CsvToJson.perform(event.input,event.output,event.headerMapping)
.then((result) => callback(null,result))
.catch((error) => callback(error));
};
希望对你有帮助。谢谢!