问题描述
这是背景。我有一个操作可能会从 hbase 扫描许多行。因为行数可能很大,所以我想返回一个 Stream of rows。问题是:如何关闭 ResultScanner?
这样的方法
public <T> Stream<T> getResultStream(String tableName,Scan scan,RowMapper<T> mapper){
scan.setCaching(5000);//set number of rows to fetch for each rpc
Table table=this.getConnection().getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
return StreamSupport.stream(scanner.spliterator(),false).map(mapper::mapRow);
// scanner.close(); where to close it ?
}
显然我无法在此方法中关闭 ResultScanner。有什么优雅的方法吗?
解决方法
有办法。
首先我们观察到 Stream
实现了 AutoCloseable
。因此,我们可以实现一个 Stream 包装器,在其 scanner
方法中关闭 close()
实例。
您可以通过手动编写 Stream 包装器类来实现这一点。该类只需要将所有 Stream
API 调用委托给封装的 Stream
类。在 close()
的情况下,它还需要关闭 ResultScanner 资源。您可以使用包装类构造函数中的参数来提供。
它可能看起来像这样:
public class <T> MyStreamWrapper implements Stream<T> {
private Stream<T> stream;
private AutoCloseable resource;
public MyStreamWrapper(Stream<T> stream,AutoCloseable resource) {
this.stream = stream;
this.resource = resource;
}
@Override
public close() {
this.stream.close();
this.resource.close();
}
// methods to delegate all other Stream API methods to this.stream
}
(您的 IDE 可能能够为您生成骨架包装类以节省您的工作量。检查您的 IDE 的文档等)
您也可以使用使用 java.lang.reflect.Proxy
... 或其他方式实现的动态代理来实现这一点。
一旦你实现了包装类,你就可以包装并返回主流;例如
ResultScanner scanner = table.getScanner(scan);
Stream<T> stream =
StreamSupport.stream(scanner.spliterator(),false)
.map(mapper::mapRow);
return new MyStreamWrapper<>(stream,scanner);
为确保扫描器实际上已关闭,应将此方法的结果分配给 try with resources 语句中的资源变量。
但这并不优雅。