springboot-mybatis/JPA流式查询

news2024/11/13 15:47:39

项目中有几个batch需要检查所有的用户参与的活动的状态,以前是使用分页,一页一页的查出来到内存再处理,但是随着数据量的增加,效率越来越低。于是经过一顿搜索,了解到流式查询这么个东西,不了解不知道,这一上手,爱的不要不要的,效率贼高。项目是springboot 项目,持久层用的mybatis,整好mybatis的版本后,又研究了一下JPA的版本,做事做全套,最后又整了原始的JDBCTemplate 版本。废话不多说,代码如下:

第一种方式: springboot + mybatis 流式查询(网上说的有三种,我觉得下面这种最简单,对业务代码侵入性最小)

a) service 层代码:

package com.example.demo.service;

import com.example.demo.bean.CustomerInfo;
import com.example.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Slf4j
@Service
public class TestStreamQueryService {

    @Resource
    private ApplicationContext applicationContext;

    @Resource
    private UserMapper userMapper;

    @Resource
    private JdbcTemplate jdbcTemplate;


    @Transactional
    public void testStreamQuery(Integer status) {
        mybatisStreamQuery(status);
    }

    private void mybatisStreamQuery(Integer status) {
        log.info("waiting for query.....");
        Cursor<CustomerInfo> customerInfos = userMapper.getCustomerInfo(status);
        log.info("finish query!");
        for (CustomerInfo customerInfo : customerInfos) {
            //处理业务逻辑
            log.info("===============>{}", customerInfo.getId());
        }
    }
}

需要注意的有两点:

1.是userMapper 返回的是一个Cursor类型,其实就是用游标。然后遍历这个cursor,mybatis就会按照你在userMapper里设置的fetchSize 大小,每次去从数据库拉取数据

2.注意 testStreamQuery 方法上的 @transactional 注解,这个注解是用来开启一个事务,保持一个长连接(就是为了保持长连接采用的这个注解),因为是流式查询,每次从数据库拉取固定条数的数据,所以直到数据全部拉取完之前必须要保持连接状态。(顺便提一下,如果说不想让在这个testStreamQuery 方法内处理每条数据所作的更新或查询动作都在这个大事务内,那么可以另起一个方法 使用required_new 的事务传播,使用单独的事务去处理,使事务粒度最小化。如下图:)

b) mapper 层代码:

package com.example.demo.mapper;

import com.example.demo.bean.CustomerInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.cursor.Cursor;
import org.springframework.stereotype.Repository;

@Mapper
@Repository
public interface UserMapper {

    Cursor<CustomerInfo> getCustomerInfo(Integer status);

}

mapper.xml 

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.mapper.UserMapper">

    <select id="getCustomerInfo" resultType="com.example.demo.bean.CustomerInfo" fetchSize="2" resultSetType="FORWARD_ONLY">
        select * from table_name where status = #{status} order by id
    </select>

</mapper>

 UserMapper.java 无需多说,其实要注意的是mapper.xml中的配置:fetchSize 属性就是上一步说的,每次从数据库取多少条数据回内存。resultSetType属性需要设置为 FORWARD_ONLY, 意味着,查询只会单向向前读取数据,当然这个属性还有其他两个值,这里就不展开了。

至此,springboot+mybatis 流式查询就可以用起来了,以下是执行结果截图:

c)读取200万条数据,每次fetchSize读取1000条,batch总用时50s左右执行完,速度是相当可以了,堆内存占用不超过250M,这里用的数据库是本地docker起的一个postgre, 远程数据库的话,耗时可能就不太一样了

 


第二种方式:springboot+JPA 流式查询

a)  service层代码:

package com.example.demo.service;

import com.example.demo.dao.CustomerInfoDao;
import com.example.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import java.util.stream.Stream;

@Slf4j
@Service
public class TestStreamQueryService {

    @Resource
    private ApplicationContext applicationContext;

    @Resource
    private UserMapper userMapper;

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Resource
    private CustomerInfoDao customerInfoDao;

    @Resource
    private EntityManager entityManager;


    @Transactional(readOnly = true)
    public void testStreamQuery(Integer status) {
        jpaStreamQuery(status);
    }

