Adding "--setup_file ./setup.py" causes the pipeline to freeze when running on Google Dataflow

Problem description

Here's what happened.

First, I created this main.py, which basically parses a CSV into a dataframe and logs the result.

import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class ReadInput(beam.PTransform):
    def __init__(self, input_path: str):
        self.input_path = input_path

    def expand(self, pcoll):
        df = pcoll | "Read CSV" >> read_csv(self.input_path)
        df = df.rename({"_id": "id"}, axis="columns")
        return to_pcollection(df)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="gs://trending-test/view_input_minimal.csv",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        p | ReadInput(known_args.input) | beam.Map(
            lambda x: logging.info(f"input is {x}")
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

I ran it with this command and everything worked fine.

python -m main --runner dataflow --project $PROJECT --temp_location gs://$BUCKET/tmp --staging_location gs://$BUCKET/staging


Then I started refactoring.

I moved the ReadInput class into a subdirectory as shown below and created a setup.py:

main.py
setup.py
__init__.py
pipelines/
  > __init__.py
  > read/
      > __init__.py
      > ReadInput.py

So now main.py looks like this:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from pipelines.read.ReadInput import ReadInput


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        default="gs://trending-test/view_input_minimal.csv",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        p | ReadInput(known_args.input) | beam.Map(
            lambda x: logging.info(f"input is {x}")
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

with the following setup.py:

import setuptools

setuptools.setup(
    name="trending-python",
    version="0.1.0",
    install_requires=[],
    packages=setuptools.find_packages(),
)
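(As a local sanity check, not part of the original post: setuptools.find_packages() only discovers directories that contain an __init__.py, so it is worth verifying the intended packages actually get picked up before shipping anything to Dataflow workers. The directory names below just mirror the project layout; no_init_here is made up for illustration.)

```python
import os
import tempfile

import setuptools

# Recreate the project layout in a temp dir to see what
# setuptools.find_packages() actually discovers.
root = tempfile.mkdtemp()
for pkg in ("pipelines", os.path.join("pipelines", "read")):
    os.makedirs(os.path.join(root, pkg))
    open(os.path.join(root, pkg, "__init__.py"), "w").close()

# A directory with no __init__.py is NOT reported as a package.
os.makedirs(os.path.join(root, "no_init_here"))

found = sorted(setuptools.find_packages(where=root))
print(found)  # ['pipelines', 'pipelines.read']
```

If a package you expect is missing from this list, the worker will fail to import it even though the job submission itself succeeds.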

Now I run the same command with the --setup_file flag added:

python -m main --runner dataflow --project $PROJECT --temp_location gs://$BUCKET/tmp --staging_location gs://$BUCKET/staging --setup_file ./setup.py

The pipeline freezes (it had been running for 30 minutes at this point).


Can anyone point out why adding --setup_file ./setup.py causes this problem?

P.S. All of the code runs fine with the DirectRunner.


Here are the dependencies:

python = "^3.8"
apache-beam = {extras = ["gcp"], version = "^2.28.0"}
pandas = "^1.2.3"

Solution

After a full day of debugging, I finally got it working.

For this project I use poetry for package management, and in my pyproject.toml I had:

[tool.poetry]
name = "trending-demo"
version = "0.1.0"
description = ""
...

which caused the following error (shown in the worker-startup log):

poetry.core.masonry.utils.module.ModuleOrPackageNotFound: No file/folder found for package trending-demo

So what I did was rename trending-demo to pipelines (to match my directory), and my pyproject.toml now becomes:

[tool.poetry]
name = "pipelines"
version = "0.1.0"
description = ""
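(An aside that is not from the original post: instead of renaming the project, poetry also lets the project name differ from the package directory by declaring the directory explicitly via the packages key, something like:)

```toml
[tool.poetry]
name = "trending-demo"
version = "0.1.0"
# Tell poetry which directory actually holds the package code.
packages = [{ include = "pipelines" }]
```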

I also moved def run(argv=None) ... into a new file called pipelines.py inside the pipelines directory.

So now my project structure is:

main.py
setup.py
__init__.py
pipelines/
  > __init__.py
  > pipelines.py # add this file
  > read/
      > __init__.py
      > ReadInput.py

where pipelines.py is:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from pipelines.compute.group_inputs import DataType, GroupInputs
from pipelines.read.read_input import ReadInput


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="gs://trending-test/view_input_minimal.csv",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        p | ReadInput(known_args.input) | GroupInputs(DataType.VIEWS) | beam.Map(
            lambda x: logging.info(f"input is {x}")
        )

and main.py becomes:

import logging

from pipelines import pipelines

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    pipelines.run()

Running the project now, the pipeline works correctly on Google Dataflow.


Thanks to @robertwb for pointing out that there is a worker-startup log that shows errors during package installation.

P.S. The logs can be viewed by adding logName=("projects/{your-project}/logs/dataflow.googleapis.com%2Fworker-startup") to the query builder in the Logs Explorer.
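For reference, a full Logs Explorer query might look like the following (my-project is a placeholder for your project ID; the resource.type line and the severity filter are my own additions, matching how Dataflow labels its logs, and can be dropped to see everything):

```
resource.type="dataflow_step"
logName="projects/my-project/logs/dataflow.googleapis.com%2Fworker-startup"
severity>=ERROR
```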