What kind of data gets routed to the dead letter queue topic?

Problem description

I have implemented dead letter queue error handling for Kafka Connect. It works, and records are sent to the DLQ topic, but I don't understand which kinds of records get routed to the DLQ topic.

This is the data in my DLQ topic:

And this is the normal data that got sunk:

The first screenshot shows the data routed to the DLQ topic, and the second shows the normal data that was sunk to the database. Does anyone know how the key gets changed when I use id as the key?

These are my source and sink properties.

source properties:

    "name": "jdbc_source_postgresql_analytics","config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:postgresql://192.168.5.40:5432/abc","connection.user": "abc","connection.password": "********","topic.prefix": "test_","mode": "timestamp+incrementing","incrementing.column.name": "id","timestamp.column.name": "updatedAt","validate.non.null": true,"table.whitelist": "test","key.converter": "org.apache.kafka.connect.converters.IntegerConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": false,"value.converter.schemas.enable": false,"catalog.pattern": "public","transforms": "createKey,extractInt","transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey","transforms.createKey.fields": "id","transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractInt.field": "id","errors.tolerance": "all"

    }
}
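
Regarding the key question above: the two transforms in the source config rewrite the record key in two steps, and the key converter then serializes the result. A rough sketch of what happens to one record, assuming a source row with id = 1 (the other column names are only illustrative):

    value from the JDBC source:            {"id": 1, "updatedAt": "...", ...}
    after createKey (ValueToKey):          key = {"id": 1}, value unchanged
    after extractInt (ExtractField$Key):   key = 1, value unchanged
    written to the topic:                  key serialized by IntegerConverter as a 4-byte integer

So the key on the topic is no longer the original struct but just the integer value of id.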

sink properties: 
{
    "name": "es_sink_analytics",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "key.converter.schemas.enable": "false",
        "topics": "TEST",
        "topic.index.map": "TEST:te_test",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://192.168.10.40:9200",
        "connection.username": "******",
        "key.ignore": "false",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq-error-es",
        "errors.deadletterqueue.topic.replication.factor": "1",
        "schema.ignore": "true",
        "error.tolerance": "all"
    }
}
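
Not part of the original question, but one way to see why individual records ended up in dlq-error-es: Kafka Connect can copy the failure context (failing stage, exception class and message) into headers on the DLQ records when errors.deadletterqueue.context.headers.enable is set, and the console consumer can print those headers. A minimal sketch, assuming the broker is reachable at localhost:9092 and a Kafka 2.7+ console consumer (for print.headers):

    # add to the sink connector config, then update/restart the connector
    "errors.deadletterqueue.context.headers.enable": "true"

    # read the DLQ topic; the __connect.errors.* headers explain each failure
    kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic dlq-error-es --from-beginning \
        --property print.headers=true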

Solution

No effective solution to this problem has been found yet.
