如何使用 Cloud Functions 读取存储在 Google Cloud Storage 中的 CSV 数据

问题描述

作为与庞大用户群沟通工作的一部分,我每天需要发送超过 75,000 封电子邮件。我正在联系的用户的电子邮件存储在一个 CSV 文件中。我一直在使用 Postman Runner 通过 SendGrid(电子邮件 API)发送这些请求,但是由于数量如此之大,我的计算机要么速度变慢,要么 Postman 在批处理完成之前完全崩溃。即使没有崩溃,通过 Runner 发送这么多 POST 请求也需要 3 个小时以上的时间。

我想将包含电子邮件的 CSV 文件上传到 Cloud Storage 存储分区,然后使用 Cloud Functions 访问该文件以针对每封电子邮件发送 POST 请求。这样,所有的处理都可以由 GCP 处理,而不是由我的个人机器处理。但是,我似乎无法让 Cloud Function 逐行读取 CSV 数据。我已尝试将 Cloud Storage NodeJS 客户端库中的 createReadStream() 与 csv-parser 一起使用,但无法使此解决方案起作用。以下是我尝试过的:

const sendGridMail = require('@sendgrid/mail');
const { Storage } = require('@google-cloud/storage');
const fs = require('fs');
const csv = require('csv-parser');

exports.sendMailFromCSV = (file,context) => {

    console.log(`  Event: ${context.eventId}`);
    console.log(`  Event Type: ${context.eventType}`);
    console.log(`  Bucket: ${file.bucket}`);
    console.log(`  File: ${file.name}`);
    console.log(`  Metageneration: ${file.metageneration}`);
    console.log(`  Created: ${file.timeCreated}`);
    console.log(`  Updated: ${file.updated}`);

    const storage = new Storage();
    const bucket = storage.bucket(file.bucket);
    const remoteFile = bucket.file(file.name);
    console.log(remoteFile);

    let emails = [];
       
    fs.createReadStream(remoteFile)
        .pipe(csv())
        .on('data',function (row) {
            console.log(`Email read: ${row.email}`);
            emails.push(row.email);
        //send email using the SendGrid helper library
        const msg = {
                to: [{
                    "email": row.email;
                }],from: "fakeemail@gmail.com",template_id: "fakeTemplate",};

            sendGridMail.send(msg).then(() =>
                context.status(200).send(file.body))
                .catch(function (err) {
                    console.log(err);
                    context.status(400).send(file.body);
                });
        })
        .on('end',function () {
            console.table(emails);
        });    
};

云函数当前由上传到云存储存储桶触发。

有没有办法在不将文件加载到内存中的情况下构建此问题的解决方案? Cloud Functions 是向下移动的正确路径,还是使用 App Engine 或其他一些工具会更好?愿意尝试将这个过程移到云端的任何 GCP 解决方案

解决方法

Cloud Function 的内存可以作为临时目录 /tmp 共享/使用。因此,您可以将云存储桶中的 csv 文件作为本地文件下载到该目录中,然后对其进行处理,就像从本地驱动器处理该文件一样。

同时,您可能想记住 2 个主要限制:

  1. 内存 - 高达 2Gb 的所有内容
  2. 超时 - 每次调用不超过 540 秒。

我个人会根据一些 GCP 资源的组合创建一个解决方案。

第一个云函数由 'finlize' 事件触发 - 当 csv 文件保存在存储桶中时。此云功能读取文件,并为每条记录撰写包含相关详细信息的 Pub/Sub 消息(足以发送电子邮件)。该消息发布到 Pub/Sub 主题中。

Pub/Sub 主题用于传输来自第一个云函数的所有消息以触发第二个云函数。

第二个云函数由 Pub/Sub 消息触发,其中包含处理和发送电子邮件所需的所有详细信息。由于源 csv 文件中可能有 75K 条记录(例如),您应该期望第二个云函数调用 75K 次。

这在高层次上可能就足够了。 Pub/Sub 范式保证至少发送一次(但可能不止一次),因此如果您需要每个地址不超过一封电子邮件,则可能需要一些额外的资源来实现幂等行为。

,

基本上,您必须将文件本地下载到 Cloud Function 机器中才能以这种方式读取。

现在有多种方法可以解决此问题。

最基本/最简单的方法是配置 Compute Engine 机器,如果是一次性事件,则从它运行此操作。

如果您需要更频繁地(即每天)执行此操作,您可以使用在线工具将您的 csv 文件转换为 json 并将其导入 Firestore,然后您可以更快地阅读来自 Firestore 的电子邮件。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...