javascript – 如何将节点可读流转换为RX observable

如果我有一个Node js流,例如来自process.stdin或fs.createReadStream之类的东西,我如何使用RxJs5将其转换为RxJs Observable流?

我看到RxJs-Node一个fromreadableStream方法,但看起来它在近一年内没有更新.

解决方法

对于任何寻找此项的人,请遵循Mark的建议,I adapted rx-node fromStream implementation for rxjs5.
import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream,finishEventName = 'end',dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName,dataHandler);
    stream.addListener('error',errorHandler);
    stream.addListener(finishEventName,endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName,dataHandler);
      stream.removeListener('error',errorHandler);
      stream.removeListener(finishEventName,endHandler);
    };
  }).share();
}

相关文章

前言 做过web项目开发的人对layer弹层组件肯定不陌生,作为l...
前言 前端表单校验是过滤无效数据、假数据、有毒数据的第一步...
前言 图片上传是web项目常见的需求,我基于之前的博客的代码...
前言 导出Excel文件这个功能,通常都是在后端实现返回前端一...
前言 众所周知,js是单线程的,从上往下,从左往右依次执行,...
前言 项目开发中,我们可能会碰到这样的需求:select标签,禁...