Unable to get a sink connector working in Kafka Connect

Problem description

I am trying to use the S3 sink connector in Kafka Connect; the connector starts up and then fails shortly afterwards.

My configuration is as follows:

{
    "name": "my-s3-sink3",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "mysource.topic",
        "s3.region": "us-east-1",
        "s3.bucket.name": "topicbucket001",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility": "NONE"
    }
}
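
In case it helps with reproducing the setup: a minimal sketch of submitting a config like this to the Connect REST API, assuming the worker listens on localhost:8083 (adjust the host for your environment) and Python's requests library is available.

import json
import requests

CONNECT_URL = "http://localhost:8083"  # assumption: Connect worker address

connector = {
    "name": "my-s3-sink3",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "mysource.topic",
        "s3.region": "us-east-1",
        "s3.bucket.name": "topicbucket001",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility": "NONE"
    }
}

# POST /connectors creates the connector, then check its task status.
resp = requests.post(f"{CONNECT_URL}/connectors", json=connector)
resp.raise_for_status()
print(requests.get(f"{CONNECT_URL}/connectors/my-s3-sink3/status").json())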

My connect-distributed.properties looks like this:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance = all

Full error log:

[2021-04-06 10:59:04,398] INFO [Consumer clientId=connector-consumer-s3connect12-0,groupId=connect-s3connect12] Member connector-consumer-s3connect12-0-f1e48df8-76ba-49f9-9080-e10b0a34202b sending LeaveGroup request to coordinator **********.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2021-04-06 10:59:04,397] ERROR WorkerSinkTask{id=s3connect12-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

[2021-04-06 10:59:04,396] ERROR WorkerSinkTask{id=s3connect12-0} Error converting message key in topic 'quickstart-status' partition 3 at offset 0 and timestamp 1617706740956: Converting byte[] to Kafka Connect data Failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)

[2021-04-06 10:59:04,393] INFO [Consumer clientId=connector-consumer-s3connect12-0,groupId=connect-s3connect12] Resetting offset for partition quickstart-status-3 to position FetchPosition{offset=0,offsetEpoch=Optional.empty,currentleader=leaderAndEpoch{leader=Optional[***************.kafka.us-east-1.amazonaws.com:9092 (id: 1 rack: use1-az2)],epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)

Message format:

{
    "registertime": 1511985752912,
    "userid": "User_6",
    "regionid": "Region_8",
    "gender": "FEMALE"
}
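
For context, here is a minimal sketch of producing a record shaped like the one above, assuming the confluent-kafka client and a placeholder bootstrap server. Note that the key is a plain string such as User_6, which (as the answer below points out) is what the topic actually contains.

import json
from confluent_kafka import Producer

# Bootstrap server is an assumption; use your MSK broker addresses.
producer = Producer({"bootstrap.servers": "localhost:9092"})

value = {
    "registertime": 1511985752912,
    "userid": "User_6",
    "regionid": "Region_8",
    "gender": "FEMALE",
}

# The key is a plain string, not JSON -- this is what trips up the
# JsonConverter on the key side.
producer.produce("mysource.topic", key="User_6", value=json.dumps(value))
producer.flush()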

[screenshot: messages in the topic]

New error log:

[screenshot: new error log]

Solution

The problem is the key SerDe. Per your screenshot, the key data are plain, non-JSON strings:

User_2
User_9
etc

So instead of

key.converter=org.apache.kafka.connect.json.JsonConverter

use

key.converter=org.apache.kafka.connect.storage.StringConverter
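
To see why this matters, here is a rough Python analogy (not the converters' actual implementation) of what each converter does with a key like User_6:

import json

raw_key = b"User_6"  # key bytes as they arrive from the topic

# StringConverter-style handling: decode the bytes as UTF-8 -- works.
print(raw_key.decode("utf-8"))  # -> User_6

# JsonConverter-style handling: parse the bytes as JSON -- fails, which is
# what surfaces in the worker log as "Converting byte[] to Kafka Connect
# data failed due to serialization error".
try:
    json.loads(raw_key)
except json.JSONDecodeError as err:
    print(f"not valid JSON: {err}")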

Edit:

Try this for your connector config, explicitly specifying the converters (as suggested by @OneCricketeer):

{
    "name": "my-s3-sink3",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "mysource.topic",
        "s3.region": "us-east-1",
        "s3.bucket.name": "topicbucket001",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility": "NONE"
    }
}
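
To apply the changed config to the already-created connector, you can PUT just the config map to the Connect REST API and then restart the failed task (the log above says it "will not recover until manually restarted"). A minimal sketch, again assuming the worker is at localhost:8083:

import requests

CONNECT_URL = "http://localhost:8083"  # assumption: Connect worker address

new_config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mysource.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "topicbucket001",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
}

# PUT /connectors/{name}/config creates or updates the connector config.
resp = requests.put(f"{CONNECT_URL}/connectors/my-s3-sink3/config", json=new_config)
resp.raise_for_status()

# Restart the failed task so it picks up the new converters.
requests.post(f"{CONNECT_URL}/connectors/my-s3-sink3/tasks/0/restart")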

So I was able to solve this. After explicitly specifying the converters, the deserialization error went away; I then hit an S3 multipart-upload issue, which I fixed by attaching an S3 IAM policy to the ECS task definition, granting the Fargate task access to the S3 bucket. Thanks to Robin Moffatt for the solution above!
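
For anyone hitting the same multipart-upload error: a minimal sketch of the kind of bucket permissions the connector's task role needs, attached here with boto3. The role name is hypothetical (use the task role referenced by your ECS task definition), and the action list follows the permissions commonly recommended for the Confluent S3 sink, so verify it against the connector documentation for your version.

import json
import boto3

TASK_ROLE_NAME = "my-connect-fargate-task-role"  # hypothetical role name
BUCKET = "topicbucket001"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            # Bucket-level permissions.
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
            "Resource": f"arn:aws:s3:::{BUCKET}",
        },
        {
            # Object-level permissions, including multipart upload handling.
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts",
            ],
            "Resource": f"arn:aws:s3:::{BUCKET}/*",
        },
    ],
}

# Attach the policy inline to the task role so the S3 sink can complete
# multipart uploads to the bucket.
boto3.client("iam").put_role_policy(
    RoleName=TASK_ROLE_NAME,
    PolicyName="s3-sink-access",
    PolicyDocument=json.dumps(policy),
)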