使用Prefect分割任务

问题描述

我需要创建一个可以使用此工作流程处理任务的批处理:

                                             | task 4
                                  | task 3 ->| task 4
                       | task 2 ->           | task 4
                           
                                  | task 3 ->| task 4
    input ->  task 1 ->
                       | task 2 -> ... 
  • 任务1处理输入数据并返回列表列表。
  • 任务2从任务1接收列表,并且还返回列表列表。
  • 任务#3从任务#2接收列表,并返回列表列表。
  • 任务4从任务4接收列表并处理列表中的数据。

例如,任务1返回[[],[],[]]。这意味着流程必须并行运行4个任务#2。每个任务2返回[[],[]]。现在我们必须执行4x3任务3。然后,任务3返回[[],[]]。最后,流程必须运行4x3x2任务4。

可以使用Prefect Flow吗?我尝试使用映射功能,但似乎不支持这种复杂的工作流模式(或者我可能使用不正确)。

with Flow('test') as flow:
   res1 = task1()
   res2 = task2.map(res1)
   res3 = task3.map(res2)
   res4 = task4.map(res3)

当我运行流程task1时,返回正确数量的列表。然后流程创建4个task2,每个任务返回三个列表的列表。但是,该流程没有创建12个task3,而是仅创建了4个。 每个task3都会收到由task1创建的4个列表的列表,而不是task2的1个列表。

关于如何创建这样的工作流程的任何想法?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)