问题描述
我在源连接器中工作,以观察 Mongo 集合中的更改并将它们带到 Kafka 主题。这很好用,直到我添加要求,如果满足特定条件(名称 = Kathe),将它们放在 Kafka 主题中。这意味着只要更新过程将名称更改为 Kathe,我就需要将数据放入主题中。
我的连接器的配置如下:
{
"connection.uri":"xxxxxx","connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","key.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","topic.prefix": "qu","database":"sample_analytics","collection":"customers","copy.existing": "true","pipeline":"[{\"$match\":{\"name\":\"Kathe\"}}]","publish.full.document.only": "true","flush.timeout.ms":"15000"
}
我也试过
"pipeline":"[{\"$match\":{\"name\":{ \"$eq\":\"Kathe\"}}}]"
但是当条件满足时它不会产生消息。
我是不是搞错了?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)