问题描述
我想知道如何获取来自Debezium sql服务器连接器的所有DATETIME(io.debezium.time.Timestamp),格式为YYYY-MM-DD hh:mm:ss。现在,这是以纪元int64格式给出的,这将很难使用spark转换数据。
用于以下的debezium sql服务器连接器的配置:
{
"name": "localDB-sqlserverconnector","config": {
"connector.class": "io.debezium.connector.sqlserver.sqlServerConnector","database.hostname": "192.168.1.22","database.port": "1433","database.user": "user","database.password": "user_123","database.dbname": "localDB","database.server.name": "DEV1","table.whitelist": "dbo.testtabledebezium","database.history.kafka.bootstrap.servers": "192.168.1.81:32105","database.history.kafka.topic": "history_DB.DEV1","include.schema.changes": false,"transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": true,"transforms.unwrap.delete.handling.mode": "rewrite","snapshot.mode": "schema_only"
}
}
来自debezium的主题DEV1.dbo.testtabledebezium的结果
{"id":9,"column1":"t6","column2":1601480866593,"column3":18535,"__deleted":"false"}
sqlserver表的实际数据
|id| |column1| |column2 | |column3 | =>
|9 | |t6 | |2020-09-30 15:47:46.593| |2020-09-30|
实际要求
{"id":9,"column2":"2020-09-30 15:47:46.593","column3":"2020-09-30","__deleted":"false"}
解决方法
在连接器配置中添加“ time.precision.mode”:“ connect”。
引用https://debezium.io/documentation/reference/1.3/connectors/sqlserver.html#sqlserver-temporal-values
,您需要在debezium连接器配置中添加以下内容
{
"transforms":"unwrap,col2","transforms.col2.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.col2.target.type":"string","transforms.col2.field":"col2","transforms.col2.format":"YYYY-MM-DD hh:mm:ss","time.precision.mode":"connect"
}
也将其添加到col3。