起因是后台job对一批数据做大量的redis读写操作,为了提高job的执行速度,直接使用pipeline对一些不能批量读写的命令进行管道优化
简单介绍什么是lettuce
Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettuce客户端基于Netty的NIO框架实现,对于大多数的Redis操作,只需要维持单一的连接即可高效支持业务端的并发请求 —— 这点与Jedis的连接池模式有很大不同。同时,Lettuce支持的特性更加全面,且其性能表现并不逊于,甚至优于Jedis
官方是这样介绍的:Lettuce 是一个可扩展的线程安全 Redis 客户端,提供同步、 异步和反应式API。如果多个线程避免阻塞和事务性操作(例如BLPOP和 MULTI/ ) ,则它们可以共享一个连接EXEC。优秀的netty NIO框架可以有效地管理多个连接。其中包括对高级 Redis 功能(例如 Sentinel、Cluster 和 Redis 数据模型)的支持。
为什么选择使用pipeline进行批处理优化
客户端与服务端通过网络连接,无论两者间的网络延迟是高还是低,数据包从客户端到服务端(请求),再从服务端返回客户端(响应)的过程总是会消耗一定的时间。我们将这段时间称为RTT(Round Trip Time)。假设在延迟非常高的网络条件下,RTT达到250ms,此时就算服务端拥有每秒处理100k请求的能力,(基于单一连接)整体的QPS也仅仅只有4。而如果借助管道模式,客户端则可以一次性发出大量(如1k)请求,并随后一次性接收大量服务端的响应,从而显著提高请求处理速度
还原现场
本次使用的zRem接口,zRem可以在一个网络请求中对一个key的多个values删除,但是如果删除的是多个key,则做不到,所以会考虑pipeline进行优化
简单的zRemove接口
public Boolean zRemove(String key, String value) {
try {
Long affected = stringRedisTemplate.opsForZSet().remove(key, value);
return Objects.equals(affected, 1L);
} catch (Exception e) {
log.error("Redis Operation[zRemove] Error", e);
}
return null;
}
使用pipeline后
public Boolean zRemoveByPipeline(String key, Collection<String> values) {
if (CollectionUtils.isEmpty(values)) {
return Boolean.TRUE;
}
try {
List<Object> results = stringRedisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
for (String value : values) {
connection.zRem(key.getBytes(), value.getBytes());
}
return null;
});
return Objects.equals(results.size(), values.size());
} catch (Exception e) {
log.error("Redis Operation[zRemoveByPipeline] Error", e);
}
return Boolean.FALSE;
}
优化结果
不仅job运行时间变长,服务器cpu也有显著上升
原因排查
查看executePipelined方法
public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {
return execute((RedisCallback<List<Object>>) connection -> {
connection.openPipeline();
boolean pipelinedClosed = false;
try {
Object result = action.doInRedis(connection);
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the pipeline");
}
List<Object> closePipeline = connection.closePipeline();
pipelinedClosed = true;
return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
if (!pipelinedClosed) {
connection.closePipeline();
}
}
});
}
其实就是使用pipeline后,方法首尾分别调用了connection.openPipeline();
和connection.closePipeline();
查看lettuce的openPipeline()实现:
public void openPipeline() {
if (!isPipelined) {
isPipelined = true;
ppline = new ArrayList<>();
flushState = this.pipeliningFlushPolicy.newPipeline();
flushState.onOpen(this.getOrCreateDedicatedConnection());
}
}
先解释一下:
- isPipelined代表本次调用是否使用pipeline
- ppline 存贮封装好的LettuceResult
- flushState:pipeline的刷新策略,lettuce默认
PipeliningFlushPolicy.flushEachCommand()
每个命令一刷
第一点很好理解,第二点我们看一下ppline这个集合什么时候被添加元素:
void pipeline(LettuceResult result) {
if (flushState != null) {
flushState.onCommand(getOrCreateDedicatedConnection());
}
if (isQueueing()) {
transaction(result);
} else {
ppline.add(result);
}
}
在执行reids命中的execute方法中,如果读取到isPipelined为true,则调用上面的pipeline方法,将LettuceResult的对象存入ppline集合中
这样就可以解释第三点,上面代码将LettuceResult的对象存入ppline集合中执行会执行:flushState.onCommand(getOrCreateDedicatedConnection());
然而在FlushEachCommand的刷新策略类中是这样实现该方法的: public void onCommand(StatefulConnection<?, ?> connection) {}
什么也不做,这就会很奇怪了,必须看看别的实现类怎么做,比如BufferedFlushing的实现:
private static class BufferedFlushing implements PipeliningFlushState {
private final AtomicLong commands = new AtomicLong();
private final int flushAfter;
public BufferedFlushing(int flushAfter) {
this.flushAfter = flushAfter;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {
connection.setAutoFlushCommands(false);
}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {
if (commands.incrementAndGet() % flushAfter == 0) {
connection.flushCommands();
}
}
@Override
public void onClose(StatefulConnection<?, ?> connection) {
connection.flushCommands();
connection.setAutoFlushCommands(true);
}
}
很明了:
- onOpen:打开pipeline时,将自动刷新设置为false,
该值默认是true
,此时lettuce是每个命令都会立即发送到redis server - onCommand:每flushAfter个命令后,调用flushCommands进行刷新
- onClose:刷新命令并设置自动刷新为true
看到这里,博主赶紧回头看FlushEachCommand的三个方法:果然都是什么都没做,因为默认就是自动刷新:每个命令都会立即发送到redis server
到了这里已经出现端倪:我们默认用的是FlushEachCommand,那么即便使用了pipeline,也没有做到像开头说的那样:将一批redis 命中只通过一次网络调用发往redis
FlushEachCommand什么时候被使用?
在把命令封装成LettuceResult的时候回调用dispatch方法,最终在DefaultEndpoint类中:write(RedisCommand<K, V, T> command)
方法
if (autoFlushCommands) {
if (isConnected()) {
writeToChannelAndFlush(command);
} else {
writeToDisconnectedBuffer(command);
}
} else {
writeToBuffer(command);
}
当autoFlushCommands = true,writeToChannelAndFlush
否则:writeToBuffer()将命令装入内部缓冲区 private final Queue<RedisCommand<?, ?, ?>> commandBuffer;
在onCommand和onClose的时候调用flushCommands将缓冲区的命令发送至redis server
在本次案例中因为autoFlushCommands = false,所以将命令放入缓冲区
为什么cpu和时间都会在增加呢?
上面的代码中我们已经知道excute方法把redis command封装成LettuceResult放入了pipeline集合中,LettuceResult其实是把redis命令放入了内部缓冲区commandBuffer
而刷新策略的区别就是BufferedFlushing在onOpen的时候设置autoFlushCommands为false,onCommand.的时候选择是否flushCommands,onClose的时候flushCommands并且设置autoFlushCommands为true;而FlushEachCommand什么都没做
本例的优化手段只是使用了pipeline,并没有操作autoFlushCommands,所以本质上还是每个命令都会直接被发送到redis server,所以网络请求并不会减少,至于CPU占用上升,
则是因为pipeline使用了异步请求,增加了CPU的占用
附closePipeline方法核心片段:
flushState.onClose(this.getOrCreateDedicatedConnection());
flushState = null;
isPipelined = false;
List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
for (LettuceResult<?, ?> result : ppline) {
futures.add(result.getResultHolder());
}
try {
boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS,
futures.toArray(new RedisFuture[futures.size()]));
先调用flushState.onClose方法,无论哪个实现类的最终目的都是将缓冲区的命令刷新到redis server,再通过 LettuceFutures.awaitAll
方法获取最终结果;
怎么改进
1、聪明的你肯定已经知道了:那就使用BufferedFlushing
没问题,但是如果你了解过lettuce,应该已经知道了,lettuce是单连接的且线程安全,考虑一个问题:为了一些批处理的功能,我们设置了例如每5个命令刷新一次管道,如果此时一些C端的服务要读这些缓存,也要每5个命令才能刷新一次,虽然在高并发的情况下5个命令很快就打满了,影响微乎其微,但终究不让人放心,是否能读“写分离呢”?
再次优化,读和写的操作分开成两个服务或者读和写分开成连个连接,一个设置成每5个命令刷新一次用来写,另外一个服务或者连接保持默认用来读那就ok了
2、如果flushAfter这个参数你不好把握,想认为自己控制什么自动刷新的开闭功能,那么刷新策略就不要设置成BufferedFlushing,默认为FlushEachCommand,然后自己在各自封装好的代码中进行人为控制,比如开始写之前,关闭自动刷新,写结束,打开自动刷新;但是这样1中的问题也会存在,且打开和关闭的时机也不好把握,不如第一条稳妥;
3、官方如何推荐?
首先要明白lettuce为什么单个连接也能应对高并发的读写请求,参考
深度解析lettuce,为什么单连接也可以处理高并发redis请求
简单理解Lettuce 是一个非阻塞和异步客户端。它提供了一个同步 API,以在每个线程的基础上实现阻塞行为,以创建等待(同步)命令响应。阻塞本身不会影响其他线程。Lettuce 被设计为以pipeline方式运行。多个线程可以共享一个连接。当一个线程可以处理一个命令时,另一个线程可以发送一个新命令。第一个请求返回后,第一个线程的程序流将继续,而第二个请求由 Redis 处理并在某个时间点返回。
关于刷新:状态AutoFlushCommands是针对每个连接设置的,因此对于使用共享连接的所有线程都可见。如果您想忽略此影响,请使用专用连接。
同时还有警告:请勿在跨线程共享连接时使用,至少在没有正确同步的情况下不要使用setAutoFlushCommands(…)。根据许多问题和(无效的)错误报告,setAutoFlushCommands(…)在多线程场景中使用会导致大量复杂性开销,并且很可能会导致您这边出现问题。setAutoFlushCommands(…)只能在批量加载等场景中可靠地用于单线程连接使用。
也就是官方并不推荐自己setAutoFlushCommands,尤其是在跨线程的时候,但如果真的需要使用,还是建议分批次刷新,根据性能测试推荐配置为50-1000