mysql JDBC的三种查询(普通、流式、游标)

news2025/1/17 2:59:58

使用JDBC向mysql发送查询时,有三种方式:

  • 常规查询:JDBC驱动会阻塞的一次性读取全部查询的数据到 JVM 内存中,或者分页读取
  • 流式查询:每次执行rs.next时会判断数据是否需要从mysql服务器获取,如果需要触发读取一批数据(可能n行)加载到 JVM 内存进行业务处理
  • 游标查询:通过 fetchSize 参数,控制每次从mysql服务器一次读取多少行数据。

1、常规查询

public static void normalQuery() throws SQLException {
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
    PreparedStatement statement = connection.prepareStatement(sql);
    //statement.setFetchSize(100); //不起作用
    ResultSet resultSet = statement.executeQuery();
    
    while(resultSet.next()){
        System.out.println(resultSet.getString(2));
    }
    resultSet.close();
    statement.close();
    connection.close();
}

1)说明:

  1. 第四行设置featchSize不起作用。
  2. 第五行statement.executeQuery()执行查询会阻塞,因为需要等到所有数据返回并放到内存中;接下来每次执行resultSet.next()方法会从内存中获取数据。

2)将jvm内存设置较小(-Xms16m -Xmx16m),对于大数据的查询会产生OOM:

为了避免OOM,通常我们会使用分页查询,或者下面的两种方式。

2、流式查询

public static void streamQuery() throws Exception { 
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
    PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement.setFetchSize(Integer.MIN_VALUE); 
    //或者通过 com.mysql.jdbc.StatementImpl
    ((StatementImpl) statement).enableStreamingResults();
    
    ResultSet rs = statement.executeQuery();
    while (rs.next()) {
        System.out.println(rs.getString(2));
    }
    rs.close();
    statement.close();
    connection.close();
}

2.1)流式查询的条件:

随着大数据的到来,对于百万、千万的数据使用流式查询可以有效避免OOM。在执行statement.executeQuery()时不会从TCP响应流中读取完所有数据,当下面执行rs.next()时会按照需要从TCP响应流中读取部分数据。

  1. 创建Statement的时候需要制定ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
  2. 设置fetchSize位Integer.MIN_VALUE

或者通过com.mysql.jdbc.StatementImpl的enableStreamingResults()方法设置。二者是一致的。看mysql的jdbc(com.mysql.jdbc.StatementImpl)源码:

2.2)流式查询原理:

1)基本概念

我们要知道jdbc客户端和mysql服务器之间是通过TCP建立的通信,使用mysql协议进行传输数据。首先声明一个概念:在三次握手建立了TCP连接后,就可以在这个通道上进行通信了,直到关闭该连接。

在 TCP 中发送端和接收端**可以是客户端/服务端,也可以是服务器/客户端**,通信的双方在任意时刻既可以是接收数据也可以是发送数据(全双工)。在通信中,收发双方都不保持记录的边界,所以需要按照一定的协议进行表示。在mysql中会按照mysql协议来进行交互。

有了上面的概念,我们重新来定义这两种查询:

在执行st.executeQuery()时,jdbc驱动会通过connection对象和mysql服务器建立TCP连接,同时在这个链接通道中发送sql命令,并接受返回。二者的区别是:

  1. 普通查询:也叫批量查询,jdbc客户端会阻塞的一次性从TCP通道中读取完mysql服务的返回数据;
  2. 流式查询:分批的从TCP通道中读取mysql服务返回的数据,每次读取的数据量并不是一行(通常是一个package大小),jdbc客户端在调用rs.next()方法时会根据需要从TCP流通道中读取部分数据。(并不是每次读区一行数据,网上说的几乎都是错的!)

2)源码查看:

从statement.executeQuery()方法跟进去,主要的调用连如下:

protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly,
            Field[] metadataFromCache, boolean isBatch) throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {
            MySQLConnection locallyScopedConnection = this.connection;
            rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, this.resultSetType, this.resultSetConcurrency,
                            createStreamingResultSet, this.currentCatalog, metadataFromCache, isBatch);
            return rs;
        }
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
        synchronized (getConnectionMutex()) {
            return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                        cachedMetadata);
        }
}
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,
            int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
        Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);
        ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket,
                    false, -1L, cachedMetadata);
        return rs;
}
ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,
            String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
        ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);
        return topLevelResultSet;
}
protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
            com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults,
                    catalog, isBinaryEncoded, metadataFromCache);
            return results;
        }
}
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
        Buffer packet; // The packet from the server
        RowData rowData = null;
        if (!streamResults) {
            rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
        } else {
            rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
            this.streamingData = rowData;
        }
        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType,
                resultSetConcurrency, isBinaryEncoded);
        return rs;
}

说明:

  1. sqlQueryDirect()方法中的sendCommand会通过io发送sql命令请求到mysql服务器,并获取返回流mysqlOutput
  2. getResultSet()方法会判断是否是流式查询还是批量查询。MySQL驱动会根据不同的参数设置选择对应的ResultSet实现类,分别对应三种查询方式:
  • RowDataStatic 静态结果集,默认的查询方式,普通查询
  • RowDataDynamic 动态结果集,流式查询
  • RowDataCursor 游标结果集,服务器端基于游标查询

看上述代码(41行),对于批量查询:readSingleRowSet方法会循环掉用nextRow方法获取所有数据,然后放到jvm内存的rows中:

对于流式查询:直接创建RowDataDynamic对象返回。后面在掉用rs.next()获取数据时会根据需要从mysqlOutput流中读取数据。

2.3)流式查询的坑:

public static void streamQuery2() throws Exception { 
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
    //statement1
    PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement.setFetchSize(Integer.MIN_VALUE); 
    ResultSet rs = statement.executeQuery();
    if (rs.next()) {
        System.out.println(rs.getString(2));
    }
    //statement2
    PreparedStatement statement2 = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement2.setFetchSize(Integer.MIN_VALUE); 
    ResultSet rs2 = statement2.executeQuery();
    if (rs2.next()) {
        System.out.println(rs2.getString(2));
    }
//      rs.close();
//      statement.close();
//      connection.close();
}

执行结果:

test1
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@45c8e616 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:869)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:865)
	at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:3217)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2453)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2482)
	at com.mysql.jdbc.StatementImpl.executeSimpleNonQuery(StatementImpl.java:1465)
	at com.mysql.jdbc.StatementImpl.setupStreamingTimeout(StatementImpl.java:726)
	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1939)
	at com.tencent.clue_disp_api.MysqlTest.streamQuery2(MysqlTest.java:79)
	at com.tencent.clue_disp_api.MysqlTest.main(MysqlTest.java:25)

MySQL Connector/J 5.1 Developer Guide中原文:

There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown. 也就是说当通过流式查询获取一个ResultSet后,通过next迭代出所有元素之前或者调用close关闭它之前,不能使用同一个数据库连接去发起另外一个查询,否者抛出异常(第一次调用的正常,第二次的抛出异常)。

2.4)抓包验证:

查看3307 > 62169的包可以发现,ack都是1324,证明都是针对当时sql请求的返回数据。

3、游标查询

public static void cursorQuery() throws Exception {
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false&useCursorFetch=true", "root", "123456");
    ((JDBC4Connection) connection).setUseCursorFetch(true); //com.mysql.jdbc.JDBC4Connection
    Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement.setFetchSize(2);    
    ResultSet rs = statement.executeQuery(sql);    
    while (rs.next()) {
        System.out.println(rs.getString(2));
        Thread.sleep(5000);
    }
    
    rs.close();
    statement.close();
    connection.close();
}

1)说明:

  • 在连接参数中需要拼接useCursorFetch=true;
  • 创建Statement时需要设置ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
  • 设置fetchSize控制每一次获取多少条数据

2)抓包验证:

通过wireshark抓包,可以看到每执行一次rs.next() 就会向mysql服务发送一个请求,同时mysql服务返回两条数据:

3)游标查询需要注意的点:

