Kedro: ValueError: Pipeline does not contain nodes named ['preprocess_companies_node']

Problem description

Similar to a previously asked question, I followed the spaceflights tutorial. At the create pipeline step, I get the following error when running:

kedro run --node=preprocess_companies_node

ValueError: Pipeline does not contain nodes named ['preprocess_companies_node'].

The relevant files are specified as described in the tutorial:

  • src/kedro_tutorial/pipelines/data_processing/pipeline.py
from kedro.pipeline import Pipeline, node

from .nodes import preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
        ]
    )

  • src/kedro_tutorial/pipelines/data_processing/nodes.py
import pandas as pd

# _is_true, _parse_percentage and _parse_money are the tutorial's helper
# functions, defined elsewhere in this module (omitted here).


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and
        `d_check_complete`, `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles

  • src/kedro_tutorial/pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline

from kedro_tutorial.pipelines import data_processing as dp


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipeline.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.
    """
    data_processing_pipeline = dp.create_pipeline()

    return {
        "__default__": data_processing_pipeline,
        "dp": data_processing_pipeline,
    }

I made sure that I have registered a __default__ pipeline and that my node name is exactly the same as the one in the command I run.

My Kedro version is 0.16.6 and my Python version is 3.7.10.

Any idea what I am doing wrong here?

Thanks.

Solution

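The problem is that you are following the tutorial for version 0.17.3+ while using kedro==0.16.6. This is an easy mistake to make, so don't worry. The pipeline_registry.py module was introduced in 0.17.3, so kedro 0.16.6 never reads it. You can either upgrade to the latest kedro version, or register your pipelines in a module named hooks.py instead of pipeline_registry.py.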
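As a rough illustration of the version rule above, the following sketch picks the file in which pipelines should be registered for a given kedro version. The helper names `parse_version` and `registration_module` are hypothetical and not part of kedro; the 0.17.3 threshold is taken from this answer, and the parser assumes a plain `X.Y.Z` version string such as the one reported by `kedro --version`.

```python
from typing import Tuple

# Version that introduced pipeline_registry.py, per the answer above.
REGISTRY_INTRODUCED: Tuple[int, ...] = (0, 17, 3)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a plain 'X.Y.Z' string into a tuple of ints for comparison."""
    return tuple(int(part) for part in version.split("."))


def registration_module(kedro_version: str) -> str:
    """Hypothetical helper: name the file where pipelines should be registered."""
    if parse_version(kedro_version) >= REGISTRY_INTRODUCED:
        return "pipeline_registry.py"
    return "hooks.py"


print(registration_module("0.16.6"))  # hooks.py
print(registration_module("0.17.3"))  # pipeline_registry.py
```

Tuples of integers are compared instead of raw strings so that, for example, 0.17.10 sorts after 0.17.3.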

# src/<project_name>/hooks.py
"""Project hooks."""
from typing import Any, Dict, Iterable, Optional

from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal

from sixteen.pipelines import data_engineering as de
from sixteen.pipelines import data_science as ds


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """
        data_engineering_pipeline = de.create_pipeline()
        data_science_pipeline = ds.create_pipeline()

        return {
            "de": data_engineering_pipeline,
            "ds": data_science_pipeline,
            "__default__": data_engineering_pipeline + data_science_pipeline,
        }

    @hook_impl
    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
        return ConfigLoader(conf_paths)

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, str]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )


project_hooks = ProjectHooks()

You can generate a complete example for this version yourself by running the kedro new command against it.

# these two bash commands are safe to run outside of a virtual environment
# pipx creates the virtual environment for you
pip install pipx
pipx run --spec kedro==0.16.6 kedro new

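The rest of your code looks like valid 0.16.6 kedro to me. After moving the contents of pipeline_registry.py into hooks.py, you can run the kedro pipeline list command to confirm that kedro picks up your pipeline code.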
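To make the cause of the error more concrete, here is a toy model in plain Python of what the node filter does before and after the move. It does not use kedro: pipelines are modelled as lists of node names, and `only_nodes` is a simplified stand-in written for this illustration. The empty `__default__` pipeline in `before` is an assumption about what a project registers when its hooks.py has not been edited.

```python
from typing import Dict, List


def only_nodes(pipeline_nodes: List[str], wanted: List[str]) -> List[str]:
    """Toy stand-in for node filtering: keep `wanted`, fail if any are missing."""
    missing = [name for name in wanted if name not in pipeline_nodes]
    if missing:
        raise ValueError(f"Pipeline does not contain nodes named {missing}.")
    return [name for name in pipeline_nodes if name in wanted]


# Assumption: before the move, the registered default pipeline has no nodes,
# because pipeline_registry.py is never read on 0.16.6.
before: Dict[str, List[str]] = {"__default__": []}

# After registering the tutorial pipeline in hooks.py.
after: Dict[str, List[str]] = {
    "__default__": ["preprocess_companies_node", "preprocess_shuttles_node"],
}

try:
    only_nodes(before["__default__"], ["preprocess_companies_node"])
except ValueError as err:
    print(err)  # same wording as the error in the question

print(only_nodes(after["__default__"], ["preprocess_companies_node"]))
```

In this model the node name is spelled correctly in both cases; the lookup fails only because the pipeline being searched is not the one defined in the tutorial files.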