问题描述
问题
我正在编写一个GCP云函数,该函数将从pubsub消息中获取输入ID,进行处理,并将表输出到BigQuery。
代码如下:
from __future__ import absolute_import
import base64
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from scrapinghub import ScrapinghubClient
import os
def processing_data_function():
# do stuff and return desired data
def create_data_from_id():
# take scrapinghub's job id and extract the data through api
def run(event,context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
# Take pubsub message and also Scrapinghub job's input id
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
agrv = ['--project=project-name','--region=us-central1','--runner=DataflowRunner','--temp_location=gs://temp/location/','--staging_location=gs://staging/location/']
p = beam.Pipeline(options=PipelineOptions(agrv))
(p
| 'Read from Scrapinghub' >> beam.Create(create_data_from_id(pubsub_message))
| 'Trim b string' >> beam.FlatMap(processing_data_function)
| 'Write Projects to BigQuery' >> beam.io.WritetoBigQuery(
'table_name',schema=schema,# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQuerydisposition.CREATE_IF_NEEDED,write_disposition=beam.io.BigQuerydisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
run()
请注意,有2个函数create_data_from_id
和processing_data_function
处理来自Scrapinghub(一个用于刮擦的刮擦站点)的数据,它们很长,因此我不想在这里包括它们。它们也与该错误无关,因为如果我从云外壳运行该代码并改为使用argparse.ArgumentParser()
传递参数,则此代码将起作用。
关于我的错误,虽然部署代码没有问题,并且pubsub消息可以成功触发该功能,但数据流作业失败并报告了此错误:
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",line 279,in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 275,in loads
return load(file,ignore,**kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 270,in load
return Unpickler(file,ignore=ignore,**kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 472,in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 826,in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'main'
During handling of the above exception,another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",line 649,in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",line 179,in execute
op.start()
File "apache_beam/runners/worker/operations.py",line 662,in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py",line 664,line 665,line 284,in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py",line 290,line 611,in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py",line 616,in apache_beam.runners.worker.operations.DoOperation.setup
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",line 283,in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'main'
我尝试过的
鉴于我可以从云外壳运行相同的管道,但是使用参数解析器而不是指定选项,我认为选项说明的方式是问题所在。因此,我尝试了不同的选项组合,无论有无--save_main_session
,--staging_location
,--requirement_file=requirements.txt
,--setup_file=setup.py
...他们都或多或少地报告了同样的问题,所有莳萝都不知道该选择哪个模块。在指定save_main_session
的情况下,无法启动主会话。在指定了require_file和setup_file的情况下,甚至没有成功创建作业,因此我可以省去查找其错误的麻烦。我的主要问题是我不知道这个问题是从哪里来的,因为我以前从未使用过莳萝,为什么从shell和云函数运行管道有何不同?有人有线索吗?
谢谢
解决方法
您也可以尝试将最后一部分修改为,并测试以下各项是否有效:
if __name__ == "__main__":
...
此外,请确保您在正确的文件夹中执行脚本,这可能与文件的命名或目录中的位置有关。
请考虑以下来源,您可能会有所帮助:Source 1,Source 2
我希望这些信息对您有帮助。
,您可能正在使用gunicorn在Cloud Run上启动应用程序(作为标准做法),例如:
from django.http import JsonResponse
data = {'machines': machines}
return JsonResponse(data)
我也遇到了同样的问题,并且找到了一种解决方法,可以在不使用gunicorn的情况下启动该应用程序:
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
可能是因为gunicorn跳过了 main 上下文并直接启动main:app对象。我不知道如何使用Gunicorn进行修复。
===附加说明===
我找到了一种使用金枪鱼的方法。
- 将一个函数(启动管道)移动到另一个模块,例如
CMD exec python3 main.py
。
df_pipeline/pipe.py
.
├── df_pipeline
│ ├── __init__.py
│ └── pipe.py
├── Dockerfile
├── main.py
├── requirements.txt
└── setup.py
- 在
# in main.py import df_pipeline as pipe result = pipe.preprocess(....)
所在的目录中创建setup.py
main.py
- 在
# setup.py import setuptools setuptools.setup( name='df_pipeline',install_requires=[],packages=setuptools.find_packages(include=['df_pipeline']),)
中将管道选项setup_file
设置为./setup.py
。