如何获取来自debezium的所有日期时间值到yyyy-mm-dd-ss.zzz格式

问题描述

我想知道如何获取来自Debezium sql服务器连接器的所有DATETIME(io.debezium.time.Timestamp),格式为YYYY-MM-DD hh:mm:ss。现在,这是以纪元int64格式给出的,这将很难使用spark转换数据。

用于以下的debezium sql服务器连接器的配置:

{
    "name": "localDB-sqlserverconnector","config": {
        "connector.class": "io.debezium.connector.sqlserver.sqlServerConnector","database.hostname": "192.168.1.22","database.port": "1433","database.user": "user","database.password": "user_123","database.dbname": "localDB","database.server.name": "DEV1","table.whitelist": "dbo.testtabledebezium","database.history.kafka.bootstrap.servers": "192.168.1.81:32105","database.history.kafka.topic": "history_DB.DEV1","include.schema.changes": false,"transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": true,"transforms.unwrap.delete.handling.mode": "rewrite","snapshot.mode": "schema_only"
        
    }
}

来自debezium的主题DEV1.dbo.testtabledebezium的结果

{"id":9,"column1":"t6","column2":1601480866593,"column3":18535,"__deleted":"false"}  

sqlserver表的实际数据

|id|    |column1|   |column2                |   |column3   | =>
|9 |    |t6     |   |2020-09-30 15:47:46.593|   |2020-09-30|

实际要求

{"id":9,"column2":"2020-09-30 15:47:46.593","column3":"2020-09-30","__deleted":"false"}  

解决方法

在连接器配置中添加“ time.precision.mode”:“ connect”。

引用https://debezium.io/documentation/reference/1.3/connectors/sqlserver.html#sqlserver-temporal-values

,

您需要在debezium连接器配置中添加以下内容

{
    "transforms":"unwrap,col2","transforms.col2.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.col2.target.type":"string","transforms.col2.field":"col2","transforms.col2.format":"YYYY-MM-DD hh:mm:ss","time.precision.mode":"connect"
} 

也将其添加到col3。