问题描述
我想使用MongoDB更改流来监视第一个集合上的插入/更新,以在满足条件时填充另一个具有从监视的集合中提取出的计算值的集合。
在Mongodb tutorial之后,我得到了以下结果:
require('dotenv').config();
const { MongoClient } = require('mongodb');
const stream = require('stream');
const es = require('event-stream');
async function monitorListingsUsingStreamAPI(client,pipeline = []) {
const collection = client
.db(process.env.MONGO_DB)
.collection(process.env.COLLECTION_TO_MONITOR);
const changeStream = collection.watch(pipeline);
const collection_dest = client
.db(process.env.MONGO_DB)
.collection(process.env.COLLECTION_TO_POPULATE);
changeStream.pipe(
es.map(function (doc,next) {
const { _id,...data } = doc.fullDocument;
const new_doc = { size: data.samples.length,data };
(async () => {
await collection_dest.insertOne(new_doc,next);
})();
}),);
}
async function main() {
const uri = process.env.MONGO_DB_URI;
const client = new MongoClient(uri,{
useUnifiedTopology: true,useNewUrlParser: true,});
try {
// Connect to the MongoDB cluster
await client.connect();
const pipeline = [
{
$match: {
operationType: 'insert','fullDocument.samples': { $size: 3 },},];
// Monitor new listings using the Stream API
await monitorListingsUsingStreamAPI(client,pipeline);
}
}
实际上,这似乎可行,但是我使用了event-stream
到pipe
的MongoDB更改流到另一个流,在其中我使用了立即调用的匿名异步函数来填充第二个集合。
我想知道这种方法是否正确?如何使用转换流?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)