Unable to create a Dataflow template because the Scrapinghub client library does not accept a ValueProvider

Problem description

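I am trying to create a Dataflow template that can be launched from a Cloud Function triggered by a Pub/Sub message. The Pub/Sub message sends a job ID from Scrapinghub (a platform for running Scrapy scrapers) to the Cloud Function, which triggers the Dataflow template with the job ID as its input and writes the corresponding data to BigQuery. Every other step of this design is already done, but I cannot create the template, possibly because of an incompatibility between Scrapinghub's client library and Apache Beam.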

Code

from __future__ import absolute_import
import argparse
import logging
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
from scrapinghub import ScrapinghubClient


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Both options are declared as runtime (ValueProvider) parameters.
        parser.add_value_provider_argument('--input')
        parser.add_value_provider_argument('--output', type=str)


class IngestionBQ:
    def __init__(self): pass

    @staticmethod
    def parse_method(item):
        dic = {k: item[k] for k in item if k not in [b'_type',b'_key']}
        new_d = {}
        for key in dic:
            try: 
                new_d.update({key.decode("utf-8"): dic[key].decode("utf-8")})
            except AttributeError:
                new_d.update({key.decode("utf-8"): dic[key]})
        yield new_d          


class ShubConnect:
    def __init__(self, api_key, job_id):
        self.job_id = job_id
        self.client = ScrapinghubClient(api_key)

    def get_data(self):
        # Fetch every item of the job into an in-memory list.
        data = []
        item = self.client.get_job(self.job_id)
        for i in item.items.iter():
            data.append(i)
        return data


def run(argv=None, save_main_session=True):
    """The main function which creates the pipeline and runs it."""
    data_ingestion = IngestionBQ()
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    api_key = os.environ.get('api_key')
    user_options = pipeline_options.view_as(UserOptions)
    (p
        | 'Read Data from Scrapinghub' >> beam.Create(ShubConnect(api_key, user_options.input).get_data())
        | 'Trim b string' >> beam.FlatMap(data_ingestion.parse_method)
        | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
                user_options.output,
                schema=schema,
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)
     )
    p.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I then deploy the template from Cloud Shell with this command:

python main.py \
  --project=project-name \
  --region=us-central1 \
  --runner=DataflowRunner \
  --temp_location gs://temp/location/ \
  --template_location gs://templates/location/

Error

Traceback (most recent call last):
  File "main.py", line 69, in <module>
    run()
  File "main.py", line 57, in run
    | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
  File "main.py", line 41, in get_data
    item = self.client.get_job(self.job_id)
  File "/home/user/data-flow/venv/lib/python3.7/site-packages/scrapinghub/client/__init__.py", line 99, in get_job
    project_id = parse_job_key(job_key).project_id
  File "/home/user/data-flow/venv/lib/python3.7/site-packages/scrapinghub/client/utils.py", line 60, in parse_job_key
    .format(type(job_key), repr(job_key)))
ValueError: Job key should be a string or a tuple, got <class 'apache_beam.options.value_provider.RuntimeValueProvider'>: <apache_beam.options.value_provider.RuntimeValueProvider object at 0x7f14760a3630>

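Before this, I did manage to create a template, but with parser.add_argument instead of parser.add_value_provider_argument. Although the template could be created, it could not be run, because parser.add_argument does not support runtime parameters. With parser.add_argument I could also run the pipeline directly from Cloud Shell. Why does Scrapinghub's client API raise an error with parser.add_value_provider_argument but not with parser.add_argument? What is the fundamental programmatic difference between the two? And, of course, how can I still create this template with ValueProvider arguments?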

Thanks a lot.

Edit

After reading the documentation, I understand that the error occurs because ValueProvider objects are not supported for non-I/O modules. Reference: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#python_5

Solution

After reading the documentation, I understand that the error occurs because ValueProvider objects are not supported for non-I/O modules. Reference: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#python_5
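The difference between the two kinds of argument can be illustrated without Beam at all. The following is a minimal sketch that uses a hypothetical stand-in class (`DeferredValue`), not Beam's actual implementation, and a made-up job key. A plain argument is an ordinary string while the template is being built, whereas a value-provider argument is only a placeholder object whose value becomes available at run time through `.get()`. A library that type-checks its input, as `parse_job_key` does in the traceback, rejects the placeholder.

```python
class DeferredValue:
    """Hypothetical stand-in for a runtime-only parameter (not Beam's class)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._resolved = False

    def resolve(self, value):
        # Simulates the runner supplying the parameter when the job starts.
        self._value = value
        self._resolved = True

    def get(self):
        if not self._resolved:
            raise RuntimeError(f"{self.name} is not available before run time")
        return self._value


def parse_job_key(job_key):
    """Mimics the type check that raised the ValueError in the traceback."""
    if not isinstance(job_key, (str, tuple)):
        raise ValueError(
            f"Job key should be a string or a tuple, got {type(job_key)}")
    return job_key


# Plain argument: the value is an ordinary string at build time.
eager = "123/4/5"  # made-up job key, for illustration only
assert parse_job_key(eager) == "123/4/5"

# Deferred argument: at build time only the placeholder object exists.
deferred = DeferredValue("input")
try:
    parse_job_key(deferred)
    build_time_error = None
except ValueError as exc:
    build_time_error = str(exc)

# Once the value has been supplied, .get() returns a real string.
deferred.resolve("123/4/5")
run_time_key = parse_job_key(deferred.get())
```

In the pipeline above, `ShubConnect(...).get_data()` is called while the pipeline is being constructed, so the Scrapinghub client receives the placeholder rather than a string.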

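So, to achieve what I need, I could either switch to the Java SDK or come up with another idea. Until ValueProvider is supported for non-I/O modules, though, this path is a dead end.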
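As one possible "other idea", the Scrapinghub call could be moved out of pipeline construction and into a DoFn, so that `.get()` is only called on the ValueProvider while the job is running. The following is a sketch under that assumption and has not been verified against a templated job on Dataflow. `FetchJobItems` and its `client_factory` parameter are hypothetical names introduced here for illustration, and the import fallback exists only so the sketch can be exercised without Beam installed.

```python
try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:  # lets the sketch run without Beam installed
    _DoFnBase = object


class FetchJobItems(_DoFnBase):
    """Fetches Scrapinghub items, resolving the job ID only at run time."""

    def __init__(self, api_key, job_id_provider, client_factory=None):
        super().__init__()
        self.api_key = api_key
        # Keep the ValueProvider itself; do not call .get() here.
        self.job_id_provider = job_id_provider
        # Hypothetical hook so a fake client can be injected in tests.
        self.client_factory = client_factory

    def process(self, unused_element):
        factory = self.client_factory
        if factory is None:
            # Imported lazily so the real client is only needed at run time.
            from scrapinghub import ScrapinghubClient
            factory = ScrapinghubClient
        client = factory(self.api_key)
        # .get() is called during execution, when the value is available.
        job = client.get_job(self.job_id_provider.get())
        for item in job.items.iter():
            yield item


# Assumed wiring inside run(), replacing the beam.Create(...get_data()) step:
#   (p
#    | 'Seed' >> beam.Create([None])
#    | 'Read Data from Scrapinghub' >> beam.ParDo(
#          FetchJobItems(api_key, user_options.input))
#    | ...)
```

The single-element `beam.Create([None])` only serves to trigger the DoFn once. Note that the API key is still read when the template is built, so it would be baked into the template.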