问题描述
https://kedro.readthedocs.io/en/stable/kedro.pipeline.node.Node.html#kedro.pipeline.node.Node.inputs
如何将变量作为节点输入传递给它?
**inputs**
Return node inputs as a list,in the order required to bind them properly to the node’s function. If the node’s function contains kwargs,then kwarg inputs are sorted alphabetically (for python 3.5 deterministic behavior).
解决方法
使用装饰器工厂功能可以解决此问题。在此示例中使用python3。
您可以从 funtools 包
中导入包装装饰器
from functools import wraps
def deco_factory_df_params(
*deco_args,**deco_kwargs,) -> Callable:
def deco_fn_df_params(
func: Callable,) -> Callable:
@wraps(func)
def add_df_params(*args,**kwargs):
kwargs = {**kwargs,**deco_kwargs}
return func(*args,*deco_args,**kwargs)
return add_df_params
return deco_fn_df_params
,然后在调用kedro管道中的节点时将您选择的关键字参数传递给该装饰工厂函数。 例如。
node(
func=deco_factory_df_params(
parameter1="value1",parameter2="value2",parameter3="value3",)(your_function_name),inputs="data_input1",outputs=None,name="name_of_node",),
您的节点(功能)定义可能如下所示
def your_function_name(
df_input: pyspark.sql.DataFrame,parameter1: str,parameter2: str,parameter3: str,**kwargs,# this is important to be here to avoid error
) -> None:
function_code_here
return None
@mediumnok:我希望这能解决您的问题。
,您将要使用node
内部可用的kedro.pipeline
函数。您可以使用字典将kwargs传递给该函数。
from kedro.pipeline input Pipeline,node
def my_kwarg_fun(**kwargs):
print(kwargs)
def create_pipeline():
return Pipeline([
node(
my_kwarg_fun,inputs={
"my_kwarg": "example_my_kwarg"
},outputs=None
)
])
# This should print
# {'example_my_kwarg': ...}