在 Kedro 中等待节点完成

问题描述

我在 Kedro 中有一个如下所示的管道:

from kedro.pipeline import Pipeline,node
from .nodes import *

def foo():
    return Pipeline([
        node(a,inputs=["train_x","test_x"],outputs=dict(bar_a="bar_a"),name="A"),node(b,outputs=dict(bar_b="bar_b"),name="B"),node(c,outputs=dict(bar_c="bar_c"),name="C"),node(d,outputs=dict(bar_d="bar_d"),name="D"),])

节点 A、B 和 C 不是很占用资源,但它们需要一段时间,所以我想并行运行它们,另一方面,节点 D 几乎占用了我所有的内存,它会如果它与其他节点一起执行,则失败。有没有办法告诉 Kedro 在执行节点 D 之前等待 A、B 和 C 完成并保持代码井井有条?

解决方法

Kedro 根据不同节点的输入/输出之间的相互依赖性来确定执行顺序。在您的情况下,节点 D 不依赖于任何其他节点,因此无法保证执行顺序。同样,如果使用并行运行器,则无法确保节点 D 不会与 A、B 和 C 并行运行。

也就是说,有几种解决方法可以用来实现特定的执行顺序。

1 [Preferred] 单独运行节点

您可以这样做:

kedro run --parallel

这可以说是首选解决方案,因为它不需要更改代码(如果您曾经在不同的机器上运行相同的管道,这很好)。如果您希望节点 D 仅在 A、B 和 C 成功时运行,您可以执行 kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D 而不是 &&。如果运行逻辑变得更复杂,您可以将其存储在 Makefile/bash 脚本中。

2 使用虚拟输入/输出

您还可以通过引入虚拟数据集来强制执行顺序。类似的东西:

;

空列表可以用于虚拟数据集。底层函数还必须返回/获取额外的参数。

这种方法的优点是 def foo(): return Pipeline([ node(a,inputs=["train_x","test_x"],outputs=[dict(bar_a="bar_a"),"a_done"],name="A"),node(b,outputs=[dict(bar_b="bar_b"),"b_done"],name="B"),node(c,outputs=[dict(bar_c="bar_c"),"c_done"],name="C"),node(d,"test_x","a_done","b_done",outputs=dict(bar_d="bar_d"),name="D"),]) 将立即产生所需的执行逻辑。缺点是污染了节点的定义和底层功能。

如果你走这条路,你还必须决定是否要将虚拟数据集存储在数据目录中(污染更多,但允许自己运行节点 D)或不(节点 D 不能)自行运行)。


相关讨论 [1,2]