    public void jpaStreamQuery(Integer status) {
        Stream<com.example.demo.entity.CustomerInfo> stream = customerInfoDao.findByStatus(status);
        stream.forEach(customerInfo -> {
            entityManager.detach(customerInfo); //解除强引用,避免数据量过大时,强引用一直得不到GC 慢慢会OOM
            log.info("====>id:[{}]", customerInfo.getId());
        });
    }

}

 注意点:1. 这里的@transactional(readonly=true) 这里的作用也是保持一个长连接的作用,同时标注这个事务是只读的。

                2. 循环处理数据时需要先:entityManager.detach(customerInfo); 解除强引用,避免数据量过大时,强引用一直得不到GC 慢慢会OOM。

b) dao层代码:

package com.example.demo.dao;

import com.example.demo.entity.CustomerInfo;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.QueryHints;
import org.springframework.stereotype.Repository;

import javax.persistence.QueryHint;
import java.util.stream.Stream;

import static org.hibernate.jpa.QueryHints.HINT_FETCH_SIZE;

@Repository
public interface CustomerInfoDao extends JpaRepository<CustomerInfo, Long> {

    @QueryHints(value=@QueryHint(name = HINT_FETCH_SIZE,value = "1000"))
    Stream<CustomerInfo> findByStatus(Integer status);
}

 注意点:1.dao方法的返回值是 Stream 类型

                2.dao方法的注解:@QueryHints(value=@QueryHint(name = HINT_FETCH_SIZE,value = "1000"))  这个注解是设置每次从数据库拉取多少条数据,自己可以视情况而定,不可太大,反而得不偿失,一次读取太多数据数据库也是很耗时间的。。。

自此springboot + jpa 流式查询代码就贴完了,可以happy了,下面是执行结果:

c)  batch读取两百万条数据,堆内存使用截图:

 

每次fetchSize拉取1000条数据,可以看到内存使用情况:初始内存不到100M,batch执行过程中最高内存占用300M出头然后被GC。读取效率:不到一分钟执行完(处理每一条数据只是打印一下id),速度还是非常快的。

d)  读取每一条数据时,不使用 entityManager.detach(customerInfo),内存使用截图:

最终OOM了,这里的entityManager.detach(customerInfo) 很关键。


第三种方式:使用JDBC template 流式查询

其实这种方式就是最原始的jdbc的方式,代码侵入性很大,逼不得已也不会使用

a) 上代码:

package com.example.demo.service;

import com.example.demo.dao.CustomerInfoDao;
import com.example.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@Slf4j
@Service
public class TestStreamQueryService {

    @Resource
    private ApplicationContext applicationContext;

    @Resource
    private UserMapper userMapper;

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Resource
    private CustomerInfoDao customerInfoDao;

    @Resource
    private EntityManager entityManager;


    public void testStreamQuery(Integer status) {
        jdbcStreamQuery(status);
    }

    private void jdbcStreamQuery(Integer status) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;

