本文介绍Redis pipeline相关的知识点及代码示例,包括Redis客户端-服务端的一次完整的网络请求、pipeline与client执行多命令的区别、pipeline与Redis"事务"、pipeline的使用代码示例;
pipeline与client执行多命令的区别
Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务;Redis客户端-服务端的一次完整的网络请求来回如下图;
简化一下,一次Redis请求和响应,会经历如下的步骤:
- 客户端发起一个(查询/插入)请求,并监听socket返回,通常情况都是阻塞模式等待Redis服务器的响应;
- 服务端处理命令,并且返回处理结果给客户端;
- 客户端接收到服务的返回结果,程序从阻塞代码处返回;
Redis客户端和服务端之间通过网络连接进行数据传输,这个连接可以很快(loopback接口)或很慢(建立了一个多次跳转的网络连接);但无论网络延如何延时,数据包总是能从客户端到达服务器,并从服务器返回数据回复客户端,这个时间被称之为RTT(Round Trip Time - 往返时间);
我们可以很容易就意识到,Redis在连续请求服务端时,即使Redis每秒能处理100k请求,但也会因为网络传输花费大量时间,导致整体性能的下降;
因此如果遇到大量的批处理,我们可以考虑使用Redis的pipeline(管道);
对于pipeline技术而言,对于N个命令,就相当于将N个上图中的步骤,合并成1个,其他多余的时间开销仅作用于命令的执行,这样服务请求响应的总体时间将会大大的减少;
关于pipeline与client单命令的压测结果可参考Redis精通系列——Pipeline(管道);
值得注意的是,管道技术并不是Redis特有的技术,管道技术往往需要客户端-服务器的共同配合,大部分工作任务其实是在客户端完成;Redis在较早的版本就已经支持管道技术;
如下图,多个连续的incr指令,使用pipeline(管道)后,多个连续的incr指令只会花费一次网络来回开销;这个开销会随着N数值的增大,大幅减少网络IO开销,从而提升整体服务的性能;
Redis pipeline的使用注意事项
1. pipeline一次执行的命令不宜过多
结合上面redis命令完整执行流程图,有个值得注意的点——可能出现我们经常说到的IO阻塞:
- 当write操作发生,并且发送缓冲区(send buffer)满时,就会导致write操作阻塞;
- 当read操作发生,并且接收缓冲区(recv buffer)满时,就会导致read操作阻塞;
上述的这两个阻塞如果出现,将会导致整个请求时间变长;
因此我们操作大批量指令的时候,比如10k个指令,我们可以合理的对指令分多次批量发送,这样可以减少出现阻塞的情况,也可以避免服务器响应一个过大的response包,导致客户端内存负载过重;
即使不发生IO阻塞,pipeline每批打包的命令不能过多还有一个原因:因为 pipeline 方式打包命令再发送,那么 redis server 必须在处理完所有命令前,先缓存起所有命令的处理结果,这样就有一个缓存结果的内存的消耗;
2. pipeline不保证命令执行的原子性
官方文档的一句话——
Redis::PIPELINE block is simply transmitted faster to the server, but without any guarantee of atomicity.
其实Redis的高性能设计本就不支持包含多命令的严格事务,哪怕是multi/exec操作还是Lua脚本;Redis只是提供了一些命令来一定程度实现"事务";
multi/exec操作针对命令语法错误和执行时参数错误,处理是不一样的,详情见我之前的文章《Redis——“事务“/Lua脚本》;
Lua脚本也只能一定程度保证逻辑处理和Redis命令打包的原子性,例如库存扣减;
pipeline的使用代码示例
代码示例的背景是:根据appId批量查询本地缓存的App信息,未命中本地缓存的,需要从redis中拿这多个key的value,然后刷入本地缓存;在"从redis中拿这多个key的value",在key数量较多时,做个优化,使用Redis的pipeline;
private List<AppSimpleInfoDTO> querySimpleAppWithCache(List<String> payAppIds) {
if (CollectionUtils.isEmpty(payAppIds)) {
return Lists.newArrayList();
}
List<AppSimpleInfoDTO> appList = Lists.newArrayList();
try {
// 从本地缓存读取APP
final List<String> tobeQry = Lists.newArrayList();
for (String payAppId : payAppIds) {
AppSimpleInfoDTO appSimpleInfoDTO = null;
appSimpleInfoDTO = appSimpleInfoCache.getIfPresent(payAppId);
if (appSimpleInfoDTO != null) {
log.info("get_appSimpleInfoDTO_fr_appSimpleInfoCache_suc. [appSimpleInfoDTO={}]", JSON.toJSONString(appSimpleInfoDTO));
appList.add(appSimpleInfoDTO);
} else {
tobeQry.add(payAppId);
}
}
// 未命中本地缓存则去redis查询
if (CollectionUtils.isNotEmpty(tobeQry)) {
final List<AppSimpleInfoDTO> cacheAppSimpleInfoDTOs = getAndCacheAppSimpleInfoDTOs(tobeQry);
if (CollectionUtils.isNotEmpty(cacheAppSimpleInfoDTOs)) {
appList.addAll(cacheAppSimpleInfoDTOs);
}
}
return appList;
} catch (Exception e) {
log.error("querySimpleAppWithCache error.", e);
// 异常时刷全量缓存
return getAndCacheAppSimpleInfoDTOs(payAppIds);
}
}
使用pipeline做多个key的get命令:
private List<AppSimpleInfoDTO> getAndCacheAppSimpleInfoDTOs(List<String> payAppIds) {
final Set<String> payAppIds2Qry = new HashSet<>(payAppIds);
final List<AppSimpleInfoDTO> result = Lists.newArrayList();
// 先尝试从redis获取 pipeline模式
JedisClusterPipeLine pipeline = null;
try {
pipeline = jedisCluster.pipelined();
for (String payAppId : payAppIds) {
final String appSimpleInfoKey = CacheKeyUtils.getAppSimpleInfoKey(payAppId);
// pipeline添加get命令
pipeline.get(appSimpleInfoKey);
}
// pipeline执行并获取结果
final List<Object> allVal = pipeline.syncAndReturnAll();
if (CollectionUtils.isNotEmpty(allVal)) {
allVal.forEach(val -> {
if (val != null) {
final String jsonStr = String.valueOf(val);
if (StringUtils.isNotBlank(jsonStr)) {
Optional.ofNullable(JSON.parseObject(jsonStr, AppSimpleInfoDTO.class)).ifPresent(appSimpleInfoDTO -> {
result.add(appSimpleInfoDTO);
appSimpleInfoCache.put(appSimpleInfoDTO.getPayAppId(), appSimpleInfoDTO);
log.info("localCache_AppSimpleInfoDTO_fr_redis_suc. [simpleApp={}]", JSON.toJSONString(appSimpleInfoDTO));
payAppIds2Qry.remove(appSimpleInfoDTO.getPayAppId());
});
}
}
});
}
} catch (Exception e) {
log.error("localCache_AppSimpleInfoDTOs_fr_redis_error. [payAppIds={}]", JSON.toJSONString(payAppIds), e);
} finally {
if (pipeline != null) {
pipeline.close();
}
}
// redis未查到的数据走RPC查询 并异步加载到redis和localCache
if (CollectionUtils.isNotEmpty(payAppIds2Qry)) {
final List<String> payAppIds2QryList = new ArrayList<>(payAppIds2Qry);
List<App> apps = Lists.newArrayList();
int index = 0;
while (index < payAppIds2QryList.size()) {
int toIndex = Math.min(index + max_batch, payAppIds2QryList.size());
List<String> payAppIds2QryTemp = payAppIds2QryList.subList(index, toIndex);
// 实时查APP信息接口
List<App> appsTemp = queryAppByPayAppIds(new ArrayList<>(payAppIds2QryTemp));
apps.addAll(appsTemp);
index += max_batch;
}
// 异步加载到redis和localCache
if (CollectionUtils.isNotEmpty(apps)) {
for (App app : apps) {
AppSimpleInfoDTO simpleApp = new AppSimpleInfoDTO(app.getName(), app.getPackname(), app.getCode(), app.getBigType());
CompletableFuture.runAsync(() -> {
// redis缓存1小时
jedisClusterTemplate.setex(CacheKeyUtils.getAppSimpleInfoKey(simpleApp.getPayAppId()), VivoConfigManager.getInteger(JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL, JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL_DEFAULT), JSON.toJSONString(simpleApp));
log.info("redisCache_AppSimpleInfoDTO_suc. [simpleApp={}]", JSON.toJSONString(simpleApp));
// local cache 缓存5分钟
appSimpleInfoCache.put(simpleApp.getPayAppId(), simpleApp);
log.info("localCache_AppSimpleInfoDTO_suc. [simpleApp={}]", JSON.toJSONString(simpleApp));
});
result.add(simpleApp);
}
}
}
// 重新排序
final Map<String, AppSimpleInfoDTO> payAppIdMap = result.stream().collect(Collectors.toMap(AppSimpleInfoDTO::getPayAppId, Function.identity(), (old, newly) -> newly));
result.clear();
payAppIds.forEach(payAppId -> Optional.ofNullable(payAppIdMap.get(payAppId)).ifPresent(result::add));
return result;
}
本文参考:
Redis精通系列——Pipeline(管道)