问题描述
我正在尝试创建一个数据流(批处理),该数据流每小时从Google Cloud Storage中读取一个文件,进行解析并将其写入BigQuery表中。该文件是一个.json文件,每行都有一个复杂的json。
我创建了一个简单的管道:
(p
| "Read file" >> beam.io.ReadFromText(cusom_options.file_name)
| "Parse line json" >> beam.Map(parse)
| "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=cusom_options.table))
解析函数是这样的:
def parse(input_elem):
log = json.loads(input_elem)
result = {}
... # Get some fields from input and add to "result"
return result
该管道可以正常工作,文件大小为100 Mb,行数为70K(每个作业5分钟左右的距离)。但是,当文件增加时,数据流将花费更多的时间(15分钟,200-300 Mb)或未完成并以失败告终(超过1.5 Gb和350K的行)。
我进行了一些测试,当我在函数 parse 中创建了一个json示例,但未使用 input_elem 时,数据流正常运行,并为7中的每个条目创建了一行-8分钟。
我不知道管道的问题在哪里,有人遇到类似的问题吗?
更多信息
- 我正在使用 beam.io.gcp.bigquery.WriteToBigQuery 从BQ表中自动检测架构,但是如果使用 beam.io.WriteToBigQuery ,同样的问题。
- 表名是在数据流之外预先计算的,并且是输入
- 数据流中使用的参数--experiment = use_beam_bq_sink,--subnetwork = MY_SUBNET,--region = MY_REGION
解决方法
我们终于解决了这个问题。与数据流并行运行,已在应用程序中创建了一些VPC网络,并且防火墙规则未正确配置。
这种情况类似于文档( The VPC network used for your job might be missing )中描述的情况。规则存在但配置不正确
谢谢!