Problem Description
I am using the Confluent S3 sink connector on top of Kafka Connect (v5.2.1) from the Confluent distribution.
MySQL CDC events are written to the Kafka topic as JSON (via Maxwell), without an embedded schema. This connector reads from that Apache Kafka cluster and writes the data to S3.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
I am running the connector in distributed mode with the following connector configuration:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "behavior.on.null.values": "ignore",
  "s3.region": "ap-southeast-1",
  "flush.size": "1000",
  "schema.compatibility": "NONE",
  "topics": "audit",
  "tasks.max": "3",
  "s3.part.size": "5242880",
  "timezone": "UTC",
  "locale": "en_US_POSIX",
  "retry.backoff.ms": "100",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "name": "s3-sink-connector-um-cdc-events",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "tickledb/prod/audit",
  "timestamp.extractor": "Record",
  "s3.retry.backoff.ms": "100",
  "rotate.schedule.interval.ms": "10000"
}
While this works in general, I keep seeing this error:
task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): Expected space separating root-level values
at [Source: (byte[])"3d5b807d-a6c7-43da-bbcc-d40efe9753a7"; line: 1, column: 3]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): Expected space separating root-level values
at [Source: (byte[])"3d5b807d-a6c7-43da-bbcc-d40efe9753a7"; line: 1, column: 3]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:638)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1635)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1375)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:830)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The parse error occurs inside Jackson. This data comes from MySQL CDC events, so changing the data itself is not an option (not without breaking changes). If Kafka accepts the data as JSON, it should be legal for the Kafka connector as well.
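Note that the bytes quoted in the trace are a bare UUID string, which is not a valid root-level JSON value on its own. A minimal sketch that reproduces the same Jackson error outside the connector:

import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonRepro {
    public static void main(String[] args) throws Exception {
        // The exact bytes quoted in the stack trace: a bare UUID, not valid JSON.
        byte[] payload = "3d5b807d-a6c7-43da-bbcc-d40efe9753a7"
                .getBytes(StandardCharsets.UTF_8);
        // Jackson reads the leading "3" as the start of a number, then hits 'd'
        // and throws: Unexpected character ('d' (code 100)): Expected space
        // separating root-level values -- the same error as in the connector log.
        new ObjectMapper().readTree(payload);
    }
}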
Workaround
I did not find the root cause of the failures, but I updated the connector configuration to keep the tasks from going into the FAILED state and to capture the actual messages causing the failures.
Updated configuration fields
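The additions are Kafka Connect's error-handling settings, which tolerate conversion errors and route failing records to a dead letter queue instead of failing the task. A minimal sketch; the DLQ topic name and replication factor below are placeholders, adjust them for your cluster:

# Keep the task running when record conversion fails
errors.tolerance=all
# Log the error context, including the failing message, to the worker log
errors.log.enable=true
errors.log.include.messages=true
# Route failed records to a dead letter queue
# (topic name and replication factor are placeholders -- adjust for your cluster)
errors.deadletterqueue.topic.name=s3-sink-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true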
With this updated configuration, parse failures no longer stop my tasks; the failing records are sent to the configured DLQ. However, the actual error message is not written to the DLQ, only the message that failed to parse.
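With errors.deadletterqueue.context.headers.enable set, Connect attaches the failure context as __connect.errors.* record headers rather than putting it in the message body, so the error reason can still be recovered from the headers. A minimal consumer sketch to inspect them; the bootstrap servers, group id, and topic name are placeholders:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "dlq-inspector");           // placeholder
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("s3-sink-dlq")); // placeholder topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                String value = record.value() == null
                        ? "null" : new String(record.value(), StandardCharsets.UTF_8);
                System.out.println("failed payload: " + value);
                // Connect attaches the error context as __connect.errors.* headers
                for (Header header : record.headers()) {
                    if (header.key().startsWith("__connect.errors.")) {
                        System.out.println(header.key() + " = "
                                + new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}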