1、流式查询简介
流式处理在大数据方面应用比较广泛。随着数据的爆发式增长,流式处理的方式也被应用到日常的工具中,如JDK的对于集合处理的Stream流、Redis5.0新增的数据结构Stream专门来处理消息等。
流式查询指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。
今天要分享的是Mybaits的流式查询。Mybatis流式查询通过使用游标一次处理一条数据的方式,减少了内存的占用,从而提高了大数据量查询的效率。
2、Mybaits流式查询
Mybaits
在3.4.0
版本增加了流式查询也就是游标Cursor
。更新日志传动门,Issues · mybatis/mybatis-3 · GitHub
2.1 游标简介
游标的使用和List一样,只不过返回的结果不同。
游标继承
Closeable
,Iterable
接口,所以游标是可关闭、可遍历的。包含三个方法:
isOpen
游标的开关是否打开
isConsumed
通过游标是否将拉取的数据消费完,也就是是否将数据全部取完。
getCurrentIndex
获取当前数据的位置,位置是从0开始的。
2.2 游标的使用
定义简单的查询方法,这里使用注解
@Select
的方式代替Xml
,方便测试。
@Select("select * from logan_log.logan_login")
Cursor<LoganLogin> testCursorQuery();
编写测试类。因为游标是可关闭的,所以使用完之后要想流一样关闭掉。这里采用
try-resource
的方式简化流的关闭。
@Test
void testCursorSql() {
try (Cursor<LoganLogin> cursor = loganLoginMapper.testCursorQuery()) {
cursor.forEach((ll) -> {
System.out.println("ll:" + JSON.toJSONString(ll));
System.out.println("ll cursor:" + cursor.getCurrentIndex());
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
处理结果,报错了。。。
原因分析:
-
A Cursor is already closed.
还没有处理数据,查询完数据之后游标被关闭了。 -
由于Mybatis帮我们做了自动事务的提交。查询完毕之后就会提交事务关闭连接,游标自然也被关闭了,就会出现这样的问题。游标需要占用数据库的连接,然后逐条的将数据返回的。
解决方案:
-
将整个方法放在一个事务里面,处理完数据之后在提交事务。比较暴力的解决方案就是使用声明式事务
@Transactional
。大事务的处理可能会影响业务的并发。 -
采用小颗粒度的编程式事务
TransactionTemplate
,只针对流式查询的结果包装到一个事务里面即可。 -
也可以采用手动提交事务,使用
SqlSessionFactory
手动建立连接和关闭。
编程式事务解决问题可能更方便一些,所以采用编程式事务的方法解决。
@Test
void testCursorSql2() {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
System.out.println("TransactionStatus1:" + status.hasSavepoint());
try (Cursor<LoganLogin> cursor = loganLoginMapper.testCursorQuery()) {
cursor.forEach((ll) -> {
System.out.println("ll:" + JSON.toJSONString(ll));
System.out.println("ll cursor:" + cursor.getCurrentIndex());
});
System.out.println("TransactionStatus2:" + status.hasSavepoint());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
结果正常打印
2.3 游标查询的升级
游标的使用总会便随着事务,对于开发者来说成了黄金搭档。能不能将其封装成一个方法来方便开发者调用呢,还真有。就是使用
ResultHandler
参数。有人就提出了对应的issues,同时也在
mybatis-3.4.6
得到了支持。
2.3.1 ResultHandler参数说明
ResultHandler
作为参数参数传递,返回值为Void。xml的配置需要指定ResultType
。
查询到的结果,也是封装在
ResultHandler
里面的ResultContext
中。
T getResultObject()
返回的数据结果
int getResultCount()
返回当前结果的记录数据。
boolean isStopped()
是否停止拉取数据
void stop()
中断数据拉取
2.3.2 ResultHandler的使用
ResultHandler
的使用,官方文档中已经给出使用案例。我们来测试一下。先定义一下方法。同样采用注解的方式替代
xml
。
@ResultType(LoganLogin.class)
@Select("select * from logan_log.logan_login")
void testStreamQuery(ResultHandler<LoganLogin> handler);
其中
ResultType
必须要指定,否则就会报错。
测试案例
@Test
void testStreamSql() {
loganLoginMapper.testStreamQuery(resultContext -> {
System.out.println("handleResult:" + JSON.toJSONString(resultContext.getResultObject()));
System.out.println("ResultCount:" + resultContext.getResultCount());
System.out.println("isStop:" + resultContext.isStopped());
System.out.println("---------------------------------------");
});
}
测试结果
2.4 fetchSize
的讨论
很多技术博客中都强调,fetchSize
可以控制每次拉取的数量。
但是笔者配置,测试之后发现并没有什么效果。在使用流式查询时,官方文档中也没有说明必须要配置。
元芳,你怎么看???
3、流式查询在Mybatis-Plus中的使用
mybatis中已经支持了流式查询,那么好用的MP怎么会不支持。MP在
3.5.4
中也得到了支持。
具体的方法如下:
/**
* 根据 entity 条件,查询全部记录
*
* @param queryWrapper 实体对象封装操作类(可以为 null)
* @param resultHandler 结果处理器 {@link ResultHandler}
* @since 3.5.4
*/
void selectList(@Param(Constants.WRAPPER) Wrapper<T> queryWrapper, ResultHandler<T> resultHandler);/**
* 根据 entity 条件,查询全部记录(并翻页)
* @param page 分页查询条件
* @param queryWrapper 实体对象封装操作类(可以为 null)
* @param resultHandler 结果处理器 {@link ResultHandler}
* @since 3.5.4
*/
void selectList(IPage<T> page, @Param(Constants.WRAPPER) Wrapper<T> queryWrapper, ResultHandler<T> resultHandler);
/**
* 根据 Wrapper 条件,查询全部记录
*
* @param queryWrapper 实体对象封装操作类
* @param resultHandler 结果处理器 {@link ResultHandler}
* @since 3.5.4
*/
void selectMaps(@Param(Constants.WRAPPER) Wrapper<T> queryWrapper, ResultHandler<Map<String, Object>> resultHandler);
/**
* 根据 Wrapper 条件,查询全部记录(并翻页)
*
* @param page 分页查询条件
* @param queryWrapper 实体对象封装操作类
* @param resultHandler 结果处理器 {@link ResultHandler}
* @since 3.5.4
*/
void selectMaps(IPage<? extends Map<String, Object>> page, @Param(Constants.WRAPPER) Wrapper<T> queryWrapper, ResultHandler<Map<String, Object>> resultHandler);
/**
* 根据 Wrapper 条件,查询全部记录
* <p>注意: 只返回第一个字段的值</p>
*
* @param queryWrapper 实体对象封装操作类(可以为 null)
* @param resultHandler 结果处理器 {@link ResultHandler}
* @since 3.5.4
*/
<E> void selectObjs(@Param(Constants.WRAPPER) Wrapper<T> queryWrapper, ResultHandler<E> resultHandler);
就拿最典型的
selectList
来测试。没有分页的查询。其中Wrapper主要是查询的
Sql
的条件拼接,也可以是空。ResultHandler
就是我们的流式返回了。
@Test
void testStreamPlus() {
loganLoginMapper.selectList(Wrappers.emptyWrapper(), resultContext->{
System.out.println("handleResult:" + JSON.toJSONString(resultContext.getResultObject()));
System.out.println("ResultCount:" + resultContext.getResultCount());
System.out.println("isStop:" + resultContext.isStopped());
System.out.println("---------------------------------------");
});
}
测试结果
有分页的查询和无分页方法是重载关系,增加了分页参数
Page
。其中Page
只是限定了拉取数据的总条数,但是并不会自动封装到Page
对象中。主要使用了,resultContext
的stop
的方法。
@Test
void testStreamPage() {
Page<LoganLogin> page = new Page<>(1, 2);
loganLoginMapper.selectList(page, Wrappers.emptyWrapper(), resultContext->{
System.out.println("当前处理第" + resultContext.getResultCount() + "条记录.");
System.out.println("handleResult:" + JSON.toJSONString(resultContext.getResultObject()));
System.out.println("isStop:" + resultContext.isStopped());
System.out.println("---------------------------------------");
});
System.out.println(JSON.toJSONString(page.getRecords()));
}
测试结果