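Problem description
I'm currently trying to build a custom Google Dataflow template that calls an API when it runs and writes the results to a BigQuery table.
The problem is that the API needs a date parameter in 'YYYY-MM-DD' format to work.
Unfortunately, when you build a template, Dataflow seems to require a ValueProvider (as described here) for any variable that depends on when the job runs, such as today's date. Otherwise the job keeps using the date on which the template was originally created (i.e. dt.date.today() etc., h/t to this post).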
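To make the distinction concrete, here is a small stdlib-only sketch of the two behaviours. No Beam is involved, and the function names are hypothetical. One version computes the date once at build time, like default=dt.date.today().strftime(...). The other defers the computation until the job runs, which is roughly the idea behind a ValueProvider.

```python
import datetime as dt
from typing import Callable, Dict

# A "clock" is any zero-argument function returning a date; injecting it lets
# the sketch simulate building on one day and running on another.
Clock = Callable[[], dt.date]


def build_frozen_options(clock: Clock) -> Dict[str, str]:
    # Like default=dt.date.today().strftime(...): the string is computed once,
    # when the "template" is built, and stored as a plain value.
    return {"date": clock().strftime("%Y-%m-%d")}


def build_deferred_options(clock: Clock) -> Dict[str, Callable[[], str]]:
    # Stores a function instead of a value; nothing is computed until the
    # running job calls it (loosely analogous to ValueProvider.get()).
    return {"date": lambda: clock().strftime("%Y-%m-%d")}


if __name__ == "__main__":
    current = {"day": dt.date(2020, 8, 25)}
    clock = lambda: current["day"]

    frozen = build_frozen_options(clock)    # "template" built on 2020-08-25
    deferred = build_deferred_options(clock)

    current["day"] = dt.date(2020, 8, 26)   # job launched the next day
    print(frozen["date"])       # 2020-08-25 (stale)
    print(deferred["date"]())   # 2020-08-26 (evaluated at run time)
```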
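So, given the code below, is there a way to generate the template so that it correctly uses today's date as a parameter at runtime? Right now it either uses the same static date indefinitely or, as in the current case, fails to convert into a template at all.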
from __future__ import print_function, absolute_import
import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions
from apache_beam.options.value_provider import ValueProvider

import datetime as dt
from datetime import timedelta, date
import time
import re

logging.getLogger().setLevel(logging.INFO)


class GetAPI():
    def __init__(self, data={}, date=None):
        self.num_api_errors = Metrics.counter(self.__class__, 'num_api_errors')
        self.data = data
        self.date = date

    def get_job(self):
        import requests
        endpoint = f'https://www.rankranger.com/api/v2/?rank_stats&key={self.data.api_key}&date={self.date}'\
                   f'&campaign_id={self.data.campaign}&se_id={self.data.se}&domain={self.data.domain}&output=json'
        logging.info("Endpoint: {}".format(str(endpoint)))
        try:
            res = requests.get(endpoint)
            if res.status_code == 200:
                # logging.info("Response: {}".format(str(res.text)))
                json_data = res.json()
                ## Store the API response
                if 'result' in json_data:
                    response = json_data.get('result')
                    return response
        except Exception as e:
            self.num_api_errors.inc()
            logging.error(f'Exception: {e}')
            logging.error(f'Extract error on "%s"', 'Rank API')


def format_dates(api):
    # Convert the API's MM/DD/YYYY dates to YYYY-MM-DD for BigQuery
    api['date'] = dt.datetime.strptime(api['date'], "%m/%d/%Y").strftime("%Y-%m-%d")
    return api


# Class to pass in date generated at runtime to template
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        ## Special runtime argument e.g. date
        parser.add_value_provider_argument(
            '--date',
            type=str,
            default=(dt.date.today()).strftime("%Y-%m-%d"),
            help='Run date in YYYY-MM-DD format.')


def run(argv=None):
    """
    Main entry point; defines the static arguments to be passed in.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--api_key', default=API_KEY, help='API key for Rank API.')
    parser.add_argument('--campaign', default=CAMPAIGN, help='Campaign ID for Rank API')
    parser.add_argument('--se', default=SE, help='Search Engine ID for Rank API')
    parser.add_argument('--domain', default=DOMAIN, help='Domain for Rank API')
    parser.add_argument('--dataset', default=DATASET, help='BigQuery Dataset to write tables to. Must already exist.')
    parser.add_argument('--table_name', default=TABLE_NAME, help='The BigQuery table name. Should not already exist.')
    parser.add_argument('--project', default=PROJECT, help='Your GCS project.')
    parser.add_argument('--runner', default="DataflowRunner", help='Type of DataFlow runner.')
    args, pipeline_args = parser.parse_known_args(argv)

    # Create and set your PipelineOptions.
    options = PipelineOptions(pipeline_args)
    user_options = options.view_as(UserOptions)
    pipeline = beam.Pipeline(options=options)

    # Gets data from Rank Ranger API
    api = (
        pipeline
        | 'create' >> beam.Create(GetAPI(data=args, date=user_options.date).get_job())
        | 'format dates' >> beam.Map(format_dates)
    )

    # Write to BigQuery based on specified schema
    # (schema passed by keyword: the third positional argument of WriteToBigQuery is project)
    BQ = (api | "WriteToBigQuery" >> beam.io.WriteToBigQuery(args.table_name, dataset=args.dataset, schema=SCHEMA))

    pipeline.run()


if __name__ == '__main__':
    run()
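As the output below shows, instead of a neatly formatted 'YYYY-MM-DD' string, the whole ValueProvider object is passed into the URL. The API call therefore fails, get_job() returns None, and beam.Create raises a NoneType error.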
(Apache) C:\Users\user.name\Documents\Alchemy\Dataflow\production_pipeline\templates>python main.py --runner DataflowRunner --project <PROJECT> --staging_location gs://<STORAGE-BUCKET>/staging --temp_location gs://<STORAGE-BUCKET>/temp --template_location gs://<STORAGE-BUCKET>/template/<TEMPLATE> --region europe-west2
INFO:root:Endpoint: https://www.rankranger.com/api/v2/?rank_stats&key=<API_KEY>&date=RuntimeValueProvider(option: date, type: str, default_value: '2020-08-25')&campaign_id=<CAMPAIGN>&se_id=<SE>&domain=<DOMAIN>&output=json
Traceback (most recent call last):
  File "main.py", line 267, in <module>
    run()
  File "main.py", line 257, in run
    | 'format dates' >> beam.Map(format_dates)
  File "C:\Users\user.name\Anaconda3\envs\Apache\lib\site-packages\apache_beam\transforms\core.py", line 2590, in __init__
    self.values = tuple(values)
TypeError: 'NoneType' object is not iterable
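In a classic template, the string held by a ValueProvider can only be read by calling its get() method while the pipeline is executing, typically inside a DoFn's process method. It cannot be read while the graph is being constructed; at that point, calling get() on a RuntimeValueProvider raises an error rather than returning the default. The sketch below runs without Beam and is illustrative only. StubProvider is a hypothetical stand-in exposing get() like the real classes, and resolve_run_date is a hypothetical helper. The sketch shows why the log contains the object itself (the f-string formats the provider's repr, not its value). It also shows one possible fallback: if no date was supplied at launch, compute today's date at that moment. The fallback assumes --date is declared with an empty default instead of dt.date.today(), since the latter is frozen when the template is built.

```python
import datetime as dt
from typing import Callable, Optional


class StubProvider:
    """Hypothetical stand-in for a Beam ValueProvider: holds a value and
    exposes it through get()."""

    def __init__(self, value: Optional[str]):
        self._value = value

    def get(self) -> Optional[str]:
        return self._value

    def __repr__(self) -> str:
        return f"StubProvider(value={self._value!r})"


def resolve_run_date(provider, clock: Callable[[], dt.date] = dt.date.today) -> str:
    # Intended to be called at run time (e.g. inside DoFn.process), never
    # while the pipeline graph is being built.
    value = provider.get()
    if not value:
        # No date supplied at launch: use "today" as seen by the running job,
        # not by the machine that built the template.
        return clock().strftime("%Y-%m-%d")
    # Validate and normalise an explicitly supplied date.
    return dt.datetime.strptime(value, "%Y-%m-%d").strftime("%Y-%m-%d")


if __name__ == "__main__":
    p = StubProvider("2020-08-25")
    print(f"date={p}")        # date=StubProvider(value='2020-08-25')  <- what the log shows
    print(f"date={p.get()}")  # date=2020-08-25
    print(resolve_run_date(StubProvider(""), clock=lambda: dt.date(2020, 8, 26)))  # 2020-08-26
```

In the pipeline itself, one plausible structure (an assumption, not shown above) is to stop calling get_job() inside beam.Create. Instead, seed the pipeline with something like beam.Create([None]), then do the API call in a ParDo whose DoFn keeps the ValueProvider and resolves the date in process().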
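Any help would be greatly appreciated!
Solution
Your diagnosis is correct. You should consider migrating to Flex Templates, which solve this problem (and others) and give you more flexibility.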
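With a Flex Template, the pipeline's main program, including argument parsing, runs when the job is launched rather than when the template is built. An ordinary argparse option can therefore compute "today" at launch time, with no ValueProvider needed. Below is a minimal stdlib sketch; parse_run_date is a hypothetical helper that run() could call before building the pipeline.

```python
import argparse
import datetime as dt
from typing import Callable, List, Optional, Tuple


def parse_run_date(
    argv: Optional[List[str]] = None,
    clock: Callable[[], dt.date] = dt.date.today,
) -> Tuple[str, List[str]]:
    parser = argparse.ArgumentParser()
    # Plain argparse option: under a Flex Template this code runs at launch
    # time, so a default computed here is "today" for each job.
    parser.add_argument("--date", default=None,
                        help="Run date in YYYY-MM-DD format (defaults to today).")
    args, remaining = parser.parse_known_args(argv)
    if args.date is None:
        run_date = clock().strftime("%Y-%m-%d")
    else:
        # Reject malformed dates early instead of sending them to the API.
        run_date = dt.datetime.strptime(args.date, "%Y-%m-%d").strftime("%Y-%m-%d")
    # Unrecognised arguments (e.g. --region) are returned for PipelineOptions.
    return run_date, remaining


if __name__ == "__main__":
    print(parse_run_date(["--region", "europe-west2"], clock=lambda: dt.date(2020, 8, 26)))
    # ('2020-08-26', ['--region', 'europe-west2'])
```

The resulting plain string can then be passed straight to GetAPI(date=...), because by the time the graph is built the date is already known.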