带有 Java Streams 的 ExecutorService 给出了这个错误“java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException”

问题描述

我有代码需要从 8 个“玩家”的 ArrayLists 中找到 N 个最佳组合,其中恰好是 1 个“玩家”。每个 ArrayList 都在 20 到 40 之间。这导致了大量迭代的巨大运行时间。在尝试优化运行时间之前,代码完全正常运行,只是运行时间不理想。我认为最好的方法是使用 Executor Services 和 Java Streams,尽管我对这两者都不熟悉。这是我下面代码的最小化版本:

public static void main(String [] args){
//Lineup is an object which I store the combination of players in
ArrayList <Lineup> currBoard = new ArrayList<Lineup>();
final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
    String threadId = Thread.currentThread().getName();
    int tid = Integer.parseInt(threadId.substring(threadId.length() - 1));
    int threadWorkLoad = newPG.size() / 4;
    //int threadStart = (int)(tid - 1) * threadWorkLoad;
    final List<Future<?>> futures = new ArrayList<>();
    System.out.println(newPG.size());
    for(int i=0; i < nThreads; i++){        
        Future<?> future = executorService.submit(() -> {
                    //System.out.println(pg.getName());
                    
                    currBoard.addAll(parallelFunction(nThreads,amounts,newPG,newSG,newSF,newPF,newC,newG,newF,newL));
                    
                
                
        });
        futures.add(future);
    }
    executorService.shutdown();
    try{
        executorService.awaitTermination(5000,TimeUnit.MILLISECONDS);
    }catch(InterruptedException e){
        System.out.println("oof");
    }
    try {
        for (Future<?> future : futures) {
            future.get(); // do anything you need,e.g. isDone(),...
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printstacktrace();
    }
}

这是我遍历可能的阵容的函数

public ArrayList<Lineup> parallelFunction(int nThreads,int amounts,ArrayList<Player> newPG,ArrayList<Player> newSG,ArrayList<Player> newSF,ArrayList<Player> newPF,ArrayList<Player> newC,ArrayList<Player> newG,ArrayList<Player> newF,ArrayList<Player> newL) {

    ArrayList<Player> temp = new ArrayList<Player>();
    
    ArrayList<Lineup> threadBoard = new ArrayList<Lineup>();

    String threadId = Thread.currentThread().getName();
    int tid = Integer.parseInt(threadId.substring(threadId.length() - 1));
    int threadWorkLoad = (newPG.size() + nThreads - 1) / nThreads;
    int threadStart = (int)(tid - 1) * threadWorkLoad;
    int threadStop = Math.min(threadStart + threadWorkLoad,newPG.size());
    //reentrantlock lock = new reentrantlock();
    
    
    ArrayList<Player> tempPG = new ArrayList<Player>();

    for(int i = threadStart; i <= threadStop; i++){
        tempPG.add(newPG.get(i));
    }

    Stream<Player> pgStream = StreamSupport.stream(tempPG.spliterator(),true);
    pgStream.forEach(pg -> {

        System.out.println("TID: " + tid + ",PG: " + pg.getName() + ",iterations: " + counting);
        Stream<Player> sgStream = StreamSupport.stream(newSG.spliterator(),true);
        sgStream.forEach(sg -> {
        //System.out.println(tid + "," + sg.getName());
        Stream<Player> sfStream = StreamSupport.stream(newSF.spliterator(),true);
        sfStream.forEach(sf -> {
            Stream<Player> pfStream = StreamSupport.stream(newPF.spliterator(),true);
            pfStream.forEach(pf -> {
                Stream<Player> cStream = StreamSupport.stream(newC.spliterator(),true);
                cStream.forEach(c -> {
                    Stream<Player> gStream = StreamSupport.stream(newG.spliterator(),true);
                    gStream.forEach(g -> {
                        if ((!(g.getName().equals(sg.getName()) || g.getName().equals(pg.getName())))
                                && ((pg.getSalary() + sg.getSalary() + sf.getSalary() + pf.getSalary()
                                        + c.getSalary() + 3000 + 3000 + 3000) < 50000)) {
                                Stream<Player> fStream = StreamSupport.stream(newF.spliterator(),true);
                                fStream.forEach(f -> {
                                if ((!(f.getName().equals(sf.getName()) || f.getName().equals(pf.getName())))
                                        && ((g.getSalary() + pg.getSalary() + sg.getSalary() + sf.getSalary()
                                                + pf.getSalary() + c.getSalary() + 3000 + 3000) < 50000)) {
                                        Stream<Player> pStream = StreamSupport.stream(newL.spliterator(),true);
                                        pStream.forEach(p -> {
                                        if (!(p.getName().equals(pg.getName()) || p.getName().equals(sg.getName())
                                                || p.getName().equals(sf.getName())
                                                || p.getName().equals(pf.getName())
                                                || p.getName().equals(c.getName())
                                                || p.getName().equals(g.getName())
                                                || p.getName().equals(f.getName()))) {
                                            double currscore = 0;
                                            double totalSal = 0;
                                            totalSal = pg.getSalary() + sg.getSalary() + sf.getSalary()
                                                    + pf.getSalary() + c.getSalary() + f.getSalary() + g.getSalary()
                                                    + p.getSalary();
                                            currscore = pg.getProjection() + sg.getProjection() + sf.getProjection()
                                                    + pf.getProjection() + c.getProjection() + f.getProjection()
                                                    + g.getProjection() + p.getProjection();
                                            if (totalSal <= 50000.0) {
                                                counting += 1;
                                                temp.clear();
                                                temp.add(pg);
                                                temp.add(sg);
                                                temp.add(sf);
                                                temp.add(pf);
                                                temp.add(c);
                                                temp.add(g);
                                                temp.add(f);
                                                temp.add(p);
                                                ArrayList<Lineup> tempBoard = new ArrayList<Lineup>();

                                                Lineup aLine = new Lineup(temp,currscore);
                                                
                                                if (threadBoard.size() < amounts) {
                                                    if (!alreadyIn(threadBoard,temp)) {
                                                        
                                                        threadBoard.add(aLine);
                                                    }


                                                } else if (currscore > threadBoard.get(amounts - 1).totalscore) {
                                                    // Collections.sort(currBoard,Lineup.TotComp);
                                                    if (!alreadyIn(threadBoard,temp)) {
                                                        for (int i = 0; i < threadBoard.size() - 1; i++) {
                                                            tempBoard.add(threadBoard.get(i));
                                                        }
                                                        threadBoard.clear();
                                                        threadBoard.addAll(tempBoard);
                                                        tempBoard.clear();
                                                        threadBoard.add(aLine);
                                                    }
                                                }
                                                Collections.sort(threadBoard,Lineup.TotComp);
                                            }

                                            // temp.clear();
                                        } // p if
                                    });
                                    pStream.close();
                                } // f if
                            });
                            fStream.close();
                        } // g if
                    });
                    gStream.close();
                });
                cStream.close();
            });
            pfStream.close();
        });
        sfStream.close();
    });
    sgStream.close();
});
pgStream.close();







return threadBoard;
}

经过大量迭代后,这会导致此错误,java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: java.util.ConcurrentModificationException

是不是我对 executorService 或流做错了什么导致了这个。

或者有没有人有任何建议来提高这段代码的速度,因为它在崩溃之前仍然不能令人满意。


编辑

主要:

ArrayList<Lineup> currBoard = new ArrayList<Lineup>();
    int nThreads = Runtime.getRuntime().availableProcessors();

    final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
    String threadId = Thread.currentThread().getName();
    int tid = Integer.parseInt(threadId.substring(threadId.length() - 1));
    int threadWorkLoad = newPG.size() / 4;
    //int threadStart = (int)(tid - 1) * threadWorkLoad;
    final List<Future<ThreadBoard>> futures = new ArrayList<Future<ThreadBoard>>();
    
    for(int i=0; i < nThreads; i++){
        Future<ThreadBoard> future = executorService.submit(() -> {
                    //System.out.println(pg.getName());
                return parallelFunction(nThreads,newL); 
        });
        futures.add(future);
    }
    executorService.shutdown();
    try{
        executorService.awaitTermination(5000,TimeUnit.MILLISECONDS);
    }catch(InterruptedException e){
        System.out.println("oof");
    }
    try {
        for (Future<ThreadBoard> future : futures) {
            currBoard.addAll(future.get().getCurrentLineup()); // do anything you need,...
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printstacktrace();
    }

并行函数

public ThreadBoard parallelFunction(int nThreads,ArrayList<Player> newL) {

    
    ThreadBoard threadBoard = new ThreadBoard();

    String threadId = Thread.currentThread().getName();
    int tid = Integer.parseInt(threadId.substring(threadId.length() - 1));
    int threadWorkLoad = (newPG.size() + nThreads - 1) / nThreads;
    int threadStart = (int)(tid - 1) * threadWorkLoad;
    int threadStop = Math.min(threadStart + threadWorkLoad,newPG.size());
    //reentrantlock lock = new reentrantlock();
    
    
    
    ArrayList<Player> tempPG = new ArrayList<Player>();

    for(int i = threadStart; i <= threadStop; i++){
        tempPG.add(newPG.get(i));
    }

    Stream<Player> pgStream = StreamSupport.stream(tempPG.spliterator(),false);
    pgStream.forEach(pg -> {

        System.out.println("TID: " + tid + ",false);
        sgStream.forEach(sg -> {
        //System.out.println(tid + ",false);
        sfStream.forEach(sf -> {
            Stream<Player> pfStream = StreamSupport.stream(newPF.spliterator(),false);
            pfStream.forEach(pf -> {
                Stream<Player> cStream = StreamSupport.stream(newC.spliterator(),false);
                cStream.forEach(c -> {
                    Stream<Player> gStream = StreamSupport.stream(newG.spliterator(),false);
                    gStream.forEach(g -> {
                        if ((!(g.getName().equals(sg.getName()) || g.getName().equals(pg.getName())))
                                && ((pg.getSalary() + sg.getSalary() + sf.getSalary() + pf.getSalary()
                                        + c.getSalary() + 3000 + 3000 + 3000) < 50000)) {
                                Stream<Player> fStream = StreamSupport.stream(newF.spliterator(),false);
                                fStream.forEach(f -> {
                                if ((!(f.getName().equals(sf.getName()) || f.getName().equals(pf.getName())))
                                        && ((g.getSalary() + pg.getSalary() + sg.getSalary() + sf.getSalary()
                                                + pf.getSalary() + c.getSalary() + 3000 + 3000) < 50000)) {
                                        Stream<Player> pStream = StreamSupport.stream(newL.spliterator(),false);
                                        pStream.forEach(p -> {
                                        if (!(p.getName().equals(pg.getName()) || p.getName().equals(sg.getName())
                                                || p.getName().equals(sf.getName())
                                                || p.getName().equals(pf.getName())
                                                || p.getName().equals(c.getName())
                                                || p.getName().equals(g.getName())
                                                || p.getName().equals(f.getName()))) {
                                            double currscore = 0;
                                            double totalSal = 0;
                                            totalSal = pg.getSalary() + sg.getSalary() + sf.getSalary()
                                                    + pf.getSalary() + c.getSalary() + f.getSalary() + g.getSalary()
                                                    + p.getSalary();
                                            currscore = pg.getProjection() + sg.getProjection() + sf.getProjection()
                                                    + pf.getProjection() + c.getProjection() + f.getProjection()
                                                    + g.getProjection() + p.getProjection();
                                            if (totalSal <= 50000.0) {
                                                counting += 1;
                                                ArrayList<Player> temp = new ArrayList<Player>();
                                                temp.clear();
                                                temp.add(pg);
                                                temp.add(sg);
                                                temp.add(sf);
                                                temp.add(pf);
                                                temp.add(c);
                                                temp.add(g);
                                                temp.add(f);
                                                temp.add(p);

                                                Lineup aLine = new Lineup(temp,currscore);
                                                threadBoard.addLineup(aLine,currscore,temp);
                                                
                                                // if (threadBoard.size() < amounts) {
                                                //  //if (!alreadyIn(threadBoard,temp)) {
                                                        
                                                //      threadBoard.add(aLine);
                                                //  //}


                                                // } else if (currscore > threadBoard.get(amounts - 1).totalscore) {
                                                //  // Collections.sort(currBoard,Lineup.TotComp);
                                                //  //if (!alreadyIn(threadBoard,temp)) {
                                                //      for (int i = 0; i < threadBoard.size() - 1; i++) {
                                                //          tempBoard.add(threadBoard.get(i));
                                                //      }
                                                //      threadBoard.clear();
                                                //      threadBoard.addAll(tempBoard);
                                                //      tempBoard.clear();
                                                //      threadBoard.add(aLine);
                                                //  //}
                                                // }
                                                //Collections.sort(threadBoard.getCurrentLineup(),Lineup.TotComp);
                                            }

                                            //temp.clear();
                                        } // p if
                                    });
                                    pStream.close();
                                } // f if
                            });
                            fStream.close();
                        } // g if
                    });
                    gStream.close();
                });
                cStream.close();
            });
            pfStream.close();
        });
        sfStream.close();
    });
    sgStream.close();
});
pgStream.close();







return threadBoard;
}

ThreadBoard 类:

public class ThreadBoard {
private List<Lineup> lineups;

public ThreadBoard(){
    this.lineups = new ArrayList<>();
}

public synchronized void addLineup(Lineup aLine,double currscore,ArrayList<Player> temp) {
    if (lineups.size() < amounts) {
        if (!this.alreadyIn(lineups,temp)) {
            lineups.add(aLine);
        }
    } else if (currscore > lineups.get(amounts - 1).totalscore) {
        // Collections.sort(currBoard,Lineup.TotComp);
        if (!this.alreadyIn(lineups,temp)) {
            lineups.set(amounts - 1,aLine);
        }
    }
    Collections.sort(lineups,Lineup.TotComp);
}

public List<Lineup> getCurrentLineup() {
    return lineups;
}

public boolean alreadyIn(List<Lineup> board,ArrayList<Player> temp) {
    boolean found = false;
    for (int i = 0; i < board.size(); i++) {
        
        ArrayList <Player> check = board.get(i).getL();

        boolean pg = (check.contains(temp.get(0)));
        boolean sg = (check.contains(temp.get(1)));
        boolean pf = (check.contains(temp.get(2)));
        boolean sf = (check.contains(temp.get(3)));
        boolean c = (check.contains(temp.get(4)));
        boolean g = (check.contains(temp.get(5)));
        boolean f = (check.contains(temp.get(6)));
        boolean all = (check.contains(temp.get(7)));

        if (pg && sg && pf && sf && c && g && f && all) {
            found = true;
        }
    }
    

    return found;
}

}

解决方法

ArrayList<Lineup> threadBoard 的问题在于它用于收集、排序和限制来自不同并行流的结果。恕我直言,ArrayList 不是这些任务的正确数据结构。我将创建一个单独的类来执行以下步骤:

public class ThreadBoard {
    private List<Lineup> currentLineup = new ArrayList<>();
    
    public void addLineup(Lineup aLine,double currScore) {
        if (currentLineup.size() < amounts) {
            if (!alreadyIn(currentLineup,temp)) {
                currentLineup.add(aLine);
            }
        } else if (currScore > currentLineup.get(amounts - 1).totalScore) {
            // Collections.sort(currBoard,Lineup.TotComp);
            if (!alreadyIn(currentLineup,temp)) {
                currentLineup.set(amounts - 1,aLine);
                Collections.sort(currentLineup,Lineup.TotComp);
        }
    }

    public List<Lineup> getCurrentLineup() {
        return currentLineup;
    }
}

为了使该线程安全,您可以将 addLineup() 方法声明为同步的。

但是,我不确定所有这些并行流是否有助于提高性能(因为您已经将整个作业拆分为并行处理的块)。

也许将整个作业拆分成更多、更小的块,然后仅使用顺序流来处理一个块会更有意义。最后,您的计算机只有有限数量的内核(可能是 4 个,也可能是 16 个),并且将 CPU 绑定的作业拆分为比内核数更多的部分(并且通过使用嵌套的并行流来执行此操作)并没有多少感觉。