Problem description
I have a requirement to develop an audit mechanism as part of my pipeline. For example, there is a JSON file named emp.json:
{
  "magic": "atMSG",
  "type": "DT",
  "headers": null,
  "messageschemaid": null,
  "messageschema": null,
  "message": {
    "data": {
      "ID": "data_in_quotes",
      "NAME": "data_in_quotes",
      "SALARY": "data_in_quotes"
    },
    "beforeData": null,
    "headers": {
      "operation": "INSERT",
      "changeSequence": "20200822230048000000000017887787417",
      "timestamp": "2020-08-22T23:00:48.000",
      "transactionId": "some_id"
    }
  }
}
First, I need to insert the data into a BigQuery table, Staging.emp:
ID,NAME,SALARY
1,ABC,20000
2,XYZ,30000
I also need to insert the ID, a timestamp (the load timestamp), and the filename into a separate table, misc-dataset.Audit_table:
ID,TIMESTAMP,FILENAME
1,28-08-2020 22:55,emp.json
2,28-08-2020 22:55,emp.json
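For reference, the DD-MM-YYYY HH:MM load-timestamp format shown in the audit rows above can be produced with strftime (a small sketch; the exact format string is an assumption inferred from the sample rows):

```python
from datetime import datetime

# Format a load timestamp in the DD-MM-YYYY HH:MM form used in the sample rows
load_ts = datetime(2020, 8, 28, 22, 55).strftime("%d-%m-%Y %H:%M")
print(load_ts)  # → 28-08-2020 22:55
```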
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
    "{0}:{1}.emp_data".format(projectId, datasetId),
    schema=table_schema,
    #write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
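The `table_schema` referenced above is not shown; a minimal sketch of what the two schema strings might look like, with column names taken from the sample tables above (the BigQuery column types are assumptions):

```python
# Hypothetical WriteToBigQuery schema strings; the names follow the sample
# tables in the question, the types are assumptions.
emp_schema = "ID:INTEGER,NAME:STRING,SALARY:INTEGER"
audit_schema = "ID:INTEGER,TIMESTAMP:TIMESTAMP,FILENAME:STRING"
```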
class InsertIntoBQAndAudit(beam.DoFn):
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        return l

    def process(self, element):
        norm1 = json_normalize(json.loads(element), max_level=1)
        l1 = norm1["message.data"]
        l1['ID']
        l1['TIMESTAMP':datetime.Now()]
        l1.[m.Metadata_list for m in gcs.match(['gs://ti-project-1/emP*'])]
        return [beam.pvalue.TaggedOutput('Audit', l1)]
options = PipelineOptions()
p = beam.Pipeline(options=options)

data_from_source = (p
    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
    | "Send to Different Tags" >> beam.ParDo(InsertIntoBQAndAudit()).with_outputs('Audit', main='Load')
)

Audit_col = data_from_source.Audit
Load_col = data_from_source.Load

Load_col | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
Audit_col | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")

p.run()
Solution
A DoFn cannot have two process methods. Instead, create two separate DoFns and apply each of them to the same PCollection, e.g.
class Audit(beam.DoFn):
    def process(self, l1):
        # Attach the load timestamp and the matched filename(s) to the record;
        # gcs is the matcher already used in the question's code.
        l1['TIMESTAMP'] = datetime.now()
        l1['FILENAME'] = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield l1
with beam.Pipeline(options=options) as p:
    data_from_source = p | "READ FROM JSON" >> ReadFromText("gs://...")
    parsed_data = data_from_source | beam.Map(
        lambda element: json_normalize(json.loads(element), max_level=1)["message.data"])
    Load_col = parsed_data
    Audit_col = parsed_data | beam.ParDo(Audit())
    ...
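The per-record audit logic can also be exercised outside Beam. Below is a plain-Python sketch of what the Audit step builds for each record; the `filename` argument and the timestamp format are assumptions based on the sample audit rows in the question:

```python
import json
from datetime import datetime

def build_audit_row(element, filename="emp.json", now=None):
    """Sketch of the per-record audit logic: keep the ID and attach a
    load timestamp plus the source filename (both assumed fields)."""
    record = json.loads(element)["message"]["data"]
    ts = (now or datetime.now()).strftime("%d-%m-%Y %H:%M")
    return {"ID": record["ID"], "TIMESTAMP": ts, "FILENAME": filename}

sample = '{"message": {"data": {"ID": "1", "NAME": "ABC", "SALARY": "20000"}}}'
row = build_audit_row(sample, now=datetime(2020, 8, 28, 22, 55))
# row == {"ID": "1", "TIMESTAMP": "28-08-2020 22:55", "FILENAME": "emp.json"}
```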
Alternatively, you could use a multi-output DoFn for this, which would look something like
class ParseAndAudit(beam.DoFn):
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        yield l  # emit the main output

        # continue in the same process method
        l1 = l
        l1['TIMESTAMP'] = datetime.now()
        l1['FILENAME'] = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield beam.pvalue.TaggedOutput('Audit', l1)
and then use it with with_outputs as above, although that can be more complex to write and maintain.