使用networkxPython创建并执行任务工作流

问题描述

摘要:

目标是运行由步骤(图形节点)及其依赖关系(图形边缘)的集合创建的工作流。

是否可以创建networkx.DiGraph()并运行遍历以在不同节点上执行回调?

示例:

请查看示例图:

Possible execution orders of the example workflow

对于那组任务和依赖项,执行的可能性是:

  1. 以步骤a为切入点
  2. 在步骤b之后的
  3. 步骤a
  4. 在步骤c之后立即并行执行步骤db(步骤c首先完成)
  5. 步骤g之后的步骤c(步骤d仍在进行中)
  6. 步骤e之后的步骤d(因为步骤c已经完成)
  7. 在步骤f之后的
  8. 步骤e

  1. 将步骤a作为入口点
  2. 在步骤b之后的
  3. 步骤a
  4. 在步骤c之后立即并行执行步骤db(步骤d首先完成)
  5. 并行执行步骤ge(因为步骤d在步骤c之前完成了)
  6. 在步骤f之后的
  7. 步骤e

下面,请在networkx中找到创建此示例图的代码段:

import networkx as nx

W = nx.DiGraph()

nodes = ["a","b","c","d","e","f"]
edges = [("a","b"),("b","c"),("c","g"),"d"),"e"),("d",("e","f")]
W.add_nodes_from(nodes)
W.add_edges_from(edges)

是否有已知的解决方案以上述方式遍历图,打印节点的值(步骤名称)并随机休眠几秒钟以模拟正在运行的任务并执行一些计算? (使用基本的多线程)

谢谢。

解决方法

拓扑排序(nx.topological_sort)将返回任务的有效序列。
示例:

>>> list(nx.topological_sort(W))
['a','b','d','c','e','f','g']

如果您希望可以同时执行多组任务,则可以对其进行一些更改以进行分组。

def topological_sort_grouping(g):
    # copy the graph
    _g = g.copy()
    res = []
    # while _g is not empty
    while _g:
        zero_indegree = [v for v,d in _g.in_degree() if d == 0]
        res.append(zero_indegree)
        _g.remove_nodes_from(zero_indegree)
    return res

示例:

>>> topological_sort_grouping(W)
[['a'],['b'],['c','d'],['e','g'],['f']]

给出组,您可以遍历它们并同时在同一组中执行。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...