如何修改使用Kafka Connect S3连接器上传的S3对象的文件名?

问题描述

我已经使用S3连接器已有几个星期了,我想更改连接器命名每个文件的方式。我正在使用HourlyBasedPartition,因此每个文件的路径已经足以让我找到每个文件,并且我希望文件名对于所有文件都是通用的,例如“ Data.json.gzip”(具有相应路径)从分区程序)。

例如,我想从这里开始:

<prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

对此:

<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>

此操作的目的是只调用一次S3即可稍后下载文件,而不必先查找文件名然后下载。

在名为“ kafka-connect-s3”的文件夹中搜索文件时,我找到了以下文件https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java,其末尾具有以下某些功能

private RecordWriter getWriter(SinkRecord record,String encodedPartition)
      throws ConnectException {
    if (writers.containsKey(encodedPartition)) {
      return writers.get(encodedPartition);
    }
    String commitFilename = getCommitFilename(encodedPartition);
    log.debug(
        "Creating new writer encodedPartition='{}' filename='{}'",encodedPartition,commitFilename
    );
    RecordWriter writer = writerProvider.getRecordWriter(connectorConfig,commitFilename);
    writers.put(encodedPartition,writer);
    return writer;
  }

  private String getCommitFilename(String encodedPartition) {
    String commitFile;
    if (commitFiles.containsKey(encodedPartition)) {
      commitFile = commitFiles.get(encodedPartition);
    } else {
      long startOffset = startOffsets.get(encodedPartition);
      String prefix = getDirectoryPrefix(encodedPartition);
      commitFile = fileKeyToCommit(prefix,startOffset);
      commitFiles.put(encodedPartition,commitFile);
    }
    return commitFile;
  }

  private String fileKey(String topicsPrefix,String keyPrefix,String name) {
    String suffix = keyPrefix + dirDelim + name;
    return StringUtils.isNotBlank(topicsPrefix)
           ? topicsPrefix + dirDelim + suffix
           : suffix;
  }

  private String fileKeyToCommit(String dirPrefix,long startOffset) {
    String name = tp.topic()
                      + fileDelim
                      + tp.partition()
                      + fileDelim
                      + String.format(zeroPadOffsetFormat,startOffset)
                      + extension;
    return fileKey(topicsDir,dirPrefix,name);
  }

我不知道这是否可以根据我的意愿进行定制,但似乎与我的意图接近/相关。希望对您有所帮助。

(也向Github提交了问题:https://github.com/confluentinc/kafka-connect-storage-cloud/issues/369

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)