我试图从sql数据库的一系列(批处理)记录生成一个Observable,我试图运行数据库中的所有记录.我在node-js上使用ORM,Sequelize返回包含在promise中的记录.
我已经定义了一个函数fetchbatch(),它获取下一个批处理并返回一个Promise [Array [Record]]并将该结果平面映射到一个Observable.
根据查询是否返回没有记录,我的条件(终止)被设置为promise的then块中的全局,但是永远不会调用回调,只会无限地返回promise,因此永远不会满足终止条件.有关如何处理的任何建议?这是代码的要点.
function getAllPaginated(conditions) { var remaining = true; var batch_size = 20; function condition(){ return remaining; } function selector(promisedBatchOfRecords){ //console.log(promisedBatchOfRecords); //return Observable.fromPromise(promisedBatchOfRecords[1]); return (promisedBatchOfRecords[1]); } function fetchBatch(batchNumberAndBatch) { // Returns [NextBatchNumber,Promise[Array[Record]]] //console.log(remaining); var batch_number = batchNumberAndBatch[0]; var offset = (batch_number - 1) * batch_size; var rs = Records.findAll({where: conditions,offset: offset,limit: batch_size}); return [batch_number + 1,rs.then(function(batch) { console.log(batch.length); if (!(batch.length > 0)){ remaining = false; }; return batch.map(function(r){r.dataValues}); })]; } return Observable.generate(fetchBatch([1,[]]),condition,fetchBatch).flatMap(Ramda.identity/*over the promise*/).flatMap(Ramda.identity/*over the list*/); } var o = getAllPaginated({where: {a: "b"}}) o.subScribeOnNext(console.log)
解决方法
你可以试试这样的东西:
const result = new Rx.Subject; const batch_size = 3; // Init the recursion whileFind(0) .subscribe(); // Grab the result here result .mergeAll() .map(batch => batch.dataValues) .subscribe(value => console.log(value)); // Recursion function function whileFind(offset) { return Rx.Observable.fromPromise(findAll(offset)) .concatMap(batch => { if (batch.length <= 0) { // Stop condition return Rx.Observable.of(null); } else { result.next(batch); // Push the chunk to the result return whileFind(offset + batch_size); } }); } // Emulate Records.findAll from your BO function findAll(offset): Promise<Object[]> { const data = [ { dataValues: 1 },{ dataValues: 2 },{ dataValues: 3 },{ dataValues: 4 },{ dataValues: 5 },{ dataValues: 6 },{ dataValues: 7 },{ dataValues: 8 },{ dataValues: 9 },{ dataValues: 10 } ]; return Promise.resolve(data.slice(offset,offset + batch_size)); }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.12/Rx.min.js"></script>