Using side outputs in Dataflow / Apache Beam to insert data from the same file into separate BigQuery tables with different schemas

Problem description

I have a requirement where I need to build an audit mechanism. For example, there is a JSON file named emp.json:

{
    "magic": "atMSG","type": "DT","headers": null,"messageschemaid": null,"messageschema": null,"message": {
        "data": {
            "ID": "data_in_quotes","NAME": "data_in_quotes","SALARY": "data_in_quotes"
        },"beforeData": null,"headers": {
            "operation": "INSERT","changeSequence": "20200822230048000000000017887787417","timestamp": "2020-08-22T23:00:48.000","transactionId": "some_id"
        }
    }
}
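
As a side note (a minimal sketch, not part of the pipeline): flattening one such record with pandas.json_normalize at max_level=1 leaves the inner data dict intact under the "message.data" column, which is what the code later in this question extracts.

import json
from pandas import json_normalize

with open("emp.json") as f:        # assuming the whole record sits on a single line
    record = json.loads(f.read())

norm = json_normalize(record, max_level=1)
print(norm["message.data"][0])
# {'ID': 'data_in_quotes', 'NAME': 'data_in_quotes', 'SALARY': 'data_in_quotes'}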

First, I need to insert the data into the BigQuery table Staging.emp:

ID,NAME,SALARY

1,ABC,20000

2,XYZ,30000

I also need to insert the ID, a timestamp (the load timestamp), and the file name into a separate table, misc-dataset.Audit_table:

ID,TIMESTAMP,FILE

1,28-08-2020 22:55,emp.json

2,28-08-2020 22:55,emp.json

So far, I am just writing to files for testing. Once this is solved, I will use:

| "WritetoBigQuery" >> beam.io.WritetoBigQuery(
                        "{0}:{1}.emp_data".format(projectId,datasetId),schema=table_schema,#write_disposition=beam.io.BigQuerydisposition.WRITE_APPEND,write_disposition=beam.io.BigQuerydisposition.WRITE_TruncATE,create_disposition=beam.io.BigQuerydisposition.CREATE_IF_NEEDED
                        )
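
A minimal sketch of what table_schema could look like for the two target tables (field names and types are assumed from the sample rows above, not taken from the question); each table would get its own WriteToBigQuery transform with its own schema:

# Hypothetical schema strings, assuming the columns shown above.
emp_table_schema = "ID:INTEGER,NAME:STRING,SALARY:INTEGER"          # Staging.emp
audit_table_schema = "ID:INTEGER,TIMESTAMP:TIMESTAMP,FILE:STRING"   # misc-dataset.Audit_table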

With the code below, I am not able to get this working with side outputs.

class InsertIntoBQAndAudit(beam.DoFn):
    
    def process(self,element):
        norm = json_normalize(json.loads(element),max_level=1)
        l = norm["message.data"]
        return l
    
    def process(self,element):
        norm1 = json_normalize(json.loads(element),max_level=1)
        l1 = norm1["message.data"]
        
        l1['ID']
        l1['TIMESTAMP':datetime.Now()]
        l1.[m.Metadata_list for m in gcs.match(['gs://ti-project-1/emP*'])]
        return [beam.pvalue.TaggedOutput('Audit',l1)]

options = PipelineOptions()
p = beam.Pipeline(options=options)

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
                     | "Send to Different Tags" >> beam.ParDo(InsertIntoBQAndAudit()).with_outputs('Audit',main='Load')
                   )

Audit_col = data_from_source.Audit
Load_col = data_from_source.Load

Load_col | "Write to Actual Table" >> WritetoText("gs://ti-project-1/output/data_output")
Audit_col | "Write to Audit Table" >> WritetoText("gs://ti-project-1/misc-temp/audit_out")

p.run()

Solution

A DoFn cannot have two process methods. Instead, create two separate DoFns and apply each one to the same PCollection, e.g.

class Audit(beam.DoFn):
  def process(self, l1):
    l1['ID']
    l1['TIMESTAMP':datetime.now()]
    # This line above seems to have gotten mangled in transit...
    l1 = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
    yield l1

with beam.Pipeline(options=options) as p:
  data_from_source = p | "READ FROM JSON" >> ReadFromText("gs://...")
  parsed_data = data_from_source | beam.Map(
      lambda element: json_normalize(json.loads(element),max_level=1)["message.data"])
  
  Load_col = parsed_data
  Audit_col = parsed_data | beam.ParDo(Audit)
  ...
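
The elided part above is just the two sinks. As a minimal sketch (table names, schemas, and variables such as projectId are assumptions carried over from the question, not part of the original answer), the two branches could be written out like this:

  Load_col | "Write emp" >> beam.io.WriteToBigQuery(
      "{0}:{1}.emp_data".format(projectId, datasetId),
      schema="ID:INTEGER,NAME:STRING,SALARY:INTEGER",
      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  Audit_col | "Write audit" >> beam.io.WriteToBigQuery(
      "{0}:{1}.Audit_table".format(projectId, auditDatasetId),  # auditDatasetId is hypothetical
      schema="ID:INTEGER,TIMESTAMP:TIMESTAMP,FILE:STRING",
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)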

You could also use a multi-output DoFn for this, which would look like

class ParseAndAudit(beam.DoFn):
    
    def process(self,element):
        norm = json_normalize(json.loads(element),max_level=1)
        l = norm["message.data"]
        yield l  # emit the main output
    
        # continue in the same process method
        l1 = l
        
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield beam.pvalue.TaggedOutput('Audit',l1)

and then use it as above, but it may be more complicated to write and maintain.
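
For completeness, a minimal usage sketch of the multi-output version, mirroring the pipeline from the question (the step labels are arbitrary):

with beam.Pipeline(options=options) as p:
    tagged = (p
              | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
              | "Parse and Audit" >> beam.ParDo(ParseAndAudit()).with_outputs('Audit', main='Load'))

    Load_col = tagged.Load    # main (untagged) output
    Audit_col = tagged.Audit  # side output tagged 'Audit'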