Node.js 上传 Image Stream.Readable 到 S3

问题描述

我的 lambda 是由浏览器的请求触发的。浏览器将图像作为 multipart/form-data 发送。

lambda 使用 busboy 来解析请求:

function parseForm(event: IHttpEvent) {
  return new Promise(
    (resolve,reject) => {
      const busboy = new Busboy({
        headers: event.headers,limits: { files: 10 },});
      const imageResponse = new Map<string,IImageParseResponse>();

      busboy.on("file",(id,file,filename,encoding,mimeType) => {
           imageResponse.set(id,{ file,mimeType });
      });

      busboy.on("error",(error) => reject(`Parse error: ${error}`));
      busboy.on("finish",() => resolve(imageResponse));

      busboy.write(event.body,event.isBase64Encoded ? "base64" : "binary");
      busboy.end();
    }
  );
}

当我解析请求时,我想将文件上传到 AWS S3。

export async function handler(event: IHttpEvent) {
  var res = await parseForm(event);
  const s3 = new S3Client({ region: "eu-central-1" });
  for (const [k,v] of res) {
    console.log(`File ${v.filename} ${v.mimeType} streaming`);
    const stream = new Readable().wrap(v.file);
    const upload = new Upload({
      client: s3,params: {
        Key: v.filename,Bucket: "my-image-bucket",Body: stream,ContentType: v.mimeType,},});
    upload.on("httpUploadProgress",(p) => console.log(p));
    const result = await upload.done();
    console.log(result);
    return result;
  }
}

这不起作用。但是,浏览器将收到带有 200 OK 正文响应的 null。更让我困惑的是 console.log(result); 没有将任何内容记录到控制台。

我的错误在哪里?我不完全了解流的机制。但据我所知,它会更节省内存。将来我计划一次上传多张图片。为了节省成本,我希望我的方法尽可能高效。

解决方法

总的来说,我犯了两个错误。

  1. 当流已被 busboy 读到最后时尝试上传流
  2. 在终止函数之前,我没有正确等待上传到 s3 的完成。

最后我得到了以下结果:

const s3 = new S3Client({ region: "eu-central-1" });
const { BUCKET_NAME,MAX_IMAGE_SIZE } = process.env;

export async function handler(event: IHttpEvent) {
  const results = await parseForm(event);
  const response = [];
  for (const r of results) {
    if (r.status === "fulfilled") {
      const value: any = r.value.result;
      response.push({
        id: r.value.id,key: value.Key,url: value.Location,});
    }
    if (r.status === "rejected")
      response.push({ id: r.reason.id,reason: r.reason.error });
  }
  return response;
}

async function doneHandler(
  id: string,uploadMap: Map<string,Upload>
): Promise<{ id: string; result: ServiceOutputTypes }> {
  try {
    var result = await uploadMap.get(id).done();
  } catch (e: any) {
    var error = e;
  } finally {
    uploadMap.delete(id);
    if (error) throw { id,error };
    return { id,result };
  }
}

function parseForm(event: IHttpEvent) {
  return new Promise( (resolve,reject) => {
      const busboy = new Busboy({
        headers: event.headers,limits: { files: 1,fileSize: parseInt(MAX_IMAGE_SIZE) },});

      const responses: Promise<{
        id: string;
        result: ServiceOutputTypes;
      }>[] = [];
      const uploads = new Map<string,Upload>();

      busboy.on("file",(id,file,filename,encoding,mimeType) => {
        uploads.set(
          id,new Upload({
            client: s3,params: {
              Bucket: BUCKET_NAME,Body: new Readable().wrap(file),Key: filename,ContentType: mimeType,ContentEncoding: encoding,},})
        );
        responses.push(doneHandler(id,uploads));

        file.on("limit",async () => {
          const aborts = [];
          for (const [k,upload] of uploads) {
            aborts.push(upload.abort());
          }
          await Promise.all(aborts);
          return reject(new Error("File is too big."));
        });
      });

      busboy.on("error",(error: any) => {
        reject(new Error(`Parse error: ${error}`));
      });
      busboy.on("finish",async () => {
        const res = await Promise.allSettled(responses);
        resolve(res);
      });

      busboy.write(event.body,event.isBase64Encoded ? "base64" : "binary");
      busboy.end();
    }
  );
}

此解决方案还处理文件限制并尝试中止所有待处理的 S3 上传