如何在满足条件时将数据从 MongoDB 传递到具有源连接器和管道属性的 Kafka 主题?

问题描述

我在源连接器中工作,以观察 Mongo 集合中的更改并将它们带到 Kafka 主题。这很好用,直到我添加要求,如果满足特定条件(名称 = Kathe),将它们放在 Kafka 主题中。这意味着只要更新过程将名称更改为 Kathe,我就需要将数据放入主题中。

我的连接器的配置如下:

{
    "connection.uri":"xxxxxx","connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","key.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","topic.prefix": "qu","database":"sample_analytics","collection":"customers","copy.existing": "true","pipeline":"[{\"$match\":{\"name\":\"Kathe\"}}]","publish.full.document.only": "true","flush.timeout.ms":"15000"
}

我也试过

"pipeline":"[{\"$match\":{\"name\":{ \"$eq\":\"Kathe\"}}}]"

但是当条件满足时它不会产生消息。

我是不是搞错了?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)