文章目录
- 使用原生JDBC,数据量很大且内存空间较小的情况下,JavaHeap非常容易溢出
- 问题背景
- `java.lang.OutOfMemoryError: Java heap space
- 错误分析
- 解决方案
- 1. **优化数据库查询**
- 2. **调整 JVM 堆内存**
- 3. **批量处理数据**
- 4. **线程池优化**
- 总结
- **`ResultSet.getString`** 导致的内存溢出与以下几个关键点有关:
- **问题来源分析**
- **代码中的优化点**
- **1. 查询返回数据过多**
- **2. 列筛选**
- **3. 大字段处理**
- **4. 分批发送到 MQ**
- **5. 增加 JVM 内存**
- **6. 避免不必要的 JSON 处理**
- **总结**
- 最终处理
使用原生JDBC,数据量很大且内存空间较小的情况下,JavaHeap非常容易溢出
问题背景
在sendRecord方法中 ,通过dbDirectService.getDBRecords获取数据库的数据,然后rs遍历每条记录进行操作。在数据量很大且内存空间较小的情况下,JavaHeap非常容易溢出,该如何改进呢?
public int sendRecord(Connection conn, String timeStart, String timeEnd, DataSwitchControl dataSwitchControl, DataSwitchSubControl
dataSwitchSubControl, String driver) throws Exception{
PreparedStatement ps = null;
ResultSet rs = null;
int thisCount = 0;
try {
long dbStartTime = System.currentTimeMillis();
ps = dbDirectService.getDBRecords(conn, timeStart, timeEnd,dataSwitchSubControl.getSrcTbName(),dataSwitchSubControl.getQueryColumn(),driver);
rs = ps.executeQuery();
long dbEndTime = System.currentTimeMillis();
logger.debug("本次数据库耗时: "+(dbEndTime - dbStartTime));
/*if (rs.first()) {*/
// 数据库json数组数据转string填入消息体,并放入消息列表
ResultSetMetaData rsm = rs.getMetaData();
int columnCount = rsm.getColumnCount();
//rs.beforeFirst();
while (rs.next()) {
long t1 = System.currentTimeMillis();
JSONObject jsonObj = new JSONObject();
for (int i = 0; i < columnCount; i++) {
String colName = rsm.getColumnName(i + 1);
String colType = rsm.getColumnTypeName(i + 1);
String value = rs.getString(colName);
if (value == null) {
value = "";
}
if(colName.equals("extract_time"))continue;//入库时间字段不做处理
jsonObj.put(colName + "|" + colType, value);
}
MessageModel model = new MessageModel();
model.setStartTime(timeStart);
model.setEndTime(timeEnd);
model.setSrcTbName(dataSwitchSubControl.getSrcTbName());
model.setDistTbName(dataSwitchSubControl.getDistTbName());
model.setJsonObject(jsonObj);
model.setId(String.valueOf(dataSwitchControl.getRowId()));
model.setOrderId(rs.getString(dataSwitchSubControl.getRowColumn()));
model.setOrderName(dataSwitchSubControl.getRowColumn());
long t4 = System.currentTimeMillis();
//logger.info("组对象:"+(t4-t1));
//发送消息到队列
queueFactory.addQueue(model,dataSwitchControl.getBatchFlag());
thisCount += 1;
}
long sendEndTime = System.currentTimeMillis();
logger.debug("本次转移任务主键:"+dataSwitchControl.getRowId()+",转移表名:"+dataSwitchSubControl.getSrcTbName()+",发送mq耗时: "+(sendEndTime - dbEndTime)+",记录数: " +thisCount);
//}
return thisCount;
}catch (Exception e)
{
throw new Exception(e);
}finally {
dbDirectService.closeConnection(null,ps,rs);
}
}
public PreparedStatement getDBRecords(Connection conn, String start, String next, String tabName, String queryColum, String driver) throws Exception {
PreparedStatement ps = null;
try {
String sql = "";
if (DataSwitchConstants.DB_DRIVER_MYSQL.equals(driver)) {
sql = "select * from " + tabName + " where " + queryColum + " >= STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s') and " + queryColum + " < STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')";
}
ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(50);
ps.setFetchDirection(ResultSet.FETCH_FORWARD);
//ps.clearParameters();
ps.setString(1, start);
ps.setString(2, next);
logger.info("sql:" + sql);
} catch (Exception e) {
throw new Exception();
}
return ps;
}
`java.lang.OutOfMemoryError: Java heap space
出现了一个 java.lang.OutOfMemoryError: Java heap space 错误。具体原因分析如下:
从你提供的日志截图来看,出现了一个 java.lang.OutOfMemoryError: Java heap space
错误。具体原因分析如下:
错误分析
-
日志核心内容
- 日志显示错误来源于 数据库查询结果的处理过程中。
- 错误堆栈表明:
- 数据库操作由 Druid 连接池处理;
ResultSet.getString
方法导致了内存溢出;- 具体原因可能是从数据库中读取了大量数据导致内存占用过高。
-
具体原因
- 数据库查询语句:
SELECT * FROM gw_backwards_order_archive_03 WHERE extract_time >= STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s') AND extract_time < STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')
- 查询结果的范围取决于
extract_time
参数的值。 - 如果查询条件范围过大,可能导致 返回结果集数据量过多。
- 查询结果的范围取决于
- 从堆栈可以看到,
ResultSet.getString
尝试将大量数据加载到内存中,导致 堆内存不足。
- 数据库查询语句:
-
线程模型
- 日志中的
main-producer-threadPool
表明这个任务由线程池中的某个线程执行。 - 如果线程池中有多个线程同时执行类似的任务,而每个任务都读取了大量数据,内存使用会进一步增加。
- 日志中的
-
程序设计可能问题
- 数据库查询返回的数据集未进行分页或限制,导致一次性加载过多数据到内存。
- 查询结果被处理为大对象(如字符串拼接、复杂的映射转换等),增加了内存占用。
- 线程池中多个线程并行处理类似任务,争夺堆内存资源。
解决方案
1. 优化数据库查询
-
分页查询
确保数据库查询的返回结果集不会过大,可以通过分页限制返回的数据量。例如:SELECT * FROM gw_backwards_order_archive_03 WHERE extract_time >= STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s') AND extract_time < STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s') LIMIT ?, ?
- 每次只查询一部分数据,通过分页参数动态调整
OFFSET
和LIMIT
。
- 每次只查询一部分数据,通过分页参数动态调整
-
只查询必要字段
如果不需要所有字段,使用精确字段代替SELECT *
,例如:SELECT field1, field2 FROM gw_backwards_order_archive_03 WHERE ...
2. 调整 JVM 堆内存
-
如果确实需要加载大量数据,可以尝试增大堆内存:
-Xms512m -Xmx2048m
-Xms
: JVM 初始化堆大小;-Xmx
: JVM 最大堆大小。
-
注意:这只能缓解内存问题,但不能根本解决大数据量问题,还是需要优化查询逻辑。
3. 批量处理数据
- 不一次性加载所有数据,而是分批次处理:
这样可以显著降低内存占用。int batchSize = 1000; while (hasMoreData) { List<Data> dataBatch = fetchNextBatch(batchSize); processBatch(dataBatch); }
4. 线程池优化
- 限制线程池中可同时运行的任务数量,避免内存争用。例如:
ExecutorService executor = new ThreadPoolExecutor( 2, // core pool size 4, // max pool size 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10) // task queue size );
- 减少同时运行的任务数量,以控制并发对内存的占用。
总结
你的问题主要是 数据库查询返回结果集过大 导致内存不足。建议按照以下优先级逐步优化:
- 优化数据库查询,通过分页和字段选择减少结果集大小;
- 分批次处理数据,避免一次性加载所有数据;
- 适当调整 JVM 堆内存配置,缓解现有内存压力;
- 优化线程池配置,控制并发任务数。
ResultSet.getString
导致的内存溢出与以下几个关键点有关:
问题来源分析
-
大量数据加载到内存:
- 你的 SQL 查询 (
SELECT *
) 没有限制结果集大小,可能会返回大量数据行,所有这些数据会被加载到内存中。 - 即使你在
PreparedStatement
中设置了fetchSize(50)
,这只会影响数据库和 JDBC 驱动之间的数据抓取效率,但数据仍会逐条加载到 JVM 内存中。
- 你的 SQL 查询 (
-
全表扫描:
- SQL 查询语句
SELECT *
会将表中的所有字段加载进结果集中,尤其是有大字段(如TEXT
或BLOB
)时,每一行的数据都会占用较大的内存。 - 如果表的列较多(例如几十列)或某些列的内容较大(例如 JSON 或 XML 字符串),内存压力会迅速增加。
- SQL 查询语句
-
ResultSet.getString
和 JSON 构造:- 代码中通过
ResultSet.getString
方法逐一读取每一列的值,并将其存入JSONObject
:String value = rs.getString(colName); if (value == null) { value = ""; } jsonObj.put(colName + "|" + colType, value);
- 如果某些列的值是非常大的字符串(例如日志数据、JSON 数据等),则每次读取时都会在内存中分配大量内存来存储这些字符串。
- 代码中通过
-
没有分批处理数据:
- 整个代码逻辑是基于一个大循环从
ResultSet
中逐行读取并处理,没有限制单次任务的内存占用。 - 即使你设置了批量发送到 MQ,但在每次循环中,每条记录的 JSON 都暂时存储在内存中,累计处理大量记录时会消耗 JVM 堆内存。
- 整个代码逻辑是基于一个大循环从
代码中的优化点
以下是你的代码可能导致内存问题的关键点,以及相应的优化建议:
1. 查询返回数据过多
- 问题:
SELECT *
会查询整个表的数据。- 没有分页机制,可能会一次性返回上百万条记录。
- 优化建议:
- 使用分页查询来限制每次返回的数据量。例如:
SELECT * FROM table_name WHERE query_column >= ? AND query_column < ? LIMIT 100 OFFSET ?;
- 在代码中使用分页逻辑:
int pageSize = 100; // 每页记录数 int offset = 0; // 起始位置 while (true) { String sql = "SELECT * FROM " + tableName + " WHERE query_column >= ? AND query_column < ? LIMIT ? OFFSET ?"; PreparedStatement ps = conn.prepareStatement(sql); ps.setString(1, startTime); ps.setString(2, endTime); ps.setInt(3, pageSize); ps.setInt(4, offset); ResultSet rs = ps.executeQuery(); if (!rs.next()) break; // 没有更多数据,退出循环 ps这里实际上埋了一个大问题,数据丢失了 while (rs.next()) { // 处理数据 } offset += pageSize; // 更新分页偏移量 }
- 使用分页查询来限制每次返回的数据量。例如:
2. 列筛选
- 问题:
SELECT *
会返回所有列,可能包括许多不必要的列。
- 优化建议:
- 只查询必要的列,例如:
SELECT column1, column2, column3 FROM table_name WHERE query_column >= ? AND query_column < ?;
- 只查询必要的列,例如:
3. 大字段处理
- 问题:
- 数据表中可能存在大字段(例如
TEXT
或BLOB
),getString
读取这些字段会占用大量内存。
- 数据表中可能存在大字段(例如
- 优化建议:
- 对于大字段,避免直接使用
ResultSet.getString
,而是使用流式读取:InputStream inputStream = rs.getBinaryStream("large_column"); // 使用流读取数据,避免将整个内容加载到内存
- 对于大字段,避免直接使用
4. 分批发送到 MQ
- 问题:
- 当前代码是读取一行数据就立即发送到 MQ,但如果某个批次数据量太大,仍然会导致内存积压。
- 优化建议:
- 使用批量发送模式,例如将一定数量的
MessageModel
累积后再发送:List<MessageModel> messageBatch = new ArrayList<>(); while (rs.next()) { // 构造 MessageModel MessageModel model = new MessageModel(); // ... messageBatch.add(model); // 当累积到一定数量时发送 if (messageBatch.size() >= 100) { queueFactory.addBatchQueue(messageBatch); messageBatch.clear(); } } // 发送剩余数据 if (!messageBatch.isEmpty()) { queueFactory.addBatchQueue(messageBatch); }
- 使用批量发送模式,例如将一定数量的
5. 增加 JVM 内存
- 问题:
- 如果数据量确实过大,JVM 堆内存不足也是导致问题的原因之一。
- 优化建议:
- 增大 JVM 堆内存,例如:
java -Xms512m -Xmx4g -jar your-application.jar
- 但这只是缓解手段,根本问题还是要限制数据量。
- 增大 JVM 堆内存,例如:
6. 避免不必要的 JSON 处理
- 问题:
- 每行数据都会被封装成一个
JSONObject
,这会增加额外的内存开销。
- 每行数据都会被封装成一个
- 优化建议:
- 使用更轻量级的对象(如简单的 POJO 或 Map)来代替
JSONObject
。
- 使用更轻量级的对象(如简单的 POJO 或 Map)来代替
总结
内存溢出的核心原因是:SQL 查询返回的数据量太大,ResultSet.getString
读取数据时占用内存过多。优化建议包括:
- 使用分页查询限制数据量。
- 避免
SELECT *
,只查询必要的列。 - 对大字段使用流式读取。
- 使用批量发送模式减少内存压力。
- 增加 JVM 堆内存作为辅助手段。
可以先从查询限制(分页和列筛选)开始优化!
最终处理
- 分页
- 减少jsonObject中不必要的字段
分页如上述所讲的,加上 limit 和 offset的参数,在外层需用while将数据全部取出,判定没数据需要if (!rs.next()) break; // 没有更多数据,退出循环
后面需用while (rs.next())
取数据,需注意if (!rs.next()) break;
执行后实际上浪费了一条数据!因此需要rs.beforeFirst(); // 重新设置游标到数据的起始位置
使用这个需要注意不能使用ResultSet.TYPE_FORWARD_ONLY
,会抛出异常!
public int sendRecord(Connection conn, String timeStart, String timeEnd, DataSwitchControl dataSwitchControl, DataSwitchSubControl
dataSwitchSubControl, String driver) throws Exception {
PreparedStatement ps = null;
ResultSet rs = null;
int thisCount = 0;
try {
int pageSize = 100; // 每页记录数
int offset = 0; // 起始位置
while (true) {
long dbStartTime = System.currentTimeMillis();
ps = dbDirectService.getDBRecords(conn, timeStart, timeEnd, dataSwitchSubControl.getSrcTbName(), dataSwitchSubControl.getQueryColumn(), driver,pageSize ,offset );
rs = ps.executeQuery();
long dbEndTime = System.currentTimeMillis();
logger.debug("本次数据库耗时: " + (dbEndTime - dbStartTime));
/*if (rs.first()) {*/
// 数据库json数组数据转string填入消息体,并放入消息列表
ResultSetMetaData rsm = rs.getMetaData();
int columnCount = rsm.getColumnCount();
if (!rs.next()) break; // 没有更多数据,退出循环
rs.beforeFirst(); // 重新设置游标到数据的起始位置
//rs.beforeFirst();
while (rs.next()) {
JSONObject jsonObj = new JSONObject();
for (int i = 0; i < columnCount; i++) {
String colName = rsm.getColumnName(i + 1);
if (colName.equals("extract_time")) continue;//入库时间字段不做处理
String value = rs.getString(colName);
if (value == null) {
value = "";
}
jsonObj.put(colName, value);
}
//DATA_SWITCH_CONFIG表ROW_ID+主键
MessageModel model = new MessageModel();
// model.setProcessingType(dataSwitchControl.getProcessingType());
model.setStartTime(timeStart);
model.setEndTime(timeEnd);
/*model.setSrcTbName(dataSwitchSubControl.getSrcTbName());*/
model.setDistTbName(dataSwitchSubControl.getDistTbName());
model.setJsonObject(jsonObj);
// model.setId(String.valueOf(dataSwitchControl.getRowId()));
model.setOrderId(rs.getString(dataSwitchSubControl.getRowColumn()));
model.setOrderName(dataSwitchSubControl.getRowColumn());
logger.info("MessageModel:" + JSONObject.toJSONString(model));
//发送消息到队列
queueFactory.addQueue(model, dataSwitchControl.getBatchFlag());
thisCount += 1;
}
offset += pageSize; // 更新分页偏移量
}
return thisCount;
} catch (Exception e) {
throw new Exception(e);
} finally {
dbDirectService.closeConnection(null, ps, rs);
}
}
public PreparedStatement getDBRecords(Connection conn, String start, String next, String tabName, String queryColum, String driver, int pageSize, int offset) throws Exception {
PreparedStatement ps = null;
try {
String sql = "";
if (DataSwitchConstants.DB_DRIVER_MYSQL.equals(driver)) {
sql = "select * from " + tabName + " where " + queryColum + " >= STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s') and " + queryColum + " < STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s') LIMIT ? OFFSET ?";
}
//ps = conn.prepareStatement(sql,ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
ps = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(50);
ps.setFetchDirection(ResultSet.FETCH_FORWARD);
//ps.clearParameters();
ps.setString(1, start);
ps.setString(2, next);
ps.setInt(3, pageSize);
ps.setInt(4, offset);
logger.info("sql:" + sql);
} catch (Exception e) {
throw new Exception();
}
return ps;
}