问题描述
这是怎么回事,
首先,我创建了这个 main.py
,它基本上将 CSV 解析为数据帧并记录结果。
import argparse
import logging
import apache_beam as beam
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
from apache_beam.options.pipeline_options import PipelineOptions,SetupOptions
class ReadInput(beam.PTransform):
def __init__(self,input_path: str):
self.input_path = input_path
def expand(self,pcoll):
df = pcoll | "Read CSV" >> read_csv(self.input_path)
df = df.rename({"_id": "id"},axis="columns")
return to_pcollection(df)
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",dest="input",default=""gs://trending-test/view_input_minimal.csv"",)
kNown_args,pipeline_args = parser.parse_kNown_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
p | ReadInput(kNown_args.input) | beam.Map(
lambda x: logging.info(f"input is {x}")
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()
我用这个命令运行,一切正常。
python -m main --runner dataflow --project $PROJECT --temp_location gs://$BUCKET/tmp --staging_location gs://$BUCKET/staging
然后我开始重构,
我将 Class ReadInput
移动到如下所示的子目录中并创建了一个 setup.py
。
main.py
setup.py
__init__.py
pipelines/
> __init__.py
> read/
> __init__.py
> ReadInput.py
所以现在main.py
看起来像这样
import argparse
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions,SetupOptions
from pipelines.read.ReadInput import ReadInput
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",default="gs://trending-test/view_input_minimal.csv",pipeline_args = parser.parse_kNown_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
p | ReadInput(kNown_args.input) | beam.Map(
lambda x: logging.info(f"input is {x}")
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()
带有 setup.py
如下,
import setuptools
setuptools.setup(
name="trending-python",version="0.1.0",install_requires=[],packages=setuptools.find_packages(),)
现在我使用添加了 --setup_file
参数的相同命令运行,
python -m main --runner dataflow --project $PROJECT --temp_location gs://$BUCKET/tmp --staging_location gs://$BUCKET/staging --setup_file ./setup.py
管道冻结了(现在已经运行了 30 分钟)。见下文。
谁能指出为什么添加 --setup_file ./setup.py
会导致此问题?
附言所有代码都可以使用直接运行器正常运行
这里是依赖项,
python = "^3.8"
apache-beam = {extras = ["gcp"],version = "^2.28.0"}
pandas = "^1.2.3"
解决方法
经过一整天的调试,我终于让它工作了。
对于这个项目,我使用 poetry 进行包管理,而在我的 pyproject.toml
中我有
[tool.poetry]
name = "trending-demo"
version = "0.1.0"
description = ""
...
导致以下错误(显示在工作启动日志中),
" poetry.core.masonry.utils.module.ModuleOrPackageNotFound: No file/folder found for package trending-demo"
所以我所做的是将 trending-demo
重命名为 pipeline
(以匹配我的目录),现在我的 pyproject.toml
变成了
[tool.poetry]
name = "pipelines"
version = "0.1.0"
description = ""
并且我还将 def run(argv=None) ...
移动到名为 pipelines.py
的“管道”目录中的一个新文件中。
所以现在我的项目结构是,
main.py
setup.py
__init__.py
pipelines/
> __init__.py
> pipelines.py # add this file
> read/
> __init__.py
> ReadInput.py
pipeline.py
在哪里
import argparse
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions,SetupOptions
from pipelines.compute.group_inputs import DataType,GroupInputs
from pipelines.read.read_input import ReadInput
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",dest="input",default="gs://trending-test/view_input_minimal.csv",)
known_args,pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
p | ReadInput(known_args.input) | GroupInputs(DataType.VIEWS) | beam.Map(
lambda x: logging.info(f"input is {x}")
)
和main.py
变成
import logging
from pipelines import pipelines
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
pipelines.run()
运行项目,现在 Google Dataflow 中的管道可以正常工作。
感谢 @robertwb 指出有一个工作启动日志可以显示软件包安装过程中的错误。
附言可以通过将 logName=("projects/{your-project}/logs/dataflow.googleapis.com%2Fworker-startup")
添加到日志资源管理器中的查询构建器来查看日志。