问题描述
我需要创建一个可以使用此工作流程处理任务的批处理:
| task 4
| task 3 ->| task 4
| task 2 -> | task 4
| task 3 ->| task 4
input -> task 1 ->
| task 2 -> ...
- 任务1处理输入数据并返回列表列表。
- 任务2从任务1接收列表,并且还返回列表列表。
- 任务#3从任务#2接收列表,并返回列表列表。
- 任务4从任务4接收列表并处理列表中的数据。
例如,任务1返回[[],[],[]]
。这意味着流程必须并行运行4个任务#2。每个任务2返回[[],[]]
。现在我们必须执行4x3任务3。然后,任务3返回[[],[]]
。最后,流程必须运行4x3x2任务4。
可以使用Prefect Flow吗?我尝试使用映射功能,但似乎不支持这种复杂的工作流模式(或者我可能使用不正确)。
with Flow('test') as flow:
res1 = task1()
res2 = task2.map(res1)
res3 = task3.map(res2)
res4 = task4.map(res3)
当我运行流程task1时,返回正确数量的列表。然后流程创建4个task2,每个任务返回三个列表的列表。但是,该流程没有创建12个task3,而是仅创建了4个。 每个task3都会收到由task1创建的4个列表的列表,而不是task2的1个列表。
关于如何创建这样的工作流程的任何想法?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)