由于MySQL方不知道客户端什么时候将数据消费完,而自身的对应表可能会有DML写入操作,此时MySQL需要建立一个临时空间来存放需要拿走的数据。因此对于当你启用useCursorFetch读取大表的时候会看到MySQL上的几个现象:

  1. IOPS飙升 (IOPS (Input/Output Per Second):磁盘每秒的读写次数)
  2. 磁盘空间飙升
  3. 客户端JDBC发起SQL后,长时间等待SQL响应数据,这段时间就是服务端在准备数据
  4. 在数据准备完成后,开始传输数据的阶段,网络响应开始飙升,IOPS由“读写”转变为“读取”。
  5. CPU和内存会有一定比例的上升

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/550755.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

找计算机研究的论文18个平台

虽然说目前arvix是计算机领域跟进最新研究成果论文的网站,有时候我们也需要找一些其他的好论文,比如一个很久之前的。我们整理了18个相关平台,包括几个可以免费下载和阅读CS相关技术论文的网站,收录到 找计算机研究的论文18个平台…

secure CRT 常见问题配置

文章目录 颜色主题如何切换 SecureCRT 颜色主题如何新建SecureCRT 颜色 主题如何拷贝我的颜色主题,主题名为pic 系统间拷贝基于clipboard的文字shell下的VIM系统间拷贝1. 确保 ubuntu 上的 vim 支持 clipboard 特性2. 确保 图形shell下的 vim(gvim) 支持 系统间拷贝3. 确保 文字…

004 - STM32固件库GPIO(三)位带操作

目前掌握的对GPIO引脚的输入输出操作只能使用BSRRL/H、I/ODR寄存器,记得以前学51的时候,对于引脚的输入输出可以采用关键字sbit实现位定义,例如 sbit LED1 P1^3;在STM32中没有类似于sbit一样的关键字,但是提供了位带操作来实现类似于51的为…

ARM的状态传送器指令、软中断指令与协处理指令(软中断具体实现)

1.状态寄存器传送指令: 作用:访问(读写)CPSR寄存器 CPSR寄存器结构图: 前八位的作用: Bit[4:0] :不同的电平组合表示不同的模式,[10000]User [10001]FIQ [10010]IRQ [10011]SVC …

【Hadoop】二、Hadoop MapReduce与Hadoop YARN

文章目录 二、Hadoop MapReduce与Hadoop YARN1、Hadoop MapReduce1.1、理解MapReduce思想1.2、Hadoop MapReduce设计构思1.3、Hadoop MapReduce介绍1.4、Hadoop MapReduce官方示例1.5、Map阶段执行流程1.6、Reduce阶段执行流程1.7、Shuffle机制 2、Hadoop YARN2.1、Hadoop YARN…

导入源码至Android Studio

导入源码至Android Studio 参考: Android源码环境搭建(aosp Ubuntu 16.04) 使用如下的步骤: 1.. build/envsetup.sh (source可以用 .代替,即". build/envsetup.sh") 2.lunch,并选择要编译的项…

jmeter请求Sse长链接接口

文章目录 1.背景1.1 什么是SSE接口 2. **解决思路-尝试方法⬇️:**2.1 🏳️‍🌈 **postman-sse请求结果**2.2 **⚡ jmeter报错**2.3 ☀️**封装此SSE接口**2.3.1 ❌httpclient2.3.2 ❌HttpURLConnection2.3.3 ✔️okhttp3 3. jmeter-beanshel…

跟我一起使用 compose 做一个跨平台的黑白棋游戏(3)状态与游戏控制逻辑

前言 在上一篇文章中,我们已经完成了黑白棋的界面设计与编写,今天这篇文章我们将完成状态控制和游戏逻辑代码的编写。 正如第一篇文章所述,在本项目中,我们需要实现不依赖于平台的状态管理,也就是使用 Flow 和 compo…

浏览器扩展一些好用插件

给浏览器添加一些插件功能,能够让我们用的更方便,开发中非常实用,下面直接开始 我们这里选择的是微软自带的Microsoft Edge浏览器(谷歌也行。这两款浏览器都是非常好用的) 我们打开浏览器找到扩展应用这个,…

opencv 中值滤波

