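S3 connector fails with HourlyPartitioner

Problem description

Writing to S3 through the S3 sink connector works fine with the default configuration. With hourly partitioning, however, the connector fails with the error below. Please see the configurations and the error message, and help us resolve this issue.

Default: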

{
  "value.converter.schemas.enable": "false",
  "name": "tibconew1-test-s3standard-default-sink-connector",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.tolerance": "all",
  "topics": [
    "test.s3custom.default.dax.shipment.data",
    "test.s3custom.default.dax.shipment.data",
    "test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn"
  ],
  "topics.regex": "",
  "errors.deadletterqueue.topic.name": "dlq_test.s3custom.default.dax.shipment.data",
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "1000",
  "s3.bucket.name": "test-stg-raw",
  "s3.region": "us-east-1",
  "s3.credentials.provider.class": "com.amazonaws.auth.InstanceProfileCredentialsProvider",
  "s3.acl.canned": "bucket-owner-full-control",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "streams_dir",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}

Hourly:

{
  "value.converter.schema.registry.url": "https://confschema.test-dsol-core.testdigital-stg.com",
  "value.converter.schemas.enable": "false",
  "name": "test.s3custom.hourly.tibco.dax_shipment.dpp_asn.sink-connector",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "topics": [
    "test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn"
  ],
  "errors.deadletterqueue.topic.name": "dlq_test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn.sink",
  "flush.size": "10",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "locale": "en-US",
  "timezone": "America/Chicago",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "DPP_ASN.LST_UPDT_TS"
}

Error

[Screenshot of the error message; image not included]

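Solution

We finally found the cause. The timestamp received in the payload was in an invalid format: it contained an extra space. We corrected the format at the source. For the hourly partitioner, the connector expects a timestamp value from which it can derive the hour. Hourly partitioning: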

io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and

The message was: "LST_UPDT_TS":"2021-02-01 07:16:23.567"

Corrected to: "LST_UPDT_TS":"2015-08-01T17:00:00.69243-05:00"
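
To illustrate the difference between the two values, here is a minimal Python sketch. It is not the connector's own parsing code, only an approximation. It assumes the expected layout is ISO 8601 with a `T` separator, fractional seconds, and a UTC offset, as in the corrected message. The helper names `parse_iso_timestamp` and `hourly_path` are hypothetical, and the path is built from the timestamp's own offset rather than from the connector's `timezone` setting.

```python
from datetime import datetime

# Assumed layout: ISO 8601 with a 'T' separator, fractional seconds and a
# UTC offset, matching the corrected message. This is an assumption for
# illustration, not the connector's actual parser.
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def parse_iso_timestamp(value):
    """Return an aware datetime, or None if value does not match ISO_FORMAT."""
    try:
        return datetime.strptime(value, ISO_FORMAT)
    except ValueError:
        return None


def hourly_path(ts):
    """Build a path shaped like 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH."""
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H")


if __name__ == "__main__":
    for raw in ("2021-02-01 07:16:23.567", "2015-08-01T17:00:00.69243-05:00"):
        parsed = parse_iso_timestamp(raw)
        if parsed is None:
            print(f"{raw!r}: rejected, not in the assumed ISO 8601 layout")
        else:
            print(f"{raw!r}: {hourly_path(parsed)}")
```

Under these assumptions the space-separated value is rejected, while the corrected value resolves to an hourly directory. A check like this on the producer side could catch malformed timestamps before they reach the connector.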