交错并行文件读取比顺序读取慢?

问题描述

如您所说,在磁盘上进行顺序读取要比读取-跳过-读取-跳过模式快得多。硬盘在顺序读取时具有较高的带宽,但是查找时间(延迟)很昂贵。

与其将文件的副本存储在每个磁盘中,不如将文件的块i存储在磁盘i(mod 2)上。这样,您可以依次从两个磁盘读取数据并将结果重新组合到内存中。

解决方法

我实现了一个小的IO类,它可以从不同磁盘(例如,两个包含相同文件的硬盘)上的多个相同文件中读取数据。在顺序情况下,两个磁盘在文件上的平均读取速度均为60MB
/ s,但是当我进行交错操作(例如4k磁盘1、4k磁盘2然后合并)时,有效读取速度会降低到40MB / s而不是增加吗?

上下文:Win 7 + JDK 7b70、2GB RAM,2.2GB测试文件。基本上,我尝试以穷人的方式模仿Win7的ReadyBoost和RAID x。

从本质上讲,当向类发出read()时,它将创建两个可运行对象,并带有从特定位置和长度读取预打开的RandomAccessFile的指令。使用执行程序服务和Future.get()调用,当两者都完成时,读取的数据将被复制到公共缓冲区中并返回给调用者。

我的方法是否存在概念错误?(例如,操作系统缓存机制将始终抵消吗?)

protected <T> List<T> waitForAll(List<Future<T>> futures) 
throws MultiIOException {
    MultiIOException mex = null;
    int i = 0;
    List<T> result = new ArrayList<T>(futures.size());
    for (Future<T> f : futures) {
        try {
            result.add(f.get());
        } catch (InterruptedException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file,ex));
        } catch (ExecutionException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file,ex));
        }
        i++;
    }
    if (mex != null) {
        throw mex;
    }
    return result;
}

public int read(long position,byte[] output,int start,int length) 
throws IOException {
    if (start < 0 || start + length > output.length) {
        throw new IndexOutOfBoundsException(
        String.format("start=%d,length=%d,output=%d",start,length,output.length));
    }
    // compute the fragment sizes and positions
    int result = 0;
    final long[] positions = new long[metrics.length];
    final int[] lengths = new int[metrics.length];
    double speedSum = 0.0;
    double maxValue = 0.0;
    int maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        speedSum += metrics[i].readSpeed;
        if (metrics[i].readSpeed > maxValue) {
            maxValue = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    // adjust read lengths
    int lengthSum = length;
    for (int i = 0; i < metrics.length; i++) {
        int len = (int)Math.ceil(length * metrics[i].readSpeed / speedSum);
        lengths[i] = (len > lengthSum) ? lengthSum : len;
        lengthSum -= lengths[i];
    }
    if (lengthSum > 0) {
        lengths[maxIndex] += lengthSum;
    }
    // adjust read positions
    long positionDelta = position;
    for (int i = 0; i < metrics.length; i++) {
        positions[i] = positionDelta;
        positionDelta += (long)lengths[i]; 
    }        
    List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
    // read in parallel
    for (int i = 0; i < metrics.length; i++) {
        final int j = i;
        futures.add(exec.submit(new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                byte[] buffer = new byte[lengths[j]];
                long t = System.nanoTime();
                long t0 = t;

                long currPos = metrics[j].handle.getFilePointer();
                metrics[j].handle.seek(positions[j]);
                t = System.nanoTime() - t;
                metrics[j].seekTime = t * 1024.0 * 1024.0 / 
                    Math.abs(currPos - positions[j]) / 1E9 ;

                int c = metrics[j].handle.read(buffer);
                t0 = System.nanoTime() - t0;
                // adjust the read speed if we read something
                if (c > 0) {
                    metrics[j].readSpeed = (alpha * c * 1E9 / t0 / 1024 / 1024
                    + (1 - alpha) * metrics[j].readSpeed) ;
                }
                if (c < 0) {
                    return null;
                } else
                if (c == 0) {
                    return EMPTY_BYTE_ARRAY;
                } else
                if (c < buffer.length) {
                    return Arrays.copyOf(buffer,c);
                }
                return buffer;
            }
        }));
    }
    List<byte[]> data = waitForAll(futures);
    boolean eof = true;
    for (byte[] b : data) {
        if (b != null && b.length > 0) {
            System.arraycopy(b,output,start + result,b.length);
            result += b.length;
            eof = false;
        } else {
            break; // the rest probably reached EOF
        }
    }
    // if there was no data at all,we reached the end of file
    if (eof) {
        return -1;
    }
    sequentialPosition = position + (long)result;

    // evaluate the fastest file to read
    double maxSpeed = 0;
    maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        if (metrics[i].readSpeed > maxSpeed) {
            maxSpeed = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    fastest = metrics[maxIndex];
    return result;
}

(指标数组中的FileMetrics包含读取速度的测量值,以自适应地确定各种输入通道的缓冲区大小-在我的测试中,alpha = 0和readSpeed =
1结果均等分布)

编辑 我进行了一个无纠缠的测试(例如,在单独的线程中分别读取两个文件。),我的综合有效速度为110MB / s。

Edit2 我想我知道为什么会这样。

当我按顺序并行读取时,它不是对磁盘的顺序读取,而是由于交错(并且可能与分配表查找有关)而引起的“读取-跳过-读取-
跳过”模式。基本上,这会将每个磁盘的有效读取速度降低到一半或更低。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...