如何返回流并关闭下划线源?

问题描述

这是背景。我有一个操作可能会从 hbase 扫描许多行。因为行数可能很大,所以我想返回一个 Stream of rows。问题是:如何关闭 ResultScanner?

这样的方法

    public <T> Stream<T> getResultStream(String tableName,Scan scan,RowMapper<T> mapper){
        scan.setCaching(5000);//set number of rows to fetch for each rpc 
        Table table=this.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan);
        return StreamSupport.stream(scanner.spliterator(),false).map(mapper::mapRow);
        // scanner.close(); where to close it ?
    }

显然我无法在此方法关闭 ResultScanner。有什么优雅的方法吗?

解决方法

有办法。

首先我们观察到 Stream 实现了 AutoCloseable。因此,我们可以实现一个 Stream 包装器,在其 scanner 方法中关闭 close() 实例。

您可以通过手动编写 Stream 包装器类来实现这一点。该类只需要将所有 Stream API 调用委托给封装的 Stream 类。在 close() 的情况下,它还需要关闭 ResultScanner 资源。您可以使用包装类构造函数中的参数来提供。

它可能看起来像这样:

public class <T> MyStreamWrapper implements Stream<T> {

    private Stream<T> stream;
    private AutoCloseable resource;

    public MyStreamWrapper(Stream<T> stream,AutoCloseable resource) {
        this.stream = stream;
        this.resource = resource;
    }

    @Override
    public close() {
        this.stream.close();
        this.resource.close();
    }
       
    // methods to delegate all other Stream API methods to this.stream
}  

(您的 IDE 可能能够为您生成骨架包装类以节省您的工作量。检查您的 IDE 的文档等)

您也可以使用使用 java.lang.reflect.Proxy ... 或其他方式实现的动态代理来实现这一点。

一旦你实现了包装类,你就可以包装并返回主流;例如

    ResultScanner scanner = table.getScanner(scan);
    Stream<T> stream = 
         StreamSupport.stream(scanner.spliterator(),false)
                      .map(mapper::mapRow);
    return new MyStreamWrapper<>(stream,scanner);

为确保扫描器实际上已关闭,应将此方法的结果分配给 try with resources 语句中的资源变量。

但这并不优雅。