Dataflow: load a table from GBQ, transform it, and load it back into GBQ

Problem description

We are trying to load a table from GBQ, transform it, and write it back to GBQ, but the job fails with this error:

Workflow Failed. Causes: S01:Read data from BQ+Preprocess data+Write data to BQ/Write/NativeWrite Failed., BigQuery import job Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details., error: Error while reading data, error message: JSON processing encountered too many errors, giving up. Rows: 1; errors: 1; max bad: 0; error percent: 0, error message: JSON parsing error in row starting at position 0: Start of array encountered without start of object.

The code is as follows:

import apache_beam as beam
import pandas as pd

query = 'SELECT * FROM `<project>.<Dataset>.<table>` LIMIT 100'
bq_source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
table_schema = "Col1:INTEGER,col2:INTEGER,col3:INTEGER"
p = beam.Pipeline(options=pipeline_options)
(p
     | "Read data from BQ" >> beam.io.Read(bq_source)
     | "Preprocess data" >> beam.ParDo(transferinput(project=<project_id>, bucket_name=<bucket>))
     # | 'Write to Cloud Storage' >> beam.io.WriteToText(<bucket> + '/output_rec.txt')
     | "Write data to BQ" >> beam.io.WriteToBigQuery(table=<table_name_new>, dataset=<Dataset>, project=<project_id>, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
     )

class transferinput(beam.DoFn):
    def __init__(self,project=None,bucket_name='bucket_name'): 
        self._project = project
        self._bucket_name = bucket_name  
     
    def process(self,element):
        """ Format the input to the desired shape"""
        df = pd.DataFrame([element],columns=element.keys())        
        df[features] = df[features].fillna('-1')
        output = df[features]
        output = output.to_dict('records')
        return [output]

Is there something wrong with the format of the returned value?

Solution
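
Judging from the error text ("Start of array encountered without start of object"), a likely cause is the return value of process. output.to_dict('records') is already a list of dicts, so return [output] wraps it in a second list, and Beam then emits that inner list as a single element. WriteToBigQuery expects each element to be a dict keyed by column name, so a list ends up serialized as a JSON array instead of a JSON object. The sketch below shows one way to emit a flat dict per row. It is an illustration under assumptions, not the asker's final code: TransferInput, format_row and FEATURES are made-up names, the column list is guessed from the schema string, pandas is left out for brevity, and missing values are filled with the integer -1 (rather than the string '-1') to match the INTEGER columns.

```python
try:
    import apache_beam as beam
    DoFnBase = beam.DoFn
except ImportError:
    # Fallback so the row-shaping logic can be tried without Beam installed.
    DoFnBase = object

# Hypothetical column list; the original `features` variable is not shown.
FEATURES = ["Col1", "col2", "col3"]


def format_row(element, features=FEATURES, fill_value=-1):
    """Return one flat dict holding only `features`, with None values filled."""
    row = {}
    for name in features:
        value = element.get(name)
        row[name] = fill_value if value is None else value
    return row


class TransferInput(DoFnBase):
    """Emits one dict per input row, the element shape WriteToBigQuery expects."""

    def process(self, element):
        # `yield` emits the dict itself as the output element. Returning a
        # list that contains a list of dicts would emit the inner list instead.
        yield format_row(element)
```

In the original DoFn, the smallest change with the same effect would be to replace return [output] with return output, so that each dict in the records list becomes its own element.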
