流管道上的 Node.Js 异步迭代器

问题描述

我有以下管道:

readFile > parseCSV > otherProcess

readFile 是标准的 Node.Js createReadStream,而 parseCSV 是 Node.js 转换流(模块 link)。

我想逐行遍历一个 csv 文件并在当时处理一行。因此,流和异步迭代器是绝配。

我有以下运行正常的代码

async function* readByLine(path,opt) {
  const readFileStream = fs.createReadStream(path);
  const csvParser = parse(opt);
  const parser = readFileStream.pipe(csvParser);
  for await (const record of parser) {
    yield record;
  }
}

我对 Node.Js 流很陌生,但我从许多来源了解到,模块 stream.pipeline 比读取流的 .pipe 方法更受欢迎。

如何更改上面的代码以使用 stream.pipeline(实际上是从 util.promisify(pipeline) 获得的 promise 版本)并同时生成一行?

解决方法

您实际上应该能够将 fs-stream 和解析器流都传递给 pipeline() 并在解析器流上使用您的异步迭代器:

const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream')
const util = require('util');
const pipeline = util.promisify(stream.pipeline);

async function* readByLine(path,opt) {
    const readFileStream = fs.createReadStream(path);
    const csvParser = parse(opt);
    await pipeline(readFileStream,csvParser);
    for await (const record of csvParser) {
        yield record;
    }
}