Passing a 'date' as a runtime parameter in a Google Dataflow template

Problem description

I am currently trying to generate a Google Dataflow custom template that calls an API at runtime and writes the results to a BigQuery table.

The problem I have run into is that the API requires a date parameter in 'YYYY-MM-DD' format in order to work.

Unfortunately, it seems that when constructing a template, Dataflow requires you to use a ValueProvider (as described here) for any variable that relates to when the job is run (i.e. today's date). Otherwise it will just keep using the same date from when the template was originally created (i.e. dt.date.today() and the like - h/t to this post).
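
For reference, the deferred pattern described there is, as far as I understand it, roughly the following (a minimal sketch of my own; LogDateFn is just an illustrative name): the ValueProvider object itself is handed to a DoFn and only resolved with .get() inside process(), once the job is actually running.

import logging

import apache_beam as beam

class LogDateFn(beam.DoFn):
    def __init__(self, date_vp):
        # Store the ValueProvider itself rather than its value
        self.date_vp = date_vp

    def process(self, element):
        # .get() may only be called at execution time, i.e. once the job runs
        run_date = self.date_vp.get()
        logging.info('Run date resolved at runtime: %s', run_date)
        yield run_date

# e.g. pipeline | beam.Create(['start']) | beam.ParDo(LogDateFn(user_options.date))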

So, given the code I have, is there any way to generate the template so that it correctly picks up today's date as a parameter at runtime, rather than using the same static date indefinitely - or, as things currently stand, failing to convert to a template at all?

from __future__ import print_function, absolute_import
import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions
from apache_beam.options.value_provider import ValueProvider

import datetime as dt
from datetime import timedelta, date
import time
import re

logging.getLogger().setLevel(logging.INFO)

class GetAPI():
  def __init__(self, data={}, date=None):
    self.num_api_errors = Metrics.counter(self.__class__, 'num_api_errors')
    self.data = data
    self.date = date

  def get_job(self):
    import requests
    endpoint = f'https://www.rankranger.com/api/v2/?rank_stats&key={self.data.api_key}&date={self.date}'\
               f'&campaign_id={self.data.campaign}&se_id={self.data.se}&domain={self.data.domain}&output=json'
    logging.info("Endpoint: {}".format(str(endpoint)))
    try:
      res = requests.get(endpoint)
      if res.status_code == 200:
        # logging.info("Reponse: {}".format(str(res.text)))
        json_data = res.json()
        ## Store the API response
        if 'result' in json_data:
          response = json_data.get('result')
          return response

    except Exception as e:
      self.num_api_errors.inc()
      logging.error(f'Exception: {e}')
      logging.error('Extract error on "%s"', 'Rank API')


def format_dates(api):
  api['date'] = dt.datetime.strptime(api['date'], "%m/%d/%Y").strftime("%Y-%m-%d")
  return api


# Class to pass in date generated at runtime to template
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        ## Special runtime argument e.g. date
        parser.add_value_provider_argument('--date', type=str, default=(dt.date.today()).strftime("%Y-%m-%d"), help='Run date in YYYY-MM-DD format.')


def run(argv=None):
    """
      Main entry point; defines the static arguments to be passed in.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--api_key', default=API_KEY, help='API key for Rank API.')
    parser.add_argument('--campaign', default=CAMPAIGN, help='Campaign ID for Rank API')
    parser.add_argument('--se', default=SE, help='Search Engine ID for Rank API')
    parser.add_argument('--domain', default=DOMAIN, help='Domain for Rank API')
    parser.add_argument('--dataset', default=DATASET, help='BigQuery Dataset to write tables to. Must already exist.')
    parser.add_argument('--table_name', default=TABLE_NAME, help='The BigQuery table name. Should not already exist.')
    parser.add_argument('--project', default=PROJECT, help='Your GCS project.')
    parser.add_argument('--runner', default="DataflowRunner", help='Type of DataFlow runner.')

    args, pipeline_args = parser.parse_known_args(argv)

    # Create and set your PipelineOptions.
    options = PipelineOptions(pipeline_args)
    user_options = options.view_as(UserOptions)

    pipeline = beam.Pipeline(options=options)

    # Gets data from Rank Ranger API
    api = (
        pipeline
        | 'create' >> beam.Create(GetAPI(data=args, date=user_options.date).get_job())
        | 'format dates' >> beam.Map(format_dates)
    )

    # Write to bigquery based on specified schema
    BQ = (api | "WriteToBigQuery" >> beam.io.WriteToBigQuery(args.table_name, dataset=args.dataset, schema=SCHEMA))

    pipeline.run()


if __name__ == '__main__':
    run()

As you can see from the error message, instead of passing a neatly formatted 'YYYY-MM-DD' parameter, it passes the whole ValueProvider object, which breaks the API call and returns a NoneType error:

(Apache) C:\Users\user.name\Documents\Alchemy\Dataflow\production_pipeline\templates>python main.py --runner DataflowRunner --project <PROJECT> --staging_location gs://<STORAGE-BUCKET>/staging --temp_location gs://<STORAGE-BUCKET>/temp --template_location gs://<STORAGE-BUCKET>/template/<TEMPLATE> --region europe-west2
INFO:root:Endpoint: https://www.rankranger.com/api/v2/?rank_stats&key=<API_KEY>&date=RuntimeValueProvider(option: date, type: str, default_value: '2020-08-25')&campaign_id=<CAMPAIGN>&se_id=<SE>&domain=<DOMAIN>&output=json
Traceback (most recent call last):
  File "main.py", line 267, in <module>
    run()
  File "main.py", line 257, in run
    | 'format dates' >> beam.Map(format_dates)
  File "C:\Users\user.name\Anaconda3\envs\Apache\lib\site-packages\apache_beam\transforms\core.py", line 2590, in __init__
    self.values = tuple(values)
TypeError: 'NoneType' object is not iterable

Any help would be greatly appreciated!

Solution

Your diagnosis is correct. You should consider migrating to Flex Templates, which solve this (and other) issues and give you much more flexibility.
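
To illustrate the difference (a minimal sketch under my own assumptions, not a complete Flex Template setup): with a Flex Template your pipeline code is packaged in a Docker image and runs when the job is launched, so --date can be an ordinary argparse argument that already contains a plain 'YYYY-MM-DD' string, and the ValueProvider machinery is no longer needed:

import argparse
import datetime as dt
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    # Resolved when the job is launched, not when the template is built,
    # so a plain string argument is enough.
    parser.add_argument('--date',
                        default=dt.date.today().strftime('%Y-%m-%d'),
                        help='Run date in YYYY-MM-DD format.')
    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as pipeline:
        # args.date is already an ordinary 'YYYY-MM-DD' string here, so it
        # could be used directly when building the API endpoint.
        _ = (
            pipeline
            | 'create' >> beam.Create([args.date])
            | 'log' >> beam.Map(lambda d: logging.info('Run date: %s', d))
        )


if __name__ == '__main__':
    run()

The job would then be launched with something along the lines of gcloud dataflow flex-template run my-rank-job --template-file-gcs-location gs://<STORAGE-BUCKET>/template.json --parameters date=2020-08-26 --region europe-west2 (the job name and template path here are placeholders).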