您可以使用 Springs JdbcTemplate 来流式传输数据吗

问题描述

能否使用 Springs JdbcTemplate 查询数据库并将结果作为 Stream 提供?

解决方法

是的 - 但不是开箱即用的。

我发现这篇关于如何操作的文章非常有用:Using the Java 8 Stream API with Spring’s JdbcTemplate

受本文启发,我进行了改进:

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.rowset.ResultSetWrappingSqlRowSet;
import org.springframework.stereotype.Component;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@Slf4j
@Component
@RequiredArgsConstructor
public class QueryStreamer {

  private final NamedParameterJdbcTemplate jdbcTemplate;

  /**
   * Execute query and make result available for a Stream{@literal <T>} consumer
   * @param sql
   * @param parameters
   * @param clazz
   * @param consumer
   * @param <T>
   */
  public <T> void queryForStream(
    String sql,MapSqlParameterSource parameters,Class<T> clazz,java.util.function.Consumer<Stream<T>> consumer
  ) {
    queryForStream(sql,parameters,resultSetStream -> {
      BeanPropertyRowMapper<T> mapper = new TrimmingBeanPropertyRowMapper<>(clazz);
      consumer.accept(resultSetStream.map(r -> mapIt(r,mapper)));
      return null;
    });
  }

  // Build a Stream<ResultSet>
  private void queryForStream(
    String sql,java.util.function.UnaryOperator<Stream<ResultSet>> operator
  ) {
    jdbcTemplate.query(sql,resultSet -> {
      final ResultSetWrappingSqlRowSet rowSet = new ResultSetWrappingSqlRowSet(resultSet);
      final boolean parallel = false;

      Spliterator<ResultSet> spliterator = Spliterators.spliteratorUnknownSize(new Iterator<>() {
        @Override
        public boolean hasNext() {
          return rowSet.next();
        }

        @Override
        public ResultSet next() {
          return resultSet;
        }
      },Spliterator.IMMUTABLE);
      return operator.apply(StreamSupport.stream(spliterator,parallel));
    });
  }

  private static <T> T mapIt(ResultSet resultSet,BeanPropertyRowMapper<T> mapper) {
    try {
      return mapper.mapRow(resultSet,0);
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

}

这就是你可以在 DAO 中使用它的方式 - 由于很好的多行字符串文字支持,所以在 Kotlin 中实现了:

import MyEntity
import QueryStreamer
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource
import org.springframework.stereotype.Component
import java.util.function.Consumer
import java.util.stream.Stream

@Component
open class MyEntityDAO(private val queryStreamer: QueryStreamer) {

  val sql = """
          SELECT column_1,column_2,column_3
            FROM my_entity_table
           WHERE some_criteria = 'met' 
          """.trimIndent()

  fun streamIt(consumer: Consumer<Stream<MyEntity>>) {
    queryStreamer.queryForStream(sql,MapSqlParameterSource(),MyEntity::class.java,consumer)
  }
}

就是这样 - 玩得开心:-)