如何在 kafka 连接中使用转换将字符串转换为时间戳,并使用 jdbc sink 连接器从 confluent 插入到 postgres 中?

问题描述

下面是我的 kafka-connect-sink.properties 文件 我使用的是 confluent-6.0.1。

name=enba-sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://IP:PORT/DB
connection.user=USERNAME
connection.password=PASSWORD
tasks.max=1
topics=postgresInsert
insert.mode=INSERT
table.name.format=schema."tableName"
auto.create=false
key.converter.schema.registry.url=http://localhost:8081 
key.converter.schemas.enable=false
value.converter.schemas.enable=false
config.action.reload=restart
value.converter.schema.registry.url=http://localhost:8081 
errors.tolerance=all 
errors.log.enable=true
errors.log.include.messages=true
print.key=true 
    
# Transforms
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format=yyyy-MM-dd HH:mm:ss
transforms.TimestampConverter.target.type=Timestamp
transforms.TimestampConverter.target.field=DATE_TIME

我使用的是 avro 数据,架构是:

 {\"type\":\"record\",\"name\":\"log\",\"namespace\":\"transform.name.space\",\"fields\":[{\"name\":\"TRANSACTION_ID\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},{\"name\":\"MSISDN\",{\"name\":\"TRIGGER_NAME\",{\"name\":\"W_ID\",{\"name\":\"STEP\",{\"name\":\"REWARD_ID\",{\"name\":\"CAM_ID\",{\"name\":\"STATUS\",{\"name\":\"COMMENTS\",{\"name\":\"CCR_JSON\",{\"name\":\"DATE_TIME\",\"avro.java.string\":\"String\"}]}

基本上 Postgres 中的 DATE_TIME 列是 Timestamp 类型,我尝试从 avro 发送 String date 和 long 类型。 DATE_TIME = 2022-12-15 14:38:02

问题是如果我不使用转换,那么我会收到错误

ERROR: column "DATE_TIME" is of type timestamp with time zone but expression is of type character varying

如果我使用上面提到的转换,那么错误是:

    [2021-02-06 21:47:41,897] ERROR Error encountered in task enba-sink-postgres-0. Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.TimestampConverter$Value',where consumed record is {topic='enba',partition=0,offset=69,timestamp=1612628261605,timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.ConnectException: Schema Schema{com.package.kafkaconnect.Enbalog:STRUCT} does not correspond to a kNown timestamp type format

解决方法

我使用:

 # Transforms
transforms= timestamp
transforms.timestamp.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestamp.target.type= Timestamp
transforms.timestamp.field= DATE_TIME
transforms.timestamp.format= yyyy-MM-dd HH:mm:ss

由于某种原因,transforms=TimestampConverter 无法正常工作。