问题描述
在我的 Java Web 应用程序中是基于文件的集成。他们过去常常在我们的生产服务器 opt/app/proceed/ 文件夹中发送一堆 xml 文件(例如:10000)。但是根据当前的配置,我们的应用程序能够按顺序处理 200 个文件。因此,延迟处理文件。我正在尝试以并行方式增加文件处理的数量。请找到代码块供您参考。
public class FileEx {
public static void main(String[] args) throws IOException {
String fileDir = "C:\\Users\\inputfiles"; //contains more than 10000 files
new FileEx().traverseFilesFromDir(new File(fileDir));
}
public void traverseFilesFromDir(File dir) throws IOException {
List<File> files = new ArrayList<File>();
if (dir == null || !dir.isDirectory()) {
throw new IllegalArgumentException("Not a valid directory (value: " + dir + ").");
}
File[] acknFiles = dir.listFiles();
int fileCount = (acknFiles == null ? 0 : acknFiles.length);
System.out.println("fileCount:::::::::" + fileCount);
Arrays.sort(acknFiles,new Comparator<File>() {
public int compare(File f1,File f2) {
return Long.valueOf(f1.lastModified()).compareto(f2.lastModified());
}
});
**int maxNoFiles = acknFiles.length <= 500 ? acknFiles.length : 500;**
System.out.println(acknFiles.length + " Ackn found and starting to process oldest " + maxNoFiles + " files.");
for (int i = 0; i < maxNoFiles; i++) {
files.add(acknFiles[i]);
}
int fileCount1 = (files == null ? 0 : files.size());
if (fileCount1 > 0) {
for (int i = 0; i < fileCount1; i++) {
boolean success = true;// processFile(files.get(i));
if (success) {
System.out.println("File Successfully processed.");
}
}
}
}
}
解决方法
// java 8.1 以后可以使用的并行流 // 这里paralell() 默认使用ForkJoinPool.commonPool() 执行
Files.lines(Paths.get( files.get(i))
.parallel()
.map(Your_FileBean::new)
.forEach(/*process Your_FileBean*/);
,
您可以通过 Files.walk()
执行此操作,它返回一个 Stream<Path>
对象。如果您想并行处理此流,您可以尝试使用 parallel()
或 collect(Collectors.toList()).parallelStream()
。由于 Files.walk()
计算延迟,因此单独 parallel()
可能无法有效利用所有可用内核。
您可以根据需要应用 sorting 和 filtering。您的处理步骤可以通过流末尾的 forEachOrdered()
实现。
这是一个例子:
Files.walk(Paths.get("/path/to/root/directory")) // create a stream of paths
.collect(Collectors.toList()) // collect paths into list to better parallize
.parallelStream() // process this stream in multiple threads
.filter(Files::isRegularFile) // filter out any non-files (such as directories)
.map(Path::toFile) // convert Path to File object
.sorted((a,b) -> Long.compare(a.lastModified(),b.lastModified())) // sort files date
.limit(500) // limit processing to 500 files (optional)
.forEachOrdered(f -> {
// do processing here
System.out.println(f);
});