如何在 Java 8 中并行处理更多文件处理

问题描述

在我的 Java Web 应用程序中是基于文件的集成。他们过去常常在我们的生产服务器 opt/app/proceed/ 文件夹中发送一堆 xml 文件(例如:10000)。但是根据当前的配置,我们的应用程序能够按顺序处理 200 个文件。因此,延迟处理文件。我正在尝试以并行方式增加文件处理的数量。请找到代码块供您参考。

public class FileEx {

   public static void main(String[] args) throws IOException {
       String fileDir = "C:\\Users\\inputfiles"; //contains more than 10000 files
       new FileEx().traverseFilesFromDir(new File(fileDir));
   }

   public void traverseFilesFromDir(File dir) throws IOException {
       List<File> files = new ArrayList<File>();
       if (dir == null || !dir.isDirectory()) {
           throw new IllegalArgumentException("Not a valid directory (value: " + dir + ").");
       }
       File[] acknFiles = dir.listFiles();
       int fileCount = (acknFiles == null ? 0 : acknFiles.length);

       System.out.println("fileCount:::::::::" + fileCount);

       Arrays.sort(acknFiles,new Comparator<File>() {
           public int compare(File f1,File f2) {
               return Long.valueOf(f1.lastModified()).compareto(f2.lastModified());
           }
       });

       **int maxNoFiles = acknFiles.length <= 500 ? acknFiles.length : 500;**
       System.out.println(acknFiles.length + " Ackn found and starting to process oldest " + maxNoFiles + " files.");

       for (int i = 0; i < maxNoFiles; i++) {
           files.add(acknFiles[i]);
       }

       int fileCount1 = (files == null ? 0 : files.size());

       if (fileCount1 > 0) {
           for (int i = 0; i < fileCount1; i++) {

               boolean success = true;// processFile(files.get(i));
               if (success) {
                   System.out.println("File Successfully processed.");
               }
           }
       }
   }
}

如何着手改变文件处理方式。等待需要的支持/指导。

解决方法

// java 8.1 以后可以使用的并行流 // 这里paralell() 默认使用ForkJoinPool.commonPool() 执行

Files.lines(Paths.get( files.get(i))
  .parallel()
  .map(Your_FileBean::new) 
  .forEach(/*process Your_FileBean*/);
,

您可以通过 Files.walk() 执行此操作,它返回一个 Stream<Path> 对象。如果您想并行处理此流,您可以尝试使用 parallel()collect(Collectors.toList()).parallelStream()。由于 Files.walk() 计算延迟,因此单独 parallel() 可能无法有效利用所有可用内核。

您可以根据需要应用 sortingfiltering。您的处理步骤可以通过流末尾的 forEachOrdered() 实现。

这是一个例子:

Files.walk(Paths.get("/path/to/root/directory")) // create a stream of paths
    .collect(Collectors.toList()) // collect paths into list to better parallize
    .parallelStream() // process this stream in multiple threads
    .filter(Files::isRegularFile) // filter out any non-files (such as directories)
    .map(Path::toFile) // convert Path to File object
    .sorted((a,b) -> Long.compare(a.lastModified(),b.lastModified())) // sort files date
    .limit(500) // limit processing to 500 files (optional)
    .forEachOrdered(f -> {
        // do processing here
        System.out.println(f);
    });