问题描述
我已经设置了一个非常简单的mongo kafka source connector来将mongo的oplog流式传输到kafka。但是,我看到在连接器发布的消息中,序列化的oplog事件不遵守extended JSON spec;例如,日期时间字段表示为:
{"$date": 1597841586927}
当规格说明时,其格式应为:
{"$date": {"$numberLong": "1597841586927"}}
为什么我不能得到干净的扩展JSON?
注意:我的连接器配置文件如下所示:
{
"name": "mongosource","config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","tasks.max": 1,"connection.uri": "...","topic.prefix":"mongosource","database": "mydb","copy.existing": true,"change.stream.full.document": "updateLookup",}
}
解决方法
源连接器的默认json格式化程序是旧版格式化程序(请参见连接器JIRA项目上的this issue)。
从该连接器的1.3.0
版开始,有一个新的配置选项可以添加,以要求连接器输出正确的扩展JSON:
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"