集成 BigQuery SubPub 和 Cloud Functions

问题描述

我在一个项目中,我们需要使用 BigQuery、PubSub、Logs explorer 和 Cloud Functions。

项目:

每次发生特定事件(例如用户接受 cookie)时,系统都会向 BigQuery 中插入一个包含许多列(参数)的新查询,例如:utm_source、utm_medium、consent_cookies 等...

在我的表中有这个新查询后,我需要读取列并获取要在云函数中使用的值。

云函数中,我想使用这些值进行 api 调用

到目前为止我设法做到的:

我创建了一个日志路由接收器,用于过滤新条目并将日志发送到我的 PubSub 主题

我被困的地方:

我想创建一个每次有新日志进入时触发的云函数,在该函数中我想访问日志中包含的信息,例如 utm_source、utm_medium、consent_cookies 等......并使用进行 api 调用的值。

谁能帮帮我?提前致谢!

我做了一个项目来说明流程:

  1. 插入表格:

    enter image description here

2.从此插入在日志中创建一个接收器:(过滤)

enter image description here

  1. 现在,每次我创建一个查询时,它都会转到 PUB/SUB,我会得到查询的日志

    enter image description here

  1. 我想要做的是触发一个关于这个主题函数,并使用我在查询中的值来执行调用 api 等操作...

到目前为止,我能够编写此代码

"use strict";

function main() {
  // Import the Google Cloud client library
  const { BigQuery } = require("@google-cloud/bigquery");

  async function queryDb() {
    
    const bigqueryClient = new BigQuery();

   
    const sqlQuery = `SELECT *  FROM \`mydatatable\``;

    const options = {
      query: sqlQuery,location: "europe-west3",};

    // Run the query
    const [rows] = await bigqueryClient.query(options);

    rows.forEach((row) => {
      const username = row.user_name;
    });
  }

  queryDb();
}

main();

现在我又被卡住了,我不知道如何从我创建的接收器中获取正确的查询并使用这些信息来拨打我的电话...

解决方法

您有 2 种解决方案可以从 PubSub 消息中调用您的云函数

  • HTTP 函数:您可以设置 HTTP 调用。在 trigger-http 中创建您的 Cloud Functions 函数,并在您的 PubSub 主题上创建推送订阅以调用 Cloud Functions。不要忘记添加安全性(将您的函数设为私有并在 PubSub 上启用安全性),因为您的函数是可公开访问的
  • 后台函数:您可以将您的云函数直接绑定到 PubSub 主题。订阅会自动创建并链接到 Cloud Functions。安全性是内置的。

而且,因为您有 2 种类型的函数,所以您有 2 个不同的函数签名。我提供给你们两个,处理是(完全)相同的。

function extractQuery(pubSubMessage){
    // Decide base64 the PubSub message
    let logData = Buffer.from(pubSubMessage,'base64').toString();
    // Convert it in JSON
    let logMessage= JSON.parse(logData)
    // Extract the query from the log entry
    let query = logMessage.protoPayload.serviceData.jobInsertRequest.resource.jobConfiguration.query.query

    console.log(query)
    return query
}

// For HTTP functions
exports.bigqueryQueryInLog = (req,res) => {

    console.log(req.body)
    const query = extractQuery(req.body.message.data)

    res.status(200).send(query);
}

// For Background functions
exports.bigqueryQueryInLogTopic = (message,context) => {
    extractQuery(message.data)
};

记录的 query 是您在日志条目中的 insert into...。然后,您必须解析您的 SQL 请求以提取您想要的部分。