使用 csv-parse 和 node 在读取和解析文件中添加快速随机访问

问题描述

我使用 csv-parser 库来处理 node.js 中的 csv 解析。该文件可能很大,从 50,000 到 500,000 行,甚至可能更大。我不得不在 csv 上执行一些计算,在将其提交给服务器之后,我正在考虑将 csv 分成块,然后我可以将其提供给工作线程以执行计算。工作线程将获得要跳过的行数,然后在达到特定限制后开始读取行数。我创建了一个读取流并将 csv-parser 与要跳过的行数选项一起传入。我试图对它执行一些基准测试,但在跳线和不跳线之间没有发现明显的好处。即使我阅读整个文件有时也比阅读结尾的 30,000 行要快。

我的猜测是这个问题是因为读取流是一个一个读取数据,因此不适合快速随机访问文件

也许我的基准测试是错误的?

这是一段代码

const csv = require('csv-parser');
const bench = require('nanobench');

const fs = require('fs');
const parse = (number) => {
// const results = [];
    fs.createReadStream('test.csv').pipe(csv({
        skipLines: number
    })).on('data',(data) => {}).on('end',() => {});
}

const arr = [0,30000,15000,15000/2,15000/4,15000/8];

arr.forEach(item => {
    bench(`CSV skip ${item} lines 40 times`,function(b) {
        b.start();
        for(let i = 0; i < 40; i++) parse(item);
        b.end();
    })
})

这是输出

# CSV skip 0 lines 40 times
ok ~4.14 ms (0 s + 4139981 ns)

# CSV skip 30000 lines 40 times
ok ~2.05 ms (0 s + 2054537 ns)

# CSV skip 15000 lines 40 times
ok ~2.7 ms (0 s + 2702328 ns)

# CSV skip 7500 lines 40 times
ok ~2.43 ms (0 s + 2434555 ns)

# CSV skip 3750 lines 40 times
ok ~1.97 ms (0 s + 1966652 ns)

# CSV skip 1875 lines 40 times
ok ~2.17 ms (0 s + 2172144 ns)

对于我的目标还有其他更好的方法吗?

解决方法

问题是,即使你想跳过 N 行,解析器仍然要读取和分析从上到下到第 N 行的所有字节。第一行离开头越远,执行的工作就越无用 (Schlemiel the Painter's algorithm)。

您可以考虑以下逻辑:

  • 对于每个文件,以 currentPosition = 0 开头
  • 寻找偏移量 currentPosition + chunkSize
  • 读取 i 个字节,直到遇到换行符或 EOF
  • 使用参数 position=currentPositionsize = chunkSize + i 分配一个新线程
  • 继续currentPosition = currentPosition + size + 1

这样,每个块将包含整数行。

在线程中,使用参数 positionsize 读取整个块并在内存中解析它。

在伪代码中:

size = fs.statSync("filename").size

chunkSize = 99999

currentPos = 0

fd = fs.open("filename")

while (currentPos < size) {

    endPos = currentPos + chunkSize
    fs.readSync(fd,buf,1000,endPos)
    
    i = 0
    while(buf[i] != \n) i++
    endPos += i
    
    threads.add(filename: "filename",position: currentPos,size: endPos - currentPos)

    currentPos = endPos + 1
}