有没有办法在我的 python 代码中使用 apache beam

问题描述

下面是我运行管道的python代码

from __future__ import absolute_import
import apache_beam as beam
import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from datetime import date
today = date.today()
current_date = today.strftime("%Y%m%d")
def run(argv=None):
    parser = argparse.ArgumentParser()
    kNown_args,pipeline_args = parser.parse_kNown_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query="SELECT disTINCT(IF(LENGTH(MOBILE)=10,CONCAT('91',MOBILE),REPLACE(MOBILE,'+91 ','91'))) FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.C4C_CONSUMER_RAW` WHERE REGEXP_CONTAINS(REGEXP_REPLACE(Mobile,' ',''),r'^(?:(?:\+|0{0,2})91(\s*[\-]\s*)?|[0]?)?[6789]\d{9}$')",use_standard_sql=True))
       | 'read values' >> beam.Map(lambda x: x.values())
       | 'CSV format' >> beam.Map(lambda row:'|'.join ("WRPOOL|5667788|"+ str(column) +'|"'+"Hi,This msg is from Whirlpool DL" + '"' for column in row))
       | 'Write_to_GCS' >> beam.io.WritetoText('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/WHR_MOBILE_CNSNT_REQ'+''+ str(current_date),file_name_suffix='.csv',header='SENDER_ID|SHORTCODE|MOBILE_NUM|CONSENT_MSG')
    p.run().wait_until_finish()
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

在此代码中,一旦创建了我的 csv 文件,我还需要创建一个文件。我在我的 csv 文件之后尝试了这个选项,但它不会创建一个文件,而是将 csv 文件名写入其中。

| '创建 .done 文件' >> beam.io.WritetoText('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst'+str(current_date),file_name_suffix='.done'))

所以我尝试了作为 beam.Create('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst.done') 的选项

这是一个错误。任何人都可以帮助创建一个文件的选项。

解决方法

我认为没有任何内置方法可以创建空文件。您最好的选择是直接使用 Cloud Storage API 在 WriteToText 转换后在 DoFn 中创建空文件