javascript – 从ORM返回的Promise序列生成RxJS Observable

我试图从sql数据库的一系列(批处理)记录生成一个Observable,我试图运行数据库中的所有记录.我在node-js上使用ORM,Sequelize返回包含在promise中的记录.

我已经定义了一个函数fetchbatch(),它获取一个批处理并返回一个Promise [Array [Record]]并将该结果平面映射到一个Observable.

根据查询是否返回没有记录,我的条件(终止)被设置为promise的then块中的全局,但是永远不会调用回调,只会无限地返回promise,因此永远不会满足终止条件.有关如何处理的任何建议?这是代码的要点.

function getAllPaginated(conditions) {
    var remaining = true;
    var batch_size = 20;
    function condition(){ return remaining; }
    function selector(promisedBatchOfRecords){
      //console.log(promisedBatchOfRecords);
      //return Observable.fromPromise(promisedBatchOfRecords[1]);
      return (promisedBatchOfRecords[1]);
    }
    function fetchBatch(batchNumberAndBatch) { // Returns [NextBatchNumber,Promise[Array[Record]]]
      //console.log(remaining);
      var batch_number = batchNumberAndBatch[0];
      var offset = (batch_number - 1) * batch_size;
      var rs = Records.findAll({where: conditions,offset: offset,limit: batch_size});
      return [batch_number + 1,rs.then(function(batch) {
                console.log(batch.length);
                if (!(batch.length > 0)){
                  remaining = false;
                };
                return batch.map(function(r){r.dataValues});
              })];
    }
    return Observable.generate(fetchBatch([1,[]]),condition,fetchBatch).flatMap(Ramda.identity/*over the promise*/).flatMap(Ramda.identity/*over the list*/);
  }
var o = getAllPaginated({where: {a: "b"}})
o.subScribeOnNext(console.log)

解决方法

你可以试试这样的东西:

const result = new Rx.Subject;
const batch_size = 3;

// Init the recursion
whileFind(0)
  .subscribe();

// Grab the result here
result
  .mergeAll()
  .map(batch => batch.dataValues)
  .subscribe(value => console.log(value));
    
// Recursion function
function whileFind(offset) {
  return Rx.Observable.fromPromise(findAll(offset))
    .concatMap(batch => {
      if (batch.length <= 0) { // Stop condition
        return Rx.Observable.of(null);
      }
      else {
        result.next(batch); // Push the chunk to the result
        return whileFind(offset + batch_size);
      }
  });
}

// Emulate Records.findAll from your BO
function findAll(offset): Promise<Object[]> {
  const data = [
    { dataValues: 1 },{ dataValues: 2 },{ dataValues: 3 },{ dataValues: 4 },{ dataValues: 5 },{ dataValues: 6 },{ dataValues: 7 },{ dataValues: 8 },{ dataValues: 9 },{ dataValues: 10 }
  ];

  return Promise.resolve(data.slice(offset,offset + batch_size));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.12/Rx.min.js"></script>

相关文章

前言 做过web项目开发的人对layer弹层组件肯定不陌生,作为l...
前言 前端表单校验是过滤无效数据、假数据、有毒数据的第一步...
前言 图片上传是web项目常见的需求,我基于之前的博客的代码...
前言 导出Excel文件这个功能,通常都是在后端实现返回前端一...
前言 众所周知,js是单线程的,从上往下,从左往右依次执行,...
前言 项目开发中,我们可能会碰到这样的需求:select标签,禁...