        try {
            conn = jdbcTemplate.getDataSource().getConnection();
            conn.setAutoCommit(false);
            pstmt = conn.prepareStatement("select * from customer_info where status = " + status + " order by id", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            pstmt.setFetchSize(1000);
            pstmt.setFetchDirection(ResultSet.FETCH_FORWARD);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                long id = rs.getLong("id");
                String name = rs.getString("name");
                String email = rs.getString("email");
                int sta = rs.getInt("status");
                log.info("=========>id:[{}]", id);
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        } finally {
            try {
                rs.close();
                pstmt.close();
                conn.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
    }
}

b) 执行结果:200万数据不到50秒执行完,内存占用最高300M

 

自此,针对不同的持久层框架, 使用不同的流式查询,其实本质是一样的,归根结底还是驱动jdbc做事情。以上纯个人见解,若有不当之处,请不吝指出,共同进步!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/99338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机毕业设计springboot+vue基本微信小程序的演出门票管理系统-票务转票系统

项目介绍 转票是一个传统的行业。根据当前发展现状,网络信息时代的全面普及,转票也在发生着变化,单就出票这一方面,利用手机预约考试正在逐步进入人们的生活。传统的转票方式,不仅会耗费大量的人力、时间,有时候还会出错。小程序系统伴随智能手机为我们提供了新的方向。手机微信…

新兴物种:程序猿的饲养指南

程序猿&#xff0c;一种主要生存在中国印度等亚太国家的新型猿类&#xff0c;生存活动以及消费活动的范围遍布世界各地&#xff0c;其中最优渥的产地位于美国硅谷。 主要的生存环境需求有&#xff0c;两脚兽一切的日用饮食以及物资需求。 该物种所获得的荣誉勋章有&#xff0…

Spring系列之SpringBoot概述及入门

SpringBoot入门 文章目录SpringBoot入门一、SpringBoot是什么&#xff1f;二、Spring的缺点1.配置繁琐2.依赖繁琐三、SpringBoot功能四、SpringBoot起步依赖原理五、SpringBoot快速入门总结一、SpringBoot是什么&#xff1f; SpringBoot是由Pivotal团队提供的全新框架&#xf…

Splunk Enterprise 9.0.X Crack

Splunk Enterprise 9.0.X Crack Splunk 有能力了解用户小型企业中实际发生的情况&#xff0c;并快速采取有目的的行动来了解用户和开发人员的情况。它能够轻松灵活地将简单信息转化为答案&#xff0c;以及自动机器学习支持的分析过程 搜索、分析和可视化&#xff0c;从您的所…

基于springcloud的简单易用的java分布式日志组件

真正的大师,永远都怀着一颗学徒的心&#xff01; 一、项目简介 基于springcloud的简单易用的java分布式日志组件 二、实现功能 支持基于traceId的日志记录 支持日志查询 支持日志缓冲队列 redis或者kafka 支持错误报警模块 支持内容组合查询功能 支持日志分应用统计条数…

【神奇bug】“金”、“⾦”不是同一个字

身为程序员&#xff0c;总能遇见那些神奇的bug。我前段时间遇到了 “中国黄金” 和 “中国黄⾦”&#xff0c;我咋看咋觉得是同一个词&#xff0c;但是程序就是判定不一致&#xff0c;十分郁闷&#xff0c;多方搜索&#xff0c;最后发现2个金居然不是一个字。真是个神奇的bug&a…

计算机基础学习笔记:操作系统篇之硬件结构,CPU Cache基础概念

三、CPU Cache的数据结构和读取过程 本文知识来源小林Coding阅读整理思考&#xff0c;原文链接请见该篇文章 Cache结构 CPU Cache 是由很多个 Cache Line 组成的&#xff0c;Cache Line 是 CPU 从内存读取数据的基本单位&#xff0c;而 Cache Line 是由各种**标志&#xff08;…

基于java+springmvc+mybatis+vue+mysql的智能新冠疫苗接种助手

项目介绍 随着全球新冠疫情的蔓延&#xff0c;基本所有的发达国家都开始了全民疫苗接种的行为&#xff0c;在我国更是进行了全民的新冠疫苗接种&#xff0c;为了能够让民众更加方便快捷的进行疫苗的接种我们通过java编程语言&#xff0c;后端ssm框架&#xff0c;前端vue技术开…

【Python百日进阶-数据分析】Day129 - plotly柱状图(条形图):px.bar()实例

文章目录四、实例4.1 Plotly Express条形图4.1.1 加拿大人口4.1.2 一维数据的条形图4.1.3 多维数据条形图4.1.4 彩条4.1.5 堆叠与分组条形图4.1.6 聚集成单色条4.1.7 带文本的条形图4.1.8 填充图案4.1.9 分面子图4.1.10 带Plotly Express的基本水平条形图4.1.11 配置水平条形图…

nacos服务注册与发现

目录 1. 应用系统架构的演变&#xff08;单应用>分布式&#xff09; 2. Spring Cloud Alibaba介绍 3. 开发示例 3.1 版本的选择 3.2 nacos安装 3.3 创建工程 3.3.1 创建父工程 3.3.2 创建服务提供者模块 3.3.2 服务消费者 3.4 测试 今天与大家们简单的聊一下&#…

远程的Win11主机没有连接屏幕,通过向日葵远程后只有一个640x480的分辨率选项

背景 远程的 Win11 主机没有连接屏幕&#xff0c;通过向日葵远程后只有一个 640x480 的分辨率选项&#xff0c;界面特别小&#xff0c;用起来很不方便。而且远程主机本身还无法调整分辨率&#xff0c;向日葵上面的工具栏里也没有分辨率这一选项。 问题分析 主要原因是远程主机…

threejs之圆弧

文章目录弧线相关方法getPointssetFromPoints直线样条曲线与贝塞尔曲线样条曲线贝塞尔曲线专栏目录请点击 弧线 一般我们绘制弧线都会使用ArcCurve来进行绘制&#xff0c;他是EllipseCurve的别名&#xff0c;关于他的所有的方法&#xff0c;我们都可以看EllipseCurve 官网例子…

Linux网络协议之UDP协议(传输层)

Linux网络协议之UDP协议(传输层) 文章目录Linux网络协议之UDP协议(传输层)1.深入理解传输层1.1 对于端口号的理解1.2 端口号范围1.3 常用的知名端口号1.4 进程和端口号的两个问题1.5 查看网络状态命令(netstat)2.UDP协议2.1 UDP协议格式2.2 UDP的特点2.3 面向数据报2.4 UDP的缓…

基于nodejs仿京东商城系统的设计与实现.zip(论文+源码+ppt文档+视频录制)

第一章绪论 3 1.1项目开发的背景和意义 3 1.2国内外研究的现状 3 1.3研究的主要内容 4 第2章系统相关技术介绍 4 2.1 相关技术介绍 4 2.2 系统环境开发条件 5 第三章系统分析 6 3.1可行性分析 6 3.1.1技术性可行性 6 3.1.2经济性可行性 6 3.1.3操作性可行性 7 3.2功能需求分析 …

(Java)【深基9.例1】选举学生会

【深基9.例1】选举学生会 一、题目描述二、输入格式三、输出格式四、样例输入五、样例输出六、失败经历七、正确代码八、正确思路及易错点&#xff08;1&#xff09;题目分析&#xff08;2&#xff09;思路分析&#xff08;3&#xff09;StringBuffer: 线程安全的可变字符串①S…

二十四、CANdelaStudio深入-ExtData编辑

本专栏将由浅入深的展开诊断实际开发与测试的数据库编辑,包含大量实际开发过程中的步骤、使用技巧与少量对Autosar标准的解读。希望能对大家有所帮助,与大家共同成长,早日成为一名车载诊断、通信全栈工程师。 本文介绍CANdelaStudio的ExtData编辑,欢迎各位朋友订阅、评论,…

推荐系统学习笔记-deep crossing

由来 2016年由微软提出&#xff0c; 完整的解决了特征工程、稀疏向量稠密化&#xff0c; 多层神经网络进行优化目标拟合等一系列深度学习在推荐系统的应用问题。 这个模型涉及到的技术比较基础&#xff0c;在传统神经网络的基础上加入了embedding&#xff0c; 残差连接等思想&…

K8s CICD实战

K8s Network之Ingress PDF路径: 链接&#xff1a;https://pan.baidu.com/s/17DxUD8KN7pU1UKIR1Ejemg 提取码&#xff1a;dwf5 一、如果项目需要修改某些代码&#xff0c;怎么办&#xff1f; &#xff08;1&#xff09;重新打成jar包 &#xff08;2&#xff09;重新制作Dockerf…

javaSE - 异常(Exception 或 RuntimeException)

一、异常的背景 1.1、初识异常 其实在我们开发中&#xff0c;就是代码出现意外状况。影响到程序的运行。 其实&#xff0c;在我们接触代码开始&#xff0c;就一直在接触异常&#xff0c;只是从来没有分类。 这点在java中&#xff0c;更加明显。 现在我们就来基本了解一下异常。…

web靶场搭建之帝国cms7.5

目录 一、漏洞描述 二、漏洞环境 三、环境搭建 四、漏洞复现 后台getshell(CVE-2018-18086) 漏洞原理&#xff1a; 漏洞复现&#xff1a; 源码审计&#xff1a; 代码注入 (CVE-2018-19462) 漏洞原理&#xff1a; 漏洞复现&#xff1a; 源码审计&#xff1a; 后台X…