使用Google Cloud Dataflow在BigQuery中解析和写入巨大的json文件时出现问题

问题描述

我正在尝试创建一个数据流(批处理),该数据流每小时从Google Cloud Storage中读取一个文件,进行解析并将其写入BigQuery表中。该文件是一个.json文件,每行都有一个复杂的json。

我创建了一个简单的管道:

(p 
| "Read file" >> beam.io.ReadFromText(cusom_options.file_name)
| "Parse line json" >> beam.Map(parse)
| "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=cusom_options.table))

解析函数是这样的:

def parse(input_elem):
   log = json.loads(input_elem)
   result = {}

   ... # Get some fields from input and add to "result"

   return result

该管道可以正常工作,文件大小为100 Mb,行数为70K(每个作业5分钟左右的距离)。但是,当文件增加时,数据流将花费更多的时间(15分钟,200-300 Mb)或未完成并以失败告终(超过1.5 Gb和350K的行)。

我进行了一些测试,当我在函数 parse 中创建了一个json示例,但未使用 input_elem 时,数据流正常运行,并为7中的每个条目创建了一行-8分钟。

我不知道管道的问题在哪里,有人遇到类似的问题吗?

更多信息

  • 我正在使用 beam.io.gcp.bigquery.WriteToBigQuery 从BQ表中自动检测架构,但是如果使用 beam.io.WriteToBigQuery ,同样的问题。
  • 表名是在数据流之外预先计算的,并且是输入
  • 数据流中使用的参数--experiment = use_beam_bq_sink,--subnetwork = MY_SUBNET,--region = MY_REGION

解决方法

我们终于解决了这个问题。与数据流并行运行,已在应用程序中创建了一些VPC网络,并且防火墙规则未正确配置。

这种情况类似于文档( The VPC network used for your job might be missing )中描述的情况。规则存在但配置不正确

谢谢!

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...