Problem description
We are trying to load a table from Google BigQuery (GBQ), transform it, and write it back to GBQ, but the job fails with an error such as:
Workflow Failed. Causes: S01:Read data from BQ+Preprocess data+Write data to BQ/Write/NativeWrite Failed.,BigQuery import job Error while reading data,error message: JSON table encountered too many errors,giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.,error: Error while reading data,error message: JSON processing encountered too many errors,giving up. Rows: 1; errors: 1; max bad: 0; error percent: 0,error message: JSON parsing error in row starting at position 0: Start of array encountered without start of object.
The code is as follows:
import apache_beam as beam
import pandas as pd

query = 'SELECT * FROM `<project>.<Dataset>.<table>` LIMIT 100'
bq_source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
table_schema = "Col1:INTEGER,col2:INTEGER,col3:INTEGER"
p = beam.Pipeline(options=pipeline_options)
(p
 | "Read data from BQ" >> beam.io.Read(bq_source)
 | "Preprocess data" >> beam.ParDo(transferinput(project=<project_id>, bucket_name=<bucket>))
 # | 'Write to Cloud Storage' >> beam.io.WriteToText(<bucket> + '/output_rec.txt')
 | "Write data to BQ" >> beam.io.WriteToBigQuery(
       table=<table_name_new>,
       dataset=<Dataset>,
       project=<project_id>,
       schema=table_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
)
class transferinput(beam.DoFn):
    def __init__(self, project=None, bucket_name='bucket_name'):
        self._project = project
        self._bucket_name = bucket_name

    def process(self, element):
        """Format the input to the desired shape."""
        # `features` (a list of column names) is assumed to be defined elsewhere in the script.
        df = pd.DataFrame([element], columns=element.keys())
        df[features] = df[features].fillna('-1')
        output = df[features]
        output = output.to_dict('records')   # list of dicts, one per row
        return [output]
Is there a problem with the return format?
Solution
No confirmed fix has been posted for this question yet.
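That said, the error message points at the shape of the data handed to the BigQuery sink: "Start of array encountered without start of object" means the rows being loaded are JSON arrays rather than JSON objects. WriteToBigQuery expects every element of its input PCollection to be a Python dict mapping column names to values, but process() above returns [output], where output = df[features].to_dict('records') is already a list of dicts, so each emitted element is a list nested inside another list. A minimal sketch of a corrected DoFn follows; the features list here is a hypothetical stand-in for the column list used in the question and should be replaced with the real one:

import apache_beam as beam
import pandas as pd

# Hypothetical column list standing in for the `features` variable in the question.
features = ['Col1', 'col2', 'col3']

class transferinput(beam.DoFn):
    def __init__(self, project=None, bucket_name='bucket_name'):
        self._project = project
        self._bucket_name = bucket_name

    def process(self, element):
        """Format one BigQuery row (a dict) and emit it as a dict again."""
        df = pd.DataFrame([element], columns=element.keys())
        df[features] = df[features].fillna('-1')
        # to_dict('records') returns a list of dicts; yield each dict on its own
        # instead of returning the whole list wrapped in another list.
        for row in df[features].to_dict('records'):
            yield row

With each element a plain dict such as {'Col1': 1, 'col2': 2, 'col3': 3}, WriteToBigQuery can serialize the row as a JSON object, which should resolve the "Start of array" error. Depending on the data, fillna(-1) (an integer) may also be preferable to the string '-1' so that the values match the INTEGER columns declared in table_schema.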