在 MongoDB Kafka 源连接器中配置管道

问题描述

我正在使用源连接器的以下配置来过滤和读取来自 MongoDB 的状态为“PENDING”的特定记录。只需要轮询所有以 PENDING 状态插入/更新的记录。如果排除管道,源连接器能够轮询所有记录。有人可以帮助我了解如何解决这个问题,还有没有办法知道轮询已完成,就像批处理作业已完成以在 kafka 消费者上编排另一个进程?

name=mongo-source-demo
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://username:password@hostname:27017
database=test
collection=mongoDBtest
topic.prefix=mongodb.connector
poll.max.batch.size=1000
poll.await.time.ms=100000
publish.full.document.only=true
pipeline=[{"$match": { "Status" : "PENDING" }},{"$project":{"_id":1,"fullDocument":1}} ]

解决方法

问题在于 MongoDB 运算符中的“”。正确的管道配置应该是:

pipeline=[{$match: { "Status" : "PENDING" }},{$project:{"_id":1,"fullDocument":1}} ]