kedro节点输入接受kwargs吗?

问题描述

https://kedro.readthedocs.io/en/stable/kedro.pipeline.node.Node.html#kedro.pipeline.node.Node.inputs

我有一个功能

def函数(** kwargs): 返回

如何将变量作为节点输入传递给它?

**inputs**
Return node inputs as a list,in the order required to bind them properly to the node’s function. If the node’s function contains kwargs,then kwarg inputs are sorted alphabetically (for python 3.5 deterministic behavior).

解决方法

使用装饰器工厂功能可以解决此问题。在此示例中使用python3。

您可以从 funtools

中导入包装装饰器

    from functools import wraps

    def deco_factory_df_params(
        *deco_args,**deco_kwargs,) -> Callable:
        def deco_fn_df_params(
                func: Callable,) -> Callable:

            @wraps(func)
            def add_df_params(*args,**kwargs):
                kwargs = {**kwargs,**deco_kwargs}
                return func(*args,*deco_args,**kwargs)

            return add_df_params

        return deco_fn_df_params

,然后在调用kedro管道中的节点时将您选择的关键字参数传递给该装饰工厂函数。 例如。


    node(
        func=deco_factory_df_params(
            parameter1="value1",parameter2="value2",parameter3="value3",)(your_function_name),inputs="data_input1",outputs=None,name="name_of_node",),

您的节点(功能)定义可能如下所示


    def your_function_name(
        df_input: pyspark.sql.DataFrame,parameter1: str,parameter2: str,parameter3: str,**kwargs,# this is important to be here to avoid error
    ) -> None:
        function_code_here
        
        return None

@mediumnok:我希望这能解决您的问题。

,

您将要使用node内部可用的kedro.pipeline函数。您可以使用字典将kwargs传递给该函数。

from kedro.pipeline input Pipeline,node

def my_kwarg_fun(**kwargs):
    print(kwargs)

def create_pipeline():
    return Pipeline([
        node(
            my_kwarg_fun,inputs={
                "my_kwarg": "example_my_kwarg"
            },outputs=None
        )
    ])
# This should print
# {'example_my_kwarg': ...}