使用 Dataflow 将 csv 数据从 Pub/Sub 订阅流式传输到 BigQuery

问题描述

使用 GCP 探索 ETL 过程。我在 Dataflow 中使用发布/订阅订阅 BigQuery 模板。

Pub/Sub 订阅中的消息数据为 csv 格式,如下所示

53466,06/30/2020,Trinidad and Tobago,2020-07-01 04:33:52,130.0,8.0,113.0

这会在加载到 BigQuery 表时留下错误。如何在模板中将 CSV 数据转换为 JSON?

解决方法

我猜您使用了 this template ,它只能用于 Pub/Sub 订阅中的 JSON 格式字符串。文件也这么说。

据我所知,另一种方法是自行为 CSV 流数据自定义 this code

,

解决了!!

在使用发布/订阅 Bigquery 模板创建作业时,单击查看选项参数。在哪里可以设置.js文件路径和UDF函数名。

enter image description here

这里是转换的 JS 代码,即从 CSV 格式到 JSON 格式。

function transform(messages) {
  var values = messages.split(',');

  // Construct output and add transformations
  var obj = new Object();
  obj.SNo = values[0];
  var dateObj = values[1];
  // Date format in file is dd/mm/YYYY
  // Transform the field to Date format required for BigQuery that is YYYY-mm-dd
  obj.ObservationDate = dateObj.replace(/(\d\d)\/(\d\d)\/(\d{4})/,"$3-$1-$2");
  obj.Provision_State = values[2];
  obj.Country_Region = values[3];
  obj.Last_Update = values[4];
  obj.Confirmed = values[5];
  obj.Deaths = values[6];
  obj.Recovered = values[7];
  // add object to JSON
  var jsonString = JSON.stringify(obj);

  return jsonString;
}