使用Java Stream消耗数据库游标 驱动程序分页手动分页

问题描述

我想使用Java Stream消耗数据库游标。我希望Java流根据需要提取和处理行,避免先将所有500万行加载到内存中,然后再进行处理。

是否可以在不将整个表加载到RAM的情况下使用它?

到目前为止,我的代码如下:

Cursor<Product> products = DAO.selectCursor(...);

// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
  Product p = it.next();
  // 2. Processing each row
  ...
}
// 3. Concluding (processing totals,stats,etc.)
double avg = total / count;
...

它的确运行良好,但是有点麻烦,我想利用Stream API。

解决方法

首先,我们必须讨论如何从数据库中获取数据。如果您打算查看大量记录,而又不想一次全部加载到内存中,则有两种选择:

  1. 对结果分页。
  2. 让驱动程序对结果进行分页。

如果您已经有一个基于Cursor的迭代器,可以根据需要检索分页数据,则可以使用JDK API中的SpliteratorsStreamSupport实用工具类将其转换为{ {1}}。

Stream

否则,您将必须构建自己的东西。

驱动程序分页

如果JDBC驱动程序支持fetch size属性,则可以执行以下操作:

Stream<Product> products = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor.iterator(),Spliterator.NONNULL |
                                Spliterator.ORDERED |
                                Spliterator.IMMUTABLE),false)

这时,Connection con = ds.getConnection(); con.setAutoCommit(false); PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'",ResultSet.TYPE_FORWARD_ONLY); stm.setFetchSize(1000); ResultSet rs = stm.executeQuery(); 包含1000条记录的第一次获取,在您阅读上一页之前,它不会从数据库中检索更多信息。

所有这方面的棘手部分是,在完成所有记录的读取之前,您将无法关闭任何资源(即连接,准备好的语句和结果集),并且由于默认情况下我们要构建的流是惰性的,因此意味着我们必须打开所有这些资源,直到完成流为止。

也许最简单的方法是围绕此逻辑构建一个Iterator,并且当Iterator实际上到达所有数据的末尾时,可以关闭所有资源(即rs),或者另一种替代方法是关闭流(!rs.next())时的所有工作。

一旦有了迭代器,使用来自JDK API的Stream.onClose()Spliterators实用工具类就可以很容易地构建出一个流。

我的基本实现看起来像这样。这仅出于说明目的。您可能希望对您的特殊情况给予更多的爱。

StreamSupport

这里的关键是要注意,我们返回的流应该运行某些public Stream<String> getUsers() { DataSource ds = jdbcTemplate.getDataSource(); try { Connection conn = ds.getConnection(); conn.setAutoCommit(false); PreparedStatement stm = conn.prepareStatement("SELECT id FROM users",ResultSet.TYPE_FORWARD_ONLY); //fetch size is what guarantees only 1000 records at the time stm.setFetchSize(1000); ResultSet rs = stm.executeQuery(); Iterator<String> sqlIter = new Iterator<>() { @Override public boolean hasNext() { try { return rs.next(); } catch (SQLException e) { closeResources(conn,stm,rs); throw new RuntimeException("Failed to read record from ResultSet",e); } } @Override public String next() { try { return rs.getString("id"); } catch (SQLException e) { closeResources(conn,e); } } }; //turn iterator into a stream return StreamSupport.stream( Spliterators.spliteratorUnknownSize(sqlIter,false ).onClose(() -> { //make sure to close resources when done with the stream closeResources(conn,rs); }); } catch (SQLException e) { logger.error("Failed to process data",e); throw new RuntimeException(e); } } private void closeResources(Connection conn,PreparedStatement ps,ResultSet rs) { try (conn; ps; rs) { logger.info("Resources successfully closed"); } catch (SQLException e) { logger.warn("Failed to properly close database sources",e); } } 逻辑,因此当我们使用流时,必须确保在完成处理后执行onClose确保我们关闭所有活着的资源(例如stream.close()connstm)。

最好的方法也许是使用try-with-resources,以便尝试将关闭流。

rs

手动分页

另一种方法是您自己对结果进行分页,这取决于数据库,但是使用诸如limit和offset这样的select子句,您可以请求特定的记录页,对其进行处理,然后再检索更多内容。

try(Stream<String> users = userRepo.getUsers()){
    //print users to the main output retrieving 1K at the time
    users.forEach(System.out::println);
}

在这种情况下,迭代器将占用所有页面,完成后,请求下一页,直到在最后一页中找不到更多记录为止。

这种另一种方法的优点是可以在迭代器本身中立即控制资源。

我不会举一个例子,留给你尝试。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...