Prefect:如何基于从参数派生的任务列表强制性地创建任务

问题描述

我试图基于列表来强制性地定义任务。挑战在于该列表应基于Prefect参数。

下面是我尝试过的代码,但显然它不起作用,因为task_dependency_pairs是任务,而不是列表。

如何在不破坏Parameter任务与其他动态生成的任务之间的依赖关系的情况下使它工作?

from prefect import task,Flow,Parameter,Task
import time

@task
def task_dependency_pairs(param):
    return [
    ('task 1',f'{param}A',''),('task 2',f'{param}B','task 1'),('task 3',f'{param}C','task 1')]

class Task_class(Task):

    def run(self,**kwarg):   
        time.sleep(5)
        print(f"This task {kwarg['task_name']} does a lot of things with {kwarg.get('calc_value','')}.")

for task_name,dependency in task_dependency_pairs:
    globals()[task_name] = type(task_name,(Task_class,),{"__module__": __name__})

with Flow("my_process") as flow:

    param = Parameter("param",default="default_param")
    task_dependency_pairs_list = task_dependency_pairs(param)
    for task_name,calc_value,dependency in task_dependency_pairs_list:   # This won't work
        task_instance = globals()[task_name](name=task_name)
        flow.add_task(task_instance(task_name = task_name,calc_value = calc_value))

    for task_name,dependency in task_dependency_pairs_list:  # This won't work
        if len(dependency) >0:
            flow.get_tasks(name=task_name)[0].set_upstream(flow.get_tasks(name=dependency)[0])

flow.visualize()

解决方法

尝试在省长流程中动态创建任务是最好的管理方式via the mapping functionality.

但是,映射仅会在流程运行期间从可迭代对象生成任务。它不会随意调整所生成任务的依赖性;它们都共享主要mapped任务上定义的依赖项。

但是,如果要在运行时生成流(具有程序依赖性),我想到的唯一方法是创建一个任务,该任务创建一个流并立即运行它。

查找流程的方式是:

...

@task
def run_flow(inputs):
    with Flow("subflow") as sub_flow:
        for (name,calc_value,dependency) in inputs:
            inst = Task_class(name=name)(task_name=name,calc_value=calc_value)
            sub_flow.add_task(inst)
            if dependency:
                inst.set_upstream(sub_flow.get_tasks(name=dependency)[0])

    sub_flow.run()

with Flow("my_process") as flow:
    param = Parameter("param",default="default_param")
    task_dependency_pairs_list = task_dependency_pairs(param)
    run_flow(task_dependency_pairs_list)