一 项目背景
此前的项目中,鉴于客户方服务器的安全配置对 MQ 中间件有所限制,我们只得采用 Redis 的 list 作为简易的 MQ 来传送报文数据。然而,近段时间客户关闭了相关端口,导致大量数据积压,需要进行补发。在补发过程中,发现原先的发送方式极其缓慢,之后改用管道方式发送才提升了速度。
二 优化发送redis数据库
1 redis发送优化前
优化前发送redis数据库是一条一条发送的,代码如下:
private void sendBatchMsgByRedis(List<HexLog> list) {
RedisTemplate<String, String> sec = SecondRedisConnection.getSecondRedisTemplate();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < list.size(); i++) {
DbHexLog db = list.get(i);
sb.setLength(0); // 重置 StringBuilder
sb.append(db.getAddr())
.append("$$")
.append(db.getInHexStr())
.append("$$")
.append(db.getCtime().format(df));
String txt = sb.toString();
try {
sec.opsForList().leftPush("PUSH_HEX", txt);
} catch (Exception e) {
// 使用适当的日志工具记录异常
Logger.getLogger(getClass().getName()).log(Level.SEVERE, "Failed to push to Redis", e);
}
// 使用条件日志,减少系统输出
if (i % 100 == 0) {
System.out.println("send =========" + i);
}
}
System.out.println("Batch processing completed. Total messages sent: " + list.size());
}
说明:优化前发送1天的数据量600万条大概要10个多小时。
2 redis发送优化后
优化后发送redis数据库是使用管道批量发送,代码如下:
private void sendBatchMsgByRedis(List<DbHexLog> list) {
RedisTemplate<String, String> sec = SecondRedisConnection.getSecondRedisTemplate();
// 实现 SessionCallback 接口,用于批量执行 Redis 操作
SessionCallback<Void> sessionCallback = new SessionCallback<>() {
@Override
public Void execute(RedisOperations operations) {
StringBuilder sb = new StringBuilder();
for (DbHexLog db : list) {
sb.setLength(0); // 重置 StringBuilder
sb.append(db.getAddr())
.append("$$")
.append(db.getInHexStr())
.append("$$")
.append(db.getCtime().format(df));
String txt = sb.toString();
operations.opsForList().leftPush("PUSH_HEX", txt);
}
return null;
}
};
// 使用 executePipelined 执行批量操作
sec.executePipelined(sessionCallback);
System.out.println("Batch processing completed. Total messages sent: " + list.size());
}
优化后发送一天的数据量只需10分钟即完成,如下图:
由于服务器处理速度慢,一下就积压了刚发送过来的6百多万条。
三 小结
在使用管道批量发送时,需留意以下事项:
-
内存容量:要确保内存与数据量相适配(合理配置 Xmx)。例如,若数据量庞大,可能需要配置较大的内存空间,如 8G 甚至 16G,以保障系统运行的流畅性。
-
网速:依据传送的数据量,适当延迟 Redis 超时连接时间。比如,当数据传输量较大时,可将超时时间延长至 30 秒甚至 1 分钟。
需要明确的是,使用管道批量发送与单条数据发送存在差异,所涉及的配置可能需要变更。此外,要提示分批量发送,并根据配置机调整每次发送的数据量。例如上述的 650 多万条数据,可根据时间每小时一批,大概 27 万左右发一批,同时配置约 1G 的内存,超时也设置为 10 秒。
倘若未设置 Redis 超时(或 setTimeout 时间设置过短),再加上数据量较大、带宽较小的情况,就有可能会报出以下错误:
org.springframework.data.redis.connection.RedisPipelineException: Pipeline contained one or more invalid commands; nested exception is org.springframework.data.redis.connection.RedisPipelineException: Pipeline contained one or more invalid commands; nested exception is org.springframework.dao.QueryTimeoutException: Redis command timed out