node.js – Meteor:如何将大文件流式传输并解析为异步节点功能?

我正在使用 job-collection软件包执行以下操作:

>下载包含大量有关网页元数据的大型文件
>使用NPM事件流包从正则表达式拆分的文件元数据中创建流
>检查集合中的元数据是否匹配(我一直试图将每个网页的元数据流式传输到另一个函数来执行此操作)

该文件太大而无法缓冲,因此需要流式传输. Here is a small file with a few examples of the metadata 如果你想试试这个.

作业集合包中的每个作业都已在异步函数中:

var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

function (job,callback) {

//This download is much too long to block
  request({url: job.fileURL,encoding: null},function (error,response,body) {
    if (error) console.error('Error downloading File');
    if (response.statusCode !== 200) console.error(downloadResponse.statusCode,'Status not 200');

    var responseEncoding = response.headers['content-type'];
    console.log('response encoding is %s',responseEncoding);
    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
      console.log('Received binary/octet-stream');
      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
      /* Need parse the metaData or pass each webpageMetaData to function
       * This next function could block if it had to */
      searchPageMetaData(webpageMetaData); // pass each metadatum to this function to update a collection - this function can be synchronous
    }));
    } else {
      console.error('Wrong encoding');
    }
  });
}

function searchWebPageMetaData(metaData) {
  //  Parse JSON and search collection for match
}

>有没有更好的方法来构建它?我是在正确的轨道上吗?
>在哪里放置Meteor.bindEnvironment? – 每次传递给searchWebPageMetaData()时,我都会绑定环境吗?我需要在这里明确使用纤维吗?
>如果我将其运行到process.stdout,则在运行此流时会停止.我应该将流放入Meteor的一个包裹中
>我知道Meteor.wrapAsync.我想在Meteor.wrapAsync中包装最里面的searchWebPageMetaData()函数吗? (当我输入时,想想我回答的是)
>流是否会缓慢以补偿数据库调用的缓慢?我的猜测不是,但我该怎么处理?

我花了很长时间了解Meteor的wrapAsync和bindEnvironment,但是无法将它们整合在一起并了解在哪里使用它们.

补充1

只是为了澄清,步骤是:

>下载文件;
>创建流;
>解压缩;
>将其拆分为单独的webPages – EventStream处理此问题
>将其发送给函数 – 不需要返回值;这可能是阻塞,只是一些搜索和数据库调用

我试图做这样的事情,除了我需要帮助的核心代码是在一个不同文件的函数中.以下代码中包含@ electric-jesus的大部分答案.

processJobs('parseWatFile',{
     concurrency: 1,cargo: 1,pollInterval: 1000,prefetch: 1
   },function (job,callback) {

     if (job.data.watZipFileLink) {
       queue.pause();
       console.log('queue should be paused now');


       var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';
       function searchPageMetaData(webpageMetaData,callback) {
         console.log(webpageMetaData);  // Would be nice to just get this function logging each webPageMetaData
         future.return(callback(webpageMetaData));  //I don't need this to return any value - do I have to return something?
     }

      if (!watFile)
        console.error('No watFile passed to downloadAndSearchWatFileForEntity ');

      var future = new Future(); // Doc Brown would be proud.

      if(typeof callback !== 'function') future.throw('callbacks are supposed to be functions.');

    request({url: watFile,body) {

      if (error)                        future.throw('Error Downloading File');
      if (response.statusCode !== 200)  future.throw('Expected status 200,got ' + response.statusCode + '.');

      var responseEncoding = response.headers['content-type'];

    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {

      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
        .pipe(EventStream.split(regexSplit))
        .pipe(EventStream.map(function (webpageMetaData) {
        searchPageMetaData(webpageMetaData,callback);
      })
    ));
    } else {
      future.throw('Wrong encoding');
    }
    });

    return future.wait();

    } else {
      console.log('No watZipFileLink for this job');
      job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
    }
      queue.resume();
      job.done;
      callback();
  }

解决方法

有意思,看起来还不错.我从未使用过工作集,但它似乎只是一个Mongo驱动的任务队列..所以我假设它像常规队列一样工作.我总是找到回调的东西,我当然使用Future模式.例如:
var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

var Future = Npm.require('fibers/future');


var searchWebPageMetaData = function (metaData) {
  //  Parse JSON and search collection for match
  // make it return something
  var result = /droids/ig.test(metaData);
  return result;
}

var processJob = function (job,callback) {

  var future = new Future(); // Doc Brown would be proud.

  if(typeof callback !== 'function') future.throw("Oops,you forgot that callbacks are supposed to be functions.. not undefined or whatever.");

  //This download is much too long to block
  request({url: job.fileURL,body) {

    if (error)                        future.throw("Error Downloading File");
    if (response.statusCode !== 200)  future.throw("Expected status 200,got " + downloadResponse.statusCode + ".");

    var responseEncoding = response.headers['content-type'];


    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {

      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
      /* Need parse the metaData or pass each webpageMetaData to function
       * This next function could block if it had to */

      // pass each metadatum to this function to update a collection - this function can be synchronous

      future.return(callback(webpageMetaData)); // this way,processJob returns whatever we find in the completed webpage,via callback.

    }));
    } else {
      future.throw('Wrong encoding');
    }
  });

  return future.wait();
}

用法示例:

所以每当你在这里分配变量:

var currentJob = processJob(myjob,searchWebPageMetaData);

即使使用同步类型获取/变量分配,您也可以及时完成异步工作并及时传输.

要回答你的问题,

>在哪里放置Meteor.bindEnvironment? – 每次传递给searchWebPageMetaData()时,我都会绑定环境吗?我需要在这里明确使用纤维吗?

不是真的,我相信明确使用纤维/未来已经解决了这个问题.
>如果我将其运行到process.stdout,则在运行此流时会停止.我应该将流放入Meteor的一个包裹中

你是什​​么意思?我依稀记得process.stdout是阻塞的,这可能是一个原因.再次,将结果包装在未来应该照顾这一点.
>我知道Meteor.wrapAsync.我想在Meteor.wrapAsync中包装最里面的searchWebPageMetaData()函数吗? (当我输入时,想想我回答的是)

Take a look at the Meteor.wrapAsync helper code.它基本上是应用的未来分辨率,当然你可以再做一遍,你也可以自己明确地使用光纤/未来没有问题.
>流是否会缓慢以补偿数据库调用的缓慢?我的猜测不是,但我该怎么处理?

不太确定你在这里的意思..但由于我们正在尝试使用异步光纤,我的猜测也不是很好.我还没有看到使用纤维有任何缓慢.可能只有在同时启动(并同时运行)多个作业的情况下,您才会在内存使用方面遇到性能问题.保持并发队列低,因为Fibers可以在运行内容时非常强大.你只有一个核心来处理它,这是一个可悲的事实,因为节点不能多核:(

相关文章

这篇文章主要介绍“基于nodejs的ssh2怎么实现自动化部署”的...
本文小编为大家详细介绍“nodejs怎么实现目录不存在自动创建...
这篇“如何把nodejs数据传到前端”文章的知识点大部分人都不...
本文小编为大家详细介绍“nodejs如何实现定时删除文件”,内...
这篇文章主要讲解了“nodejs安装模块卡住不动怎么解决”,文...
今天小编给大家分享一下如何检测nodejs有没有安装成功的相关...