在两个单独的线程中读取和处理文件比一个线程慢两倍

问题描述

解决了计算文本文件中唯一行数的任务。每个字符串是一个有效的 ip 地址。该文件可以是任何大小(字面上,数百和数千 GB 都是可能的)。我写了一个简单的类,它实现了一个位数组并将其用于计数。

public class IntArrayBitCounter {
    public static final long MIN_BIT_CAPACITY = 1L;
    public static final long MAX_BIT_CAPACITY = 1L << 32;

    private final int intArraySize;
    private final int[] intArray;
    private long counter;

    public IntArrayBitCounter(long bitCapacity) {
        if (bitCapacity < MIN_BIT_CAPACITY || bitCapacity > MAX_BIT_CAPACITY) {
            throw new IllegalArgumentException("Capacity must be in range [1.." + MAX_BIT_CAPACITY + "].");
        }
        this.intArraySize = 1 + (int) ((bitCapacity - 1) >> 5);
        this.intArray = new int[intArraySize];
    }

    private void checkBounds(long bitIndex) {
        if (bitIndex < 0 || bitIndex > ((long) intArraySize << 5)) {
            throw new indexoutofboundsexception("Bit index must be in range [0.." + (MAX_BIT_CAPACITY - 1) + "].");
        }
    }

    public void setBit(long bitIndex) {
        checkBounds(bitIndex);
        int index = (int) (bitIndex >> 5);
        int bit = 1 << (bitIndex & 31);
        if ((intArray[index] & bit) == 0) {
            counter++;
            intArray[index] |= bit;
        }
    }

    public boolean isBitSets(long bitIndex) {
        checkBounds(bitIndex);
        int index = (int) (bitIndex >> 5);
        int bit = 1 << (bitIndex & 31);
        return (intArray[index] & bit) != 0;
    }

    public int getIntArraySize() {
        return intArraySize;
    }

    public long getBitCapacity() {
        return (long) intArraySize << 5;
    }

    public long getCounter() {
        return counter;
    }
}

我的简单单线程方法运行良好。它几乎完全利用了我旧硬盘的读取速度,大约为 130-135 MB/s。 Linux 中的系统监视器显示从磁盘到我的程序的读取速度约为 100-110 MB/s。

public class IpCounterapp {

    private static long toLongValue(String ipString) throws UnkNownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }

    public static void main(String[] args) {
        long startTime = System.nanoTime();

        String fileName = "src/test/resources/test.txt";
        var counter = new IntArrayBitCounter(1L << 32);
        long linesProcessed = 0;
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                counter.setBit(toLongValue(line));
                linesProcessed++;
            }
        } catch (IOException e) {
            e.printstacktrace();
        }

        System.out.printf("%d unique lines in %d processed\n",counter.getCounter(),linesProcessed);
        long elapsedtime = System.nanoTime() - startTime;
        System.out.println("duration: " + elapsedtime / 1000000 + " milliseconds");
    }
}

然后我尝试开始从磁盘读取并在两个不同的线程中处理行,希望能有所改进。我创建了一个阻塞队列。第一个线程读取行并写入此队列。第二个线程从队列中读出并进行计数。 但是,10_000_000 个地址中的测试文件的执行速度,其中 5_000_000 唯一地崩溃了近 2 倍。读取速度也下降了一半至 50-55 MB/s。

public class ConcurrentIpCounterapp {

    public static void main(String[] args) {
        long startTime = System.nanoTime();

        String fileName = "src/test/resources/test.txt";
        var stringsQueue = new ArrayBlockingQueue<String>(1024);
        var reader = new BlockingQueueFileReader(stringsQueue,fileName);
        var counter = new BlockingQueueCounter(stringsQueue);

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Long> linesProcessed = executorService.submit(reader);
        Future<Long> uniqueLines = executorService.submit(counter);

        try {
            System.out.printf("%d unique lines in %d processed\n",uniqueLines.get(),linesProcessed.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printstacktrace();
        } finally {
            executorService.shutdown();
        }

        long elapsedtime = System.nanoTime() - startTime;
        System.out.println("duration: " + elapsedtime / 1000000 + " milliseconds");
    }
}
public class BlockingQueueCounter implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final IntArrayBitCounter counter;

    public BlockingQueueCounter(BlockingQueue<String> queue) {
        this.queue = queue;
        this.counter = new IntArrayBitCounter(1L << 32);
    }

    private static long toLongValue(String ipString) throws UnkNownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }
    
    @Override
    public Long call() {
        String line;
        while (true) {
            try {
                line = queue.take();
                if ("EOF".equals(line)) {
                    break;
                }
                counter.setBit(toLongValue(line));
            } catch (InterruptedException | UnkNownHostException e) {
                e.printstacktrace();
            }
        }
        return counter.getCounter();
    }
}
public class BlockingQueueFileReader implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final String fileName;
    private long totalLines;

    public BlockingQueueFileReader(BlockingQueue<String> queue,String fileName) {
        this.queue = queue;
        this.fileName = fileName;
    }

    @Override
    public Long call() {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line);
                totalLines++;
            }
            queue.add("EOF");
        } catch (IOException | InterruptedException e) {
            e.printstacktrace();
        }
        return totalLines;
    }
}

请帮助我理解为什么会发生这种情况。我自己找不到答案。

解决方法

要回答为什么多线程尝试比单线程慢两倍的问题,请尝试衡量

  • 整个过程所用的时间(您已经这样做了)
  • 生产者活动时间(从磁盘读取并格式化队列数据)
  • 生产者队列等待时间(实际将数据填充到最终阻塞的队列中的时间)

我想这就是你得到答案的地方。

,

阻塞队列是否有可能在一大块数据入队时不仅阻塞消费者而且阻塞发送者?在这种情况下,您的读取线程必须暂停,并且可能启动下一次读取操作意味着要等到硬盘驱动器完成下一次旋转。

如果增加阻塞队列的大小,您会获得什么性能?

所以你必须确保阅读器永远不会暂停。如果队列增长过大,请增加消耗线程的数量。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...