"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true" in Spring Integration AWS

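Problem description

Update: there is a bug in spring-integration-aws-2.3.4.

I am integrating SFTP (SftpStreamingMessageSource) as the source with S3 as the destination. I have a Spring Integration configuration similar to this: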

@Bean
public S3MessageHandler.UploadMetadataProvider uploadMetadataProvider() {
    return (metadata, message) -> {
        if (message.getPayload() instanceof DigestInputStream) {
            metadata.setContentType(MediaType.APPLICATION_JSON_VALUE);
            // can not read stream to manually compute MD5
            // metadata.setContentMD5("BLABLA==");
            // this is wrong approach:
            // metadata.setContentMD5(BinaryUtils.toBase64(((DigestInputStream) message.getPayload()).getMessageDigest().digest()));
        }
    };
}

@Bean
@InboundChannelAdapter(channel = "ftpStream")
public MessageSource<InputStream> ftpSource(SftpRemoteFileTemplate template) {
    SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template);
    messageSource.setRemoteDirectory("foo");
    messageSource.setFilter(new AcceptAllFileListFilter<>());
    messageSource.setMaxFetchSize(1);
    messageSource.setLoggingEnabled(true);
    messageSource.setCountsEnabled(true);
    return messageSource;
}

...

@Bean
@ServiceActivator(inputChannel = "ftpStream")
public MessageHandler s3MessageHandler(AmazonS3 amazonS3,
        S3MessageHandler.UploadMetadataProvider uploadMetadataProvider) {
    S3MessageHandler messageHandler = new S3MessageHandler(amazonS3, "bucketName");
    messageHandler.setLoggingEnabled(true);
    messageHandler.setCountsEnabled(true);
    messageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
    messageHandler.setUploadMetadataProvider(uploadMetadataProvider);
    messageHandler.setKeyExpression(new ValueExpression<>("key"));
    return messageHandler;
}

After startup, I get the following error:

"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true."

This happens because ftpSource produces InputStream payloads without mark/reset support. I even tried converting the InputStream to a BufferedInputStream with a @Transformer, for example:

return new BufferedInputStream((InputStream) message.getPayload());

but without success: I then get "java.io.IOException: Stream closed", because the stream is closed prematurely at S3MessageHandler:338.

How can I painlessly generate the MD5 for every message in Spring Integration AWS?

I am using spring-integration-aws-2.3.4.RELEASE.

Solution

S3MessageHandler does this:

if (payload instanceof InputStream) {
    InputStream inputStream = (InputStream) payload;
    if (metadata.getContentMD5() == null) {
        Assert.state(inputStream.markSupported(),
                "For an upload InputStream with no MD5 digest metadata, "
                        + "the markSupported() method must evaluate to true.");
        String contentMd5 = Md5Utils.md5AsBase64(inputStream);
        metadata.setContentMD5(contentMd5);
        inputStream.reset();
    }
    putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
}

where Md5Utils.md5AsBase64() ends up closing the InputStream, which is bad for us.
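The failure mode can be reproduced with the JDK alone. The sketch below is not the S3MessageHandler code: md5AsBase64AndClose is a hypothetical stand-in that, like the helper described above, closes its input once the digest is computed, and ClosedStreamDemo and resetAfterDigest are illustrative names. It suggests why wrapping the payload in a BufferedInputStream does not help, while an in-memory stream does.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class ClosedStreamDemo {

    // Hypothetical stand-in for a digest helper that closes its input when done.
    public static String md5AsBase64AndClose(InputStream in)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        try (InputStream stream = in) { // try-with-resources closes the stream
            byte[] buffer = new byte[8192];
            int read;
            while ((read = stream.read(buffer)) != -1) {
                md5.update(buffer, 0, read);
            }
        }
        return Base64.getEncoder().encodeToString(md5.digest());
    }

    // Returns the message of the IOException thrown by reset(),
    // or null when reset() succeeds after the digest was computed.
    public static String resetAfterDigest(InputStream in)
            throws IOException, NoSuchAlgorithmException {
        md5AsBase64AndClose(in);
        try {
            in.reset();
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "payload".getBytes(StandardCharsets.UTF_8);
        // A closed BufferedInputStream rejects reset().
        System.out.println(resetAfterDigest(
                new BufferedInputStream(new ByteArrayInputStream(data))));
        // ByteArrayInputStream.close() has no effect, so reset() still works.
        System.out.println(resetAfterDigest(new ByteArrayInputStream(data)));
    }
}
```

A BufferedInputStream does report markSupported() as true, so it passes the assertion, but once it has been closed its reset() throws. A ByteArrayInputStream ignores close(), which is why a byte[] payload avoids the problem.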

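This is an oversight on our side. Please raise a GH issue and we will fix it as soon as possible. Or feel free to contribute a fix.

As a workaround, I suggest placing a transformer in front of this S3MessageHandler, with code like this: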

return org.springframework.util.StreamUtils.copyToByteArray(inputStream);

This way you already have a byte[] as the payload for the S3MessageHandler, and it is handled by a different branch:

else if (payload instanceof byte[]) {
    byte[] payloadBytes = (byte[]) payload;
    InputStream inputStream = new ByteArrayInputStream(payloadBytes);
    if (metadata.getContentMD5() == null) {
        String contentMd5 = Md5Utils.md5AsBase64(inputStream);
        metadata.setContentMD5(contentMd5);
        inputStream.reset();
    }
    if (metadata.getContentLength() == 0) {
        metadata.setContentLength(payloadBytes.length);
    }
    putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
}
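The idea behind the workaround can be sketched without Spring or the AWS SDK. This is only an illustration under stated assumptions: BytePayloadSketch, toBytes and md5Base64 are hypothetical names, toBytes uses the JDK's InputStream.readAllBytes() (Java 9+) in place of StreamUtils.copyToByteArray, and md5Base64 shows the Base64-encoded MD5 format that a Content-MD5 value uses rather than calling Md5Utils.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class BytePayloadSketch {

    // Drain the stream into memory (JDK stand-in for a copy-to-byte-array helper).
    // The caller remains responsible for closing the source stream.
    public static byte[] toBytes(InputStream in) throws IOException {
        return in.readAllBytes(); // Java 9+
    }

    // Base64-encoded MD5 digest of the given bytes.
    public static String md5Base64(byte[] bytes) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5").digest(bytes);
        return Base64.getEncoder().encodeToString(digest);
    }

    public static void main(String[] args) throws Exception {
        InputStream source =
                new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8));
        byte[] payload = toBytes(source);
        System.out.println(payload.length + " bytes, MD5 " + md5Base64(payload));
        // An in-memory stream over the bytes supports mark/reset.
        System.out.println(new ByteArrayInputStream(payload).markSupported());
    }
}
```

The trade-off is that the whole file is held in memory before the upload, so this suits small and medium files better than very large ones.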