如何在mongodb java sdk中的部署级别changestream中的集合上进行过滤

问题描述

我试图使用MongoDB Java SDK在集合名称上使用管道筛选器设置部署级别更改流。这是代码段。

   List<Bson> pipeline = Collections.singletonList(Aggregates.match(Filters.or(
        Filters.eq("namespace","db1.c1"),Filters.eq("namespace","db1.c2"))));
 
    client.watch(pipeline)
        .cursor()
        .forEachRemaining(doc -> {
          System.out.println(doc);
        });

但是此查询与任何文档都不匹配。管道文档的以下变体版本也不起作用。

    List<Bson> pipeline =
    Collections.singletonList(Aggregates.match(Filters.or(
      Document.parse("{'namespace': 'db1.c1'}"),Document.parse("{'namespace': 'db1.c2'}")))); 

令人惊讶的是,管道可以在changestream文档的其他字段上工作。例如,这有效:

     List<Bson> pipeline = Collections
     .singletonList(Aggregates.match(Filters.and(
     Document.parse("{'fullDocument.seq': 4}"),Filters.in("operationType",Arrays.asList("insert")))));

我不确定为什么会这样。在这方面的任何帮助,我将不胜感激。

解决方法

为了获得数据库或集合级别的变更流事件,您需要在以下属性上使用过滤器:

ns.db 数据库的名称。

ns.coll 集合的名称。

如果您只对集合C1和c2感兴趣,请考虑使用以下过滤器。

List<Bson> pipeline = Collections.singletonList(Aggregates.match(Filters.or(
            Filters.eq("ns.coll","c1"),Filters.eq("ns.coll","c2"))));
     
        client.watch(pipeline)
            .cursor()
            .forEachRemaining(doc -> {
              System.out.println(doc);
            });

请注意:以上过滤器仅适用于集合,您只能在DB上为特定于集合形式的DB添加过滤器,例如(ns.db =“ db1” AND ns.coll:$ in:[“ c1”,“ c2”])

MongoDB文档参考:https://docs.mongodb.com/manual/reference/change-events/#change-stream-output