GCP Dataflow 管道在 DirectRunner 中的运行速度比 DataflowRunner 快

问题描述

我对使用 Dataflow (GCP) 还很陌生。我构建了一个在 DirectRunner 模式下运行比 DataflowRunner 模式更快的管道,我不知道如何改进。管道从 Bigquery 中的多个表中读取数据并返回一个 csv 文件,它接收日期作为执行参数来过滤查询

def get_pipeline_options(pipeline_args):
    pipeline_args = ['--%s=%s' % (k,v) for (k,v) in {
        'project': PROJECT_ID,'region': REGION,'job_name': JOB_NAME,'staging_location': STORAGE + 'STAGING_DIRECTORY','temp_location': STORAGE + 'TEMP_DIRECTORY',}.items()] + pipeline_args
    options = PipelineOptions(pipeline_args)
    return options

class Reader(beam.DoFn):
  import datetime
  def __init__(self,fechaIni,fechaFin):
    self.fechaIni = fechaIni
    self.fechaFin = fechaFin
  
  def process(self,text):
    from google.cloud import bigquery
    from datetime import datetime
    dateIni = self.fechaIni.get()
    dateEnd = self.fechaFin.get()
      
    query = """  
        #A huge query from multiple tables with joins
     """
    client = bigquery.Client()
    query_job = client.query(query)
    result_fields = query_job.result()
    
    return result_fields       
  


class Campaignoptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls,parser):
      parser.add_value_provider_argument('--fechaIni',type=str)
      parser.add_value_provider_argument('--fechaFin',type=str)


def run(argv=None,save_main_session=True):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
 
    parser.add_argument(
      '--output',dest='output',default='gs://mybucket/input_'+datetime.datetime.Now().strftime('%Y%m%d'),help='Output files.')


    kNown_args,pipeline_args = parser.parse_kNown_args(argv)
    pipeline_args.extend([
        '--project=myproject','--staging_location=gs://mybucket','--region=us-central1','--temp_location=gs://gs://mybucket','--job_name=myjob'
    ])
    pipeline_options = PipelineOptions(pipeline_args)

    campaign_options = pipeline_options.view_as(Campaignoptions)

    with beam.Pipeline(options=campaign_options) as pipeline:
        r = (
          pipeline
          | 'Initialize'>> beam.Create([':-)' ])
          | 'Read from BigQuery' >> beam.ParDo(Reader(campaign_options.fechaIni,campaign_options.fechaFin))
          | 'Read values' >> beam.Map(lambda x: x.values())
          | 'CSV format' >> beam.Map(lambda row: ','.join([str(column) for column in row]))
          | 'Write' >>beam.io.WritetoText(num_shards=1,file_path_prefix = kNown_args.output )
        )
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)