中值滤波是一种常用的图像滤波算法,是在像素点周围进行多个点的中值滤波,将点的灰度值根据其周围像素点的灰度值进行平均,并使这些点的灰度值具有相似性,以达到平滑去噪的目的。中值滤波在图像处理中应用广泛,在图像滤…

利用java编写的项目设备调配系统代码示例(内含5种设备调配的算法)

利用java编写的项目设备调配系统代码示例(内含5种设备调配的算法) 一、设备调配方案二、设备匹配算法三、代码实现(java) 最近在做一个项目设备调配系统,分享一些干货!!! 一、设备…

Godot引擎 4.0 文档 - 入门介绍 - Godot 关键概念概述¶

本文为Google Translate英译中结果,DrGraph在此基础上加了一些校正。英文原版页面:Overview of Godots key concepts — Godot Engine (stable) documentation in English Godot 关键概念概述 每个游戏引擎都围绕您用来构建应用程序的抽象展开。在 Godo…

【mysql】库的操作+表的操作

文章目录 启动mysql登录mysql1.MySQL环境安装与基础认识修改端口号连接服务器服务器,数据库,表关系建表 第二讲_库与表的操作1.创建数据库2.创建数据库案例3.指明字符集和校验集校验规则对数据库的影响不区分大小写的查询以及结果:区分大小写…

SQL Backup Master 6.3.6 Crack

SQL Backup Master 能够为用户将 SQL Server 数据库备份到一些简单的云存储服务中,例如 Dropbox、OneDrive、Amazon S3、Microsoft Azure、box,最后是 Google Drive。它能够将数据库备份到用户和开发者的FTP服务器上,甚至本地机器甚至网络服务…

速通二次型、二次型标准型、二次型规范型

浅过二次型 理解二次型可以从二次型的多项式入手: 显然,在系数都为实数的情况下,二次型矩阵即为一个实对称矩阵。 取一个代入值的例子就是: 二次型的标准型 OK,再从二次型的标准型的多项式入手,如下&…

FPGA System Planner(FSP)使用手册

FSP工具是cadence公司为了FPGA/PCB协同设计而推出的一个解决方案工具包。它的主要工作是由软件来自动生成、优化FPGA芯片的管脚分配,提高FPGA/PCB设计的工作效率和连通性。FSP完成两顷重要工作:一、可以自动生成FPGA芯片的原理图符号(symbol);二、自动生成、优化和更改FPG…

C++模板(上)

文章目录 模板函数模板函数模板的实例化 类模板总结 模板 模板是C种为了方便用户对于一些场景的使用&#xff0c;引入的新概念&#xff0c;使得我们的代码不会冗余 template关键字 template关键字的意思就是模板&#xff0c;语法为&#xff1a;template<typename T1,type…

内网渗透之Linux权限维持-OpenSSHPAM后门SSH软链接公私钥登录

0x01替换版本-OpenSSH后门 原理&#xff1a;替换本身操作系统的ssh协议支撑软件openssh&#xff0c;重新安装自定义的openssh,达到记录帐号密码&#xff0c;也可以采用万能密码连接的功能&#xff01; 可以修改软件版本和删除安装记录 1.环境准备&#xff1a; yum -y install…

【Java EE 初阶】网络初识

目录 1.网络互连 1.局域网&#xff1a; 2.广域网WAN 2.网络通信基础 3.IP地址&#xff1a;端口号 4.协议 1.五元组 2.协议分层 1.为什么要用网络分层&#xff1f; 3.OSI七层模型 4.TCP/IP五层&#xff08;或四层&#xff09;模型 5.封装和分用 1.应用层 2.传输层A…

Oracle数据库中了locked1勒索病毒攻击后怎么办,什么是locked1勒索病毒

Oracle数据库是一种被集团企业广泛使用的关系型数据库管理系统&#xff0c;但是随着科学技术的不断发展&#xff0c;在现代互联网环境中数据库安全性成为了一个非常重要的问题。而其中主要的威胁就是勒索病毒攻击。一旦数据库被勒索病毒攻击入侵&#xff0c;许多重要的数据就会…