问题描述
我的代码需要从 8 个存放“玩家”的 ArrayList 中各取恰好 1 名“玩家”,找出得分最高的 N 个组合。每个 ArrayList 的大小在 20 到 40 之间,因此需要遍历的组合数量极其庞大,运行时间很长。在尝试优化运行时间之前,代码功能完全正常,只是运行时间不理想。我认为最好的办法是使用 ExecutorService 和 Java Stream,尽管我对这两者都不熟悉。下面是我的代码的精简版本:
/**
 * Submits {@code nThreads} search tasks to a fixed thread pool and merges the boards they
 * return into {@code currBoard}.
 *
 * <p>Each task hands its result back through its {@link Future}; the merge happens on the
 * submitting thread only, so the (non-thread-safe) {@code ArrayList} is never written to by
 * two threads at once.
 */
public static void main(String [] args){
    // Lineup is an object which I store the combination of players in
    ArrayList<Lineup> currBoard = new ArrayList<Lineup>();
    final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
    // The thread-name parsing that used to sit here was dropped: its result was unused, and
    // on a thread named "main" Integer.parseInt("n") throws NumberFormatException.
    final List<Future<ArrayList<Lineup>>> futures = new ArrayList<>();
    System.out.println(newPG.size());
    for (int i = 0; i < nThreads; i++) {
        // Value-returning lambda => submit(Callable); the task's board travels in the Future.
        Future<ArrayList<Lineup>> future = executorService.submit(
                () -> parallelFunction(nThreads,amounts,newPG,newSG,newSF,newPF,newC,newG,newF,newL));
        futures.add(future);
    }
    // No new tasks are accepted from here on; already submitted tasks keep running.
    executorService.shutdown();
    try {
        executorService.awaitTermination(5000,TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        System.out.println("oof");
        // Restore the flag so the blocking get() calls below also observe the interruption.
        Thread.currentThread().interrupt();
    }
    try {
        for (Future<ArrayList<Lineup>> future : futures) {
            // get() blocks until the task is done and rethrows its failure as ExecutionException.
            currBoard.addAll(future.get());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
这是我遍历可能的阵容的函数
/**
 * Enumerates lineups for one slice of {@code newPG} and returns the best ones found.
 *
 * <p>The slice is chosen from the trailing digit of the current thread's name. For every
 * point guard in the slice the method walks all eight candidate lists with nested parallel
 * streams, discards combinations that repeat a player name or exceed a total salary of 50000,
 * and keeps at most {@code amounts} lineups in a board that is re-sorted with
 * {@code Lineup.TotComp} after every accepted candidate.
 *
 * <p>NOTE(review): {@code temp} and {@code threadBoard} are plain {@code ArrayList}s that are
 * mutated from inside lambdas of streams created with {@code parallel = true}. ArrayList is
 * not thread-safe, so this is the likely source of the ConcurrentModificationException
 * reported for this code.
 *
 * @param nThreads number of slices {@code newPG} is divided into
 * @param amounts  maximum number of lineups kept on the board
 * @param newPG    candidates for the first slot; only this list is sliced per thread
 * @param newSG    candidates iterated at the second nesting level
 * @param newSF    candidates iterated at the third nesting level
 * @param newPF    candidates iterated at the fourth nesting level
 * @param newC     candidates iterated at the fifth nesting level
 * @param newG     candidates iterated at the sixth nesting level
 * @param newF     candidates iterated at the seventh nesting level
 * @param newL     candidates iterated at the innermost level
 * @return the board built by this call
 */
public ArrayList<Lineup> parallelFunction(int nThreads,int amounts,ArrayList<Player> newPG,ArrayList<Player> newSG,ArrayList<Player> newSF,ArrayList<Player> newPF,ArrayList<Player> newC,ArrayList<Player> newG,ArrayList<Player> newF,ArrayList<Player> newL) {
// Scratch list reused for every candidate lineup; shared by all lambda invocations below.
ArrayList<Player> temp = new ArrayList<Player>();
ArrayList<Lineup> threadBoard = new ArrayList<Lineup>();
// The slice index is the LAST CHARACTER of the thread name parsed as a digit.
// NOTE(review): presumably relies on pool names like "pool-1-thread-3"; a name ending in
// "10" yields 0, and nothing ties one task to one distinct thread - confirm.
String threadId = Thread.currentThread().getName();
int tid = Integer.parseInt(threadId.substring(threadId.length() - 1));
// Ceiling division so that nThreads slices cover the whole list.
int threadWorkLoad = (newPG.size() + nThreads - 1) / nThreads;
int threadStart = (int)(tid - 1) * threadWorkLoad;
int threadStop = Math.min(threadStart + threadWorkLoad,newPG.size());
//reentrantlock lock = new reentrantlock();
ArrayList<Player> tempPG = new ArrayList<Player>();
// NOTE(review): the bound is inclusive, so index threadStop is copied as well. That element
// is also the first one of the next slice, and when threadStop == newPG.size() the get(i)
// call is out of range.
for(int i = threadStart; i <= threadStop; i++){
tempPG.add(newPG.get(i));
}
// Second argument "true" requests a parallel stream - here and at every nested level.
Stream<Player> pgStream = StreamSupport.stream(tempPG.spliterator(),true);
pgStream.forEach(pg -> {
// "counting" is not declared in this method; it is read here and incremented further down.
System.out.println("TID: " + tid + ",PG: " + pg.getName() + ",iterations: " + counting);
Stream<Player> sgStream = StreamSupport.stream(newSG.spliterator(),true);
sgStream.forEach(sg -> {
//System.out.println(tid + "," + sg.getName());
Stream<Player> sfStream = StreamSupport.stream(newSF.spliterator(),true);
sfStream.forEach(sf -> {
Stream<Player> pfStream = StreamSupport.stream(newPF.spliterator(),true);
pfStream.forEach(pf -> {
Stream<Player> cStream = StreamSupport.stream(newC.spliterator(),true);
cStream.forEach(c -> {
Stream<Player> gStream = StreamSupport.stream(newG.spliterator(),true);
gStream.forEach(g -> {
// Prune: g must differ by name from sg and pg, and the five salaries chosen so far
// (g's own salary is not part of this sum) plus 3 x 3000 must stay below 50000.
// NOTE(review): 3000 looks like a per-slot minimum salary - confirm.
if ((!(g.getName().equals(sg.getName()) || g.getName().equals(pg.getName())))
&& ((pg.getSalary() + sg.getSalary() + sf.getSalary() + pf.getSalary()
+ c.getSalary() + 3000 + 3000 + 3000) < 50000)) {
Stream<Player> fStream = StreamSupport.stream(newF.spliterator(),true);
fStream.forEach(f -> {
// Prune: f must differ by name from sf and pf; six chosen salaries plus 2 x 3000 below 50000.
if ((!(f.getName().equals(sf.getName()) || f.getName().equals(pf.getName())))
&& ((g.getSalary() + pg.getSalary() + sg.getSalary() + sf.getSalary()
+ pf.getSalary() + c.getSalary() + 3000 + 3000) < 50000)) {
Stream<Player> pStream = StreamSupport.stream(newL.spliterator(),true);
pStream.forEach(p -> {
// The last player must not share a name with any of the seven already chosen.
if (!(p.getName().equals(pg.getName()) || p.getName().equals(sg.getName())
|| p.getName().equals(sf.getName())
|| p.getName().equals(pf.getName())
|| p.getName().equals(c.getName())
|| p.getName().equals(g.getName())
|| p.getName().equals(f.getName()))) {
double currscore = 0;
double totalSal = 0;
totalSal = pg.getSalary() + sg.getSalary() + sf.getSalary()
+ pf.getSalary() + c.getSalary() + f.getSalary() + g.getSalary()
+ p.getSalary();
currscore = pg.getProjection() + sg.getProjection() + sf.getProjection()
+ pf.getProjection() + c.getProjection() + f.getProjection()
+ g.getProjection() + p.getProjection();
// Final salary-cap check on the complete eight-player lineup.
if (totalSal <= 50000.0) {
counting += 1;
// NOTE(review): temp is cleared and refilled here while other stream workers may be doing
// the same. It is also handed to the Lineup constructor; whether Lineup copies the list is
// not visible here - if it does not, later clear() calls empty lineups already stored.
temp.clear();
temp.add(pg);
temp.add(sg);
temp.add(sf);
temp.add(pf);
temp.add(c);
temp.add(g);
temp.add(f);
temp.add(p);
ArrayList<Lineup> tempBoard = new ArrayList<Lineup>();
Lineup aLine = new Lineup(temp,currscore);
// Board not full yet: accept any lineup that alreadyIn (defined elsewhere) does not report.
if (threadBoard.size() < amounts) {
if (!alreadyIn(threadBoard,temp)) {
threadBoard.add(aLine);
}
// Board full: the candidate must beat the entry at index amounts - 1.
// NOTE(review): assumes Lineup.TotComp sorts best-first so that entry is the weakest - confirm.
} else if (currscore > threadBoard.get(amounts - 1).totalscore) {
// Collections.sort(currBoard,Lineup.TotComp);
if (!alreadyIn(threadBoard,temp)) {
// Rebuild the board without its last entry, then append the new lineup.
for (int i = 0; i < threadBoard.size() - 1; i++) {
tempBoard.add(threadBoard.get(i));
}
threadBoard.clear();
threadBoard.addAll(tempBoard);
tempBoard.clear();
threadBoard.add(aLine);
}
}
// Runs for every lineup that passes the salary cap, whether or not the board changed.
Collections.sort(threadBoard,Lineup.TotComp);
}
// temp.clear();
} // p if
});
pStream.close();
} // f if
});
fStream.close();
} // g if
});
gStream.close();
});
cStream.close();
});
pfStream.close();
});
sfStream.close();
});
sgStream.close();
});
pgStream.close();
return threadBoard;
}
经过大量迭代后,这会导致此错误,java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
是不是我在使用 ExecutorService 或流的方式上做错了什么,才导致了这个错误?
另外,有没有人能给出提高这段代码速度的建议?因为即使在崩溃之前,它的运行速度仍然不能令人满意。
编辑
主函数(main):
// Driver: one search task per available processor; each task returns its own ThreadBoard,
// and the boards are merged into currBoard on this (the submitting) thread only.
ArrayList<Lineup> currBoard = new ArrayList<Lineup>();
int nThreads = Runtime.getRuntime().availableProcessors();
final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
// The thread-name parsing that used to sit here was dropped: its result was unused, and on
// a thread named "main" Integer.parseInt("n") throws NumberFormatException.
final List<Future<ThreadBoard>> futures = new ArrayList<Future<ThreadBoard>>();
for (int i = 0; i < nThreads; i++) {
    Future<ThreadBoard> future = executorService.submit(() -> {
        //System.out.println(pg.getName());
        return parallelFunction(nThreads,newL);
    });
    futures.add(future);
}
// No new tasks are accepted from here on; already submitted tasks keep running.
executorService.shutdown();
try {
    executorService.awaitTermination(5000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    System.out.println("oof");
    // Restore the flag so the blocking get() calls below also observe the interruption.
    Thread.currentThread().interrupt();
}
try {
    for (Future<ThreadBoard> future : futures) {
        // get() blocks until the task is done and rethrows its failure as ExecutionException.
        currBoard.addAll(future.get().getCurrentLineup());
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
并行函数:
public ThreadBoard parallelFunction(int nThreads,ArrayList<Player> newL) {
ThreadBoard threadBoard = new ThreadBoard();
String threadId = Thread.currentThread().getName();
int tid = Integer.parseInt(threadId.substring(threadId.length() - 1));
int threadWorkLoad = (newPG.size() + nThreads - 1) / nThreads;
int threadStart = (int)(tid - 1) * threadWorkLoad;
int threadStop = Math.min(threadStart + threadWorkLoad,newPG.size());
//reentrantlock lock = new reentrantlock();
ArrayList<Player> tempPG = new ArrayList<Player>();
for(int i = threadStart; i <= threadStop; i++){
tempPG.add(newPG.get(i));
}
Stream<Player> pgStream = StreamSupport.stream(tempPG.spliterator(),false);
pgStream.forEach(pg -> {
System.out.println("TID: " + tid + ",false);
sgStream.forEach(sg -> {
//System.out.println(tid + ",false);
sfStream.forEach(sf -> {
Stream<Player> pfStream = StreamSupport.stream(newPF.spliterator(),false);
pfStream.forEach(pf -> {
Stream<Player> cStream = StreamSupport.stream(newC.spliterator(),false);
cStream.forEach(c -> {
Stream<Player> gStream = StreamSupport.stream(newG.spliterator(),false);
gStream.forEach(g -> {
if ((!(g.getName().equals(sg.getName()) || g.getName().equals(pg.getName())))
&& ((pg.getSalary() + sg.getSalary() + sf.getSalary() + pf.getSalary()
+ c.getSalary() + 3000 + 3000 + 3000) < 50000)) {
Stream<Player> fStream = StreamSupport.stream(newF.spliterator(),false);
fStream.forEach(f -> {
if ((!(f.getName().equals(sf.getName()) || f.getName().equals(pf.getName())))
&& ((g.getSalary() + pg.getSalary() + sg.getSalary() + sf.getSalary()
+ pf.getSalary() + c.getSalary() + 3000 + 3000) < 50000)) {
Stream<Player> pStream = StreamSupport.stream(newL.spliterator(),false);
pStream.forEach(p -> {
if (!(p.getName().equals(pg.getName()) || p.getName().equals(sg.getName())
|| p.getName().equals(sf.getName())
|| p.getName().equals(pf.getName())
|| p.getName().equals(c.getName())
|| p.getName().equals(g.getName())
|| p.getName().equals(f.getName()))) {
double currscore = 0;
double totalSal = 0;
totalSal = pg.getSalary() + sg.getSalary() + sf.getSalary()
+ pf.getSalary() + c.getSalary() + f.getSalary() + g.getSalary()
+ p.getSalary();
currscore = pg.getProjection() + sg.getProjection() + sf.getProjection()
+ pf.getProjection() + c.getProjection() + f.getProjection()
+ g.getProjection() + p.getProjection();
if (totalSal <= 50000.0) {
counting += 1;
ArrayList<Player> temp = new ArrayList<Player>();
temp.clear();
temp.add(pg);
temp.add(sg);
temp.add(sf);
temp.add(pf);
temp.add(c);
temp.add(g);
temp.add(f);
temp.add(p);
Lineup aLine = new Lineup(temp,currscore);
threadBoard.addLineup(aLine,currscore,temp);
// if (threadBoard.size() < amounts) {
// //if (!alreadyIn(threadBoard,temp)) {
// threadBoard.add(aLine);
// //}
// } else if (currscore > threadBoard.get(amounts - 1).totalscore) {
// // Collections.sort(currBoard,Lineup.TotComp);
// //if (!alreadyIn(threadBoard,temp)) {
// for (int i = 0; i < threadBoard.size() - 1; i++) {
// tempBoard.add(threadBoard.get(i));
// }
// threadBoard.clear();
// threadBoard.addAll(tempBoard);
// tempBoard.clear();
// threadBoard.add(aLine);
// //}
// }
//Collections.sort(threadBoard.getCurrentLineup(),Lineup.TotComp);
}
//temp.clear();
} // p if
});
pStream.close();
} // f if
});
fStream.close();
} // g if
});
gStream.close();
});
cStream.close();
});
pfStream.close();
});
sfStream.close();
});
sgStream.close();
});
pgStream.close();
return threadBoard;
}
ThreadBoard 类:
/**
 * Bounded board holding the best lineups seen so far.
 *
 * <p>The board keeps at most {@code amounts} entries (a value defined outside this class) and
 * is re-sorted with {@code Lineup.TotComp} whenever its content changes. {@link #addLineup} is
 * synchronized; {@link #getCurrentLineup} returns the internal list without copying or
 * locking, so it should only be read once all producers have finished.
 */
public class ThreadBoard {
    /** Number of roster slots compared by {@link #alreadyIn}. */
    private static final int LINEUP_SIZE = 8;

    private List<Lineup> lineups;

    public ThreadBoard(){
        this.lineups = new ArrayList<>();
    }

    /**
     * Offers a lineup to the board.
     *
     * <p>While the board has room the lineup is appended; once it is full the lineup replaces
     * the entry at index {@code amounts - 1} if it scores higher. Lineups already present
     * (see {@link #alreadyIn}) are ignored.
     *
     * @param aLine     the candidate lineup
     * @param currscore score of {@code aLine}
     * @param temp      the eight players of {@code aLine}
     */
    public synchronized void addLineup(Lineup aLine,double currscore,ArrayList<Player> temp) {
        boolean changed = false;
        if (lineups.size() < amounts) {
            if (!this.alreadyIn(lineups,temp)) {
                lineups.add(aLine);
                changed = true;
            }
        // NOTE(review): assumes Lineup.TotComp sorts best-first, so the entry at
        // amounts - 1 is the weakest one on a full board - confirm.
        } else if (currscore > lineups.get(amounts - 1).totalscore) {
            if (!this.alreadyIn(lineups,temp)) {
                lineups.set(amounts - 1,aLine);
                changed = true;
            }
        }
        // This method runs once per candidate lineup; re-sorting an unchanged (already sorted)
        // board is pure overhead, so sort only after an insertion or replacement.
        if (changed) {
            Collections.sort(lineups,Lineup.TotComp);
        }
    }

    /**
     * Returns the internal list of lineups (not a copy).
     */
    public List<Lineup> getCurrentLineup() {
        return lineups;
    }

    /**
     * Tells whether some lineup on {@code board} contains all eight players of {@code temp},
     * regardless of the slot they occupy.
     *
     * @param board lineups to search
     * @param temp  the eight players of the candidate lineup
     * @return {@code true} as soon as one matching lineup is found
     */
    public boolean alreadyIn(List<Lineup> board,ArrayList<Player> temp) {
        for (Lineup existing : board) {
            if (coversAllSlots(existing.getL(),temp)) {
                return true;
            }
        }
        return false;
    }

    // True when every one of the first LINEUP_SIZE players of candidate occurs in roster.
    private boolean coversAllSlots(List<Player> roster,List<Player> candidate) {
        for (int slot = 0; slot < LINEUP_SIZE; slot++) {
            if (!roster.contains(candidate.get(slot))) {
                return false;
            }
        }
        return true;
    }
}
解决方法
ArrayList<Lineup> threadBoard
的问题在于它用于收集、排序和限制来自不同并行流的结果。恕我直言,ArrayList 不是这些任务的正确数据结构。我将创建一个单独的类来执行以下步骤:
public class ThreadBoard {
private List<Lineup> currentLineup = new ArrayList<>();
public void addLineup(Lineup aLine,double currScore) {
if (currentLineup.size() < amounts) {
if (!alreadyIn(currentLineup,temp)) {
currentLineup.add(aLine);
}
} else if (currScore > currentLineup.get(amounts - 1).totalScore) {
// Collections.sort(currBoard,Lineup.TotComp);
if (!alreadyIn(currentLineup,temp)) {
currentLineup.set(amounts - 1,aLine);
Collections.sort(currentLineup,Lineup.TotComp);
}
}
public List<Lineup> getCurrentLineup() {
return currentLineup;
}
}
为了使该线程安全,您可以将 addLineup()
方法声明为同步的。
但是,我不确定所有这些并行流是否有助于提高性能(因为您已经将整个作业拆分为并行处理的块)。
也许更合理的做法是把整个作业拆分成更多、更小的块,然后只用顺序流来处理每个块。毕竟,您的计算机只有有限数量的内核(可能是 4 个,也可能是 16 个),把一个 CPU 密集型作业拆分成比内核数更多的并行部分(而嵌套的并行流正是在这样做)并没有多大意义。