并行执行有向无环图

问题描述

我有一个任务列表[Task-A,Task-B,Task-C,Task-D,...]
任务可以选择依赖于其他任务。
例如:
任务A依赖3个任务:B,C和D
任务B依赖2个任务:C和E
等等..

它基本上是一个有向无环图,每个任务只有在它所依赖的任务全部执行结束(成功或抛出异常)之后才能执行。例如:任务C和E没有任何依赖,应首先运行;然后是任务B和任务D;最后是Task-A。这就是应有的执行顺序。

Task datamodel
@Data //lambok
Public Class Task{
     Private String name;
     Private List<Task> dependentTasks;
     
     public void run(){
     // business logic
     }
}

解决方法

other answer 工作正常,但太复杂了。

更简单的方法是并行执行 Kahn's algorithm

关键是并行执行所有依赖的任务。

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Runs the tasks of a directed acyclic graph in parallel, following Kahn's algorithm:
 * a task is handed to the thread pool as soon as every task it depends on has finished.
 *
 * <p>Usage: build the whole graph with {@link #addVertex}, {@link #addTask} and
 * {@link #addDependency} from a single thread, then call {@link #executeTopoSort()} once.
 * The graph must not be modified after execution has started, and an instance is
 * single-use because its thread pool is shut down when the last task finishes.
 */
class DependencyManager {
    /** How long the built-in placeholder task sleeps, in milliseconds. */
    private static final long PLACEHOLDER_TASK_MILLIS = 2000L;

    /** Task id -> ids of the tasks it depends on (one entry per added edge). */
    private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
    /** Task id -> ids of the tasks that depend on it (one entry per added edge). */
    private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
    /** Task id -> the runnable submitted to the pool for that task. */
    private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
    /** Task id -> number of its dependency edges whose dependee has already finished. */
    private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
    /** Number of tasks that have finished so far. */
    private final AtomicInteger _numTasksExecuted = new AtomicInteger(0);
    private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

    /**
     * Builds the default placeholder task: it sleeps for two seconds and then reports
     * completion. If the sleep is interrupted the task is not reported as completed.
     */
    private static Runnable getRunnable(DependencyManager dependencyManager, String taskId) {
        return () -> {
            try {
                Thread.sleep(PLACEHOLDER_TASK_MILLIS); // A task takes 2 seconds to finish.
            } catch (InterruptedException e) {
                // Restore the flag so the pool thread can observe the interruption.
                Thread.currentThread().interrupt();
                System.err.println(String.format("%s:: Task %s interrupted", Instant.now(), taskId));
                return;
            }
            dependencyManager.taskCompleted(taskId);
        };
    }

    /**
     * Registers a task without any edges, in case a vertex is disconnected from the rest
     * of the graph. Registering an already known id is a no-op.
     *
     * @param taskId The task id
     */
    public void addVertex(String taskId) {
        _dependencies.putIfAbsent(taskId, new ArrayList<>());
        _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
        _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
        _numDependenciesExecuted.putIfAbsent(taskId, 0);
    }

    /**
     * Registers (or re-registers) a task whose body is the given work instead of the
     * two-second placeholder. The task counts as finished whether {@code work} returns
     * normally or throws a {@link RuntimeException}, so its dependents still run.
     *
     * @param taskId the task id
     * @param work   the code to run for this task; must not be null
     * @throws NullPointerException if {@code work} is null
     */
    public void addTask(String taskId, Runnable work) {
        if (work == null) {
            throw new NullPointerException("work");
        }
        addVertex(taskId);
        _tasks.put(taskId, () -> {
            try {
                work.run();
            } catch (RuntimeException e) {
                System.err.println(String.format("%s:: Task %s failed: %s", Instant.now(), taskId, e));
            } finally {
                taskCompleted(taskId);
            }
        });
    }

    /** Records the edge in both adjacency maps; both vertices must already be registered. */
    private void addEdge(String dependentTaskId, String dependeeTaskId) {
        _dependencies.get(dependentTaskId).add(dependeeTaskId);
        _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
    }

    /**
     * Declares that {@code dependentTaskId} may only run after {@code dependeeTaskId} has
     * finished. Unknown ids are registered automatically with the placeholder task.
     *
     * @param dependentTaskId the task that has to wait
     * @param dependeeTaskId  the task that has to finish first
     */
    public void addDependency(String dependentTaskId, String dependeeTaskId) {
        addVertex(dependentTaskId);
        addVertex(dependeeTaskId);
        addEdge(dependentTaskId, dependeeTaskId);
    }

    /**
     * Called on a pool thread when a task has finished: submits every dependent whose
     * last outstanding dependency this was, and shuts the pool down after the final task.
     */
    private void taskCompleted(String taskId) {
        System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
        for (String nextTaskId : _reverseDependencies.get(taskId)) {
            int numDependencies = _dependencies.get(nextTaskId).size();
            // merge() increments and returns the new count atomically, so exactly one
            // finishing dependency observes the final count and submits the task.
            int numDependenciesExecuted = _numDependenciesExecuted.merge(nextTaskId, 1, Integer::sum);
            if (numDependenciesExecuted == numDependencies) {
                // All dependencies have been executed, so we can submit this task to the threadpool.
                _executorService.submit(_tasks.get(nextTaskId));
            }
        }
        // Use the value returned by the increment so only one thread sees the final count.
        if (_numTasksExecuted.incrementAndGet() == _tasks.size()) {
            topoSortCompleted();
        }
    }

    /** Announces completion and releases the pool threads; no task is pending at this point. */
    private void topoSortCompleted() {
        System.out.println("Topo sort complete!!");
        _executorService.shutdown();
    }

    /**
     * Checks that every task can eventually run by doing a serial dry run of Kahn's
     * algorithm on a copy of the in-degrees.
     *
     * @throws IllegalStateException if the graph contains a cycle
     */
    private void verifyAcyclic() {
        ConcurrentHashMap<String, Integer> remaining = new ConcurrentHashMap<>();
        List<String> ready = new ArrayList<>();
        _dependencies.forEach((taskId, dependencies) -> {
            remaining.put(taskId, dependencies.size());
            if (dependencies.isEmpty()) {
                ready.add(taskId);
            }
        });
        // 'ready' doubles as a queue: it grows while it is being scanned by index.
        for (int i = 0; i < ready.size(); i++) {
            for (String nextTaskId : _reverseDependencies.get(ready.get(i))) {
                if (remaining.merge(nextTaskId, -1, Integer::sum) == 0) {
                    ready.add(nextTaskId);
                }
            }
        }
        if (ready.size() != _dependencies.size()) {
            throw new IllegalStateException(String.format(
                    "Dependency graph contains a cycle: %d of %d tasks could never run",
                    _dependencies.size() - ready.size(), _dependencies.size()));
        }
    }

    /**
     * Starts the run by submitting every task that has no dependencies; the remaining
     * tasks are submitted by {@code taskCompleted} as their dependencies finish. Returns
     * immediately without waiting for the tasks.
     *
     * @throws IllegalStateException if the graph contains a cycle (nothing is submitted)
     */
    public void executeTopoSort() {
        verifyAcyclic();
        System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
        _dependencies.forEach((taskId, dependencies) -> {
            if (dependencies.isEmpty()) {
                _executorService.submit(_tasks.get(taskId));
            }
        });
    }
}

/** Demo driver that builds a small dependency graph and runs it through DependencyManager. */
public class TestParallelTopoSort {

    /**
     * Registers eight tasks ("1" to "8") with their dependency edges and starts the run.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Each row is {dependent, dependee}: the first task waits for the second one.
        final String[][] edges = {
            {"8", "5"},
            {"7", "6"},
            {"6", "3"},
            {"6", "4"},
            {"5", "1"},
            {"5", "2"},
            {"5", "3"},
            {"4", "1"},
        };

        final DependencyManager manager = new DependencyManager();
        for (String[] edge : edges) {
            manager.addDependency(edge[0], edge[1]);
        }
        manager.executeTopoSort();
        // With 2-second tasks the parallel run takes 8 seconds,
        // whereas running the 8 tasks one after another would take 16 seconds.
    }
}

这个例子中构造的有向无环图是这样的:

Directed Acyclic Graph

,

我们可以创建一个DAG,其中图形的每个顶点都是任务之一。
之后,我们可以计算其topological sorted顺序。
然后,我们可以给Task类增加一个优先级字段,并使用带有PriorityBlockingQueue的ThreadPoolExecutor来运行任务,该队列使用优先级字段比较Tasks。

// The final trick is to override run() so that a task first waits for all of the
// tasks it depends on to finish.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Demo: runs five interdependent tasks on a thread pool backed by a priority queue. */
public class Testing {

    /** Creates a task body that prints the task id and then sleeps for 100 ms. */
    private static Callable<Void> getCallable(String taskId) {
        return () -> {
            System.out.println(String.format("Task %s result", taskId));
            Thread.sleep(100);
            return null;
        };
    }

    /**
     * Builds the graph A -> {B, C, D}, B -> {C, E} and executes it.
     *
     * @param args unused
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        PrioritizedFutureTask<Void> pfTaskA = new PrioritizedFutureTask<>(getCallable("A"));
        PrioritizedFutureTask<Void> pfTaskB = new PrioritizedFutureTask<>(getCallable("B"));
        PrioritizedFutureTask<Void> pfTaskC = new PrioritizedFutureTask<>(getCallable("C"));
        PrioritizedFutureTask<Void> pfTaskD = new PrioritizedFutureTask<>(getCallable("D"));
        PrioritizedFutureTask<Void> pfTaskE = new PrioritizedFutureTask<>(getCallable("E"));

        // Create a DAG graph.
        pfTaskB.addDependency(pfTaskC).addDependency(pfTaskE);
        pfTaskA.addDependency(pfTaskB).addDependency(pfTaskC).addDependency(pfTaskD);

        // Now that we have a graph, we can just get its topological sorted order.
        List<PrioritizedFutureTask<Void>> topologicalOrder = new ArrayList<>();
        topologicalOrder.add(pfTaskE);
        topologicalOrder.add(pfTaskC);
        topologicalOrder.add(pfTaskB);
        topologicalOrder.add(pfTaskD);
        topologicalOrder.add(pfTaskA);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator()));
        try {
            // It is important to insert the tasks in topological order, otherwise
            // the thread pool may get stuck forever.
            for (int i = 0; i < topologicalOrder.size(); i++) {
                PrioritizedFutureTask<Void> pfTask = topologicalOrder.get(i);
                pfTask.setPriority(i); // The lower the priority, the sooner it will run.
                executor.execute(pfTask);
            }
        } finally {
            // Already submitted tasks still run; without this the core threads
            // stay alive and the JVM never exits.
            executor.shutdown();
        }
    }
}

/**
 * A FutureTask that carries a priority and a list of dependencies; its run()
 * blocks until every dependency has finished before executing its own callable.
 */
class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {
    private Integer _priority = 0;
    private final List<PrioritizedFutureTask<?>> _dependencies = new ArrayList<>();

    public PrioritizedFutureTask(Callable<T> callable) {
        super(callable);
    }

    public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
        this(callable);
        _priority = priority;
    }

    public Integer getPriority() {
        return _priority;
    }

    /** Sets the priority and returns this task for chaining. */
    public PrioritizedFutureTask<T> setPriority(Integer priority) {
        _priority = priority;
        return this;
    }

    /** Adds a task that must finish before this one runs; returns this task for chaining. */
    public PrioritizedFutureTask<T> addDependency(PrioritizedFutureTask<?> dep) {
        this._dependencies.add(dep);
        return this;
    }

    /**
     * Waits for every dependency to finish (successfully, with an exception, or by
     * cancellation) and then runs this task. If the wait is interrupted, the interrupt
     * flag is restored and this task completes exceptionally without running, so it is
     * never executed ahead of a dependency and its own dependents are not left waiting.
     */
    @Override
    public void run() {
        for (PrioritizedFutureTask<?> dep : _dependencies) {
            try {
                dep.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                setException(e);
                return;
            } catch (ExecutionException | CancellationException e) {
                // A failed or cancelled dependency does not block this task.
                System.err.println("Dependency did not complete normally: " + e);
            }
        }
        super.run();
    }

    /**
     * Orders tasks by ascending priority.
     *
     * @throws NullPointerException if {@code other} is null
     */
    @Override
    public int compareTo(PrioritizedFutureTask<T> other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return getPriority().compareTo(other.getPriority());
    }
}

/** Orders queued runnables by priority; both arguments must be PrioritizedFutureTask instances. */
class CustomRunnableComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        PrioritizedFutureTask<?> left = (PrioritizedFutureTask<?>) task1;
        PrioritizedFutureTask<?> right = (PrioritizedFutureTask<?>) task2;
        return left.getPriority().compareTo(right.getPriority());
    }
}

由于每个任务都会无限期地等待它所依赖的任务完成,因此不能让线程池被拓扑顺序中靠后的任务完全占满,否则线程池将永远卡住。
为了避免这种情况,我们只需要根据拓扑顺序为任务分配优先级即可。

Task E result
Task C result
Task B result
Task D result
Task A result

输出:

1

PS:Here是经过测试且简单的Python拓扑排序实现,您可以轻松地将其移植到Java中。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...