问题描述
尝试使用kafka-connect-jdbc
将数据接收到Postgresql,但是数据库中的某些值与kafka消息值不同。
接收器配置:
{
"name": "test-sink","config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","connection.url": "jdbc:postgresql://localhost:5432/test_db?user=test&password=test","dialect.name": "PostgresqlDatabaseDialect","topics.regex": "test.public.(.*)","transforms": "dropPrefix,unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter","transforms.dropPrefix.regex": "test.public.(.*)","transforms.dropPrefix.replacement": "public.test_$1","auto.create": "true","auto.evolve": "true","insert.mode": "upsert","pk.fields": "id","pk.mode": "record_key","delete.enabled": "true","batch.size": "1"
}
}
kafka主题内的消息:
{ “模式”:{ “ type”:“ struct”, “字段”:[ { “ type”:“ struct”, “字段”:[ { “ type”:“ int64”, “可选”:false, “ field”:“ id” }, { “ type”:“ int64”, “可选”:true, “ field”:“ test_c_id” }, { “ type”:“ string”, “可选”:true, “ field”:“ test_b_c” }, { “ type”:“ string”, “可选”:true, “ field”:“ test_a_t” }, { “ type”:“ array”, “项目”:{ “ type”:“ string”, “可选”:true }, “可选”:true, “ field”:“ test_p” }, { “ type”:“ int64”, “可选”:true, “ field”:“ test_d_id” } ], “可选”:true, “ name”:“ test.public.test.Value”, “ field”:“之前” }, { “ type”:“ struct”, “字段”:[ { “ type”:“ int64”, “可选”:false, “ field”:“ id” }, { “ type”:“ int64”, “可选”:true, “ field”:“ test_c_id” }, { “ type”:“ string”, “可选”:true, “ field”:“ test_b_c” }, { “ type”:“ string”, “可选”:true, “ field”:“ test_a_t” }, { “ type”:“ array”, “项目”:{ “ type”:“ string”, “可选”:true }, “可选”:true, “ field”:“ test_p” }, { “ type”:“ int64”, “可选”:true, “ field”:“ test_d_id” } ], “可选”:true, “ name”:“ test.public.test.Value”, “ field”:“之后” }, { “ type”:“ struct”, “字段”:[ { “ type”:“ string”, “可选”:false, “ field”:“版本” }, { “ type”:“ string”, “可选”:false, “ field”:“连接器” }, { “ type”:“ string”, “可选”:false, “ field”:“名称” }, { “ type”:“ int64”, “可选”:false, “ field”:“ ts_ms” }, { “ type”:“ string”, “可选”:true, “ name”:“ io.debezium.data.Enum”, “版本”:1, “参数”:{ “ allowed”:“ true,last,false” }, “ default”:“ false”, “ field”:“快照” }, { “ type”:“ string”, “可选”:false, “ field”:“ db” }, { “ type”:“ string”, “可选”:false, “ field”:“ schema” }, { “ type”:“ string”, “可选”:false, “ field”:“ table” }, { “ type”:“ int64”, “可选”:true, “ field”:“ txId” }, { “ type”:“ int64”, “可选”:true, “ field”:“ lsn” }, { “ type”:“ int64”, “可选”:true, “ field”:“ xmin” } ], “可选”:false, “ name”:“ io.debezium.connector.postgresql.source”, “ field”:“ source” }, { “ type”:“ string”, “可选”:false, “ field”:“ op” }, { “ type”:“ int64”, “可选”:true, “ field”:“ ts_ms” }, { “ type”:“ struct”, “字段”:[ { “ type”:“ string”, “可选”:false, “ field”:“ id” }, { “ type”:“ int64”, “可选”:false, “ field”:“ total_order” }, { “ type”:“ int64”, “可选”:false, “ field”:“ data_collection_order” } ], “可选”:true, “ field”:“交易” } ], “可选”:false, “ name”:“ test.public.test.Envelope” }, “有效载荷”:{ “之前”:null, “之后”:{ “ id”:4441, “ test_c_id”:3606, “ test_b_c”:“ QWERTY”, “ test_a_t”:null, “ test_p”:[ “ qwe”, “ asd”, “ zxc” ], “ test_d_id”:22827 }, “资源”: { “ version”:“ 1.2.2.Final”, “ connector”:“ postgresql”, “ name”:“ test”, “ ts_ms”:1599543319277, “ snapshot”:“ false”, “ db”:“测试”, “ schema”:“ public”, “ table”:“ test”, “ txId”:3914206, “ lsn”:108940649328, “ xmin”:null }, “ op”:“ u”, “ ts_ms”:1599543319509, “交易”:null } }
例如:
Kafka Sink DB
---------------------------------------
test_c_id: 3606 test_c_id: 22632
test_b_c: QWERTY test_b_c: null
大约5-10%的数据不匹配,并且所有数据仅出现在20列中的2列中。
有人知道什么可能导致这种问题吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)