Java / Spring引导-流式传输大文件的最佳做法

问题描述

我正在通过获取预签名链接,使用WebClient调用并使用InputStream / OutputStream,PipedInputStream / PipedOutputStream和Flux ByteBuffer类型返回流文件的方式,从S3存储桶中流式传输大文件。我这样做是为了避免在返回给用户之前将整个文件加载到内存中,并且在某种程度上确实可行,并且当我使用探查器运行应用程序时,可以看到一些积极的结果。但是,这也是大量反复试验的结果,有些人坦率地说,我不确定这是否是最有效,最优雅的解决方案。

我的问题是:这是获得最佳性能(最低的cpu,内存堆使用率和响应时间)的最佳实践吗?

相关代码如下。

Controller类中的端点定义:

    @GetMapping (value = "/stream/{filename}",produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
    public void stream(@PathVariable("filename") String filename,final HttpServletResponse response) throws Exception {
        s3Service.getAsync(filename,response);
    }

“ getAsync”方法获取“ readAsInputStream”方法的预签名链接,并获取响应头的ObjectMetadata):

    public void getAsync(final String key,final HttpServletResponse response) throws Exception {
        final Map<String,Object> map = getMetadataAndPresignedLink(key);
        final String url = map.get("url").toString();
        final ObjectMetadata Metadata = (ObjectMetadata)map.get("Metadata");

        InputStream is = readAsInputStream(url);
        response.setHeader("Content-disposition","attachment; filename=\"" + key + "\"");
        response.setContentLength((int)Metadata.getContentLength());
        int read=0;
        byte[] bytes = new byte[1024];
        OutputStream os = response.getoutputStream();

        while((read = is.read(bytes))!= -1){
            os.write(bytes,read);
        }
        os.flush();
        os.close();
    }

“ readAsInputStream”:

    private InputStream readAsInputStream(String url) throws IOException {
        PipedOutputStream osPipe = new PipedOutputStream();
        PipedInputStream isPipe = new PipedInputStream(osPipe);

        ClientResponse response = webClient.get().uri(url)
                .accept(MediaType.APPLICATION_OCTET_STREAM)
                .exchange()
                .block();
          Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
                .doOnError(t -> {
                    logger.error("Error reading body.",t);
                    // close pipe to force InputStream to error
                    try {
                        isPipe.close();
                        osPipe.close();
                    } catch (IOException ioe) {
                        logger.error("Error closing streams",ioe);
                    }
                })
                .doFinally(s -> {
                    try {
                        osPipe.close();
                    } catch (Exception ioe) {
                        logger.error("Error closing streams",ioe);
                    }
                });

        DataBufferUtils.write(body,osPipe)
                .subscribe(DataBufferUtils.releaseConsumer());
        return isPipe;
    }

WebClient配置(可在实现WebMvcConfigurer的Configuration类中找到):

    @Bean
    WebClient webClient() {
        DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
        factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.NONE);
        TcpClient tcpClient = TcpClient
                .create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,DEFAULT_TIMEOUT)
                .doOnConnected(connection -> {
                    connection.addHandlerLast(new ReadTimeoutHandler(DEFAULT_TIMEOUT,TimeUnit.MILLISECONDS));
                    connection.addHandlerLast(new WriteTimeoutHandler(DEFAULT_TIMEOUT,TimeUnit.MILLISECONDS));
                });

        return WebClient.builder()
                .uriBuilderFactory(factory)
                .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
                .build();
    }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)