和其他大部分NoSQL不同,Redis是支持事务的,尽管没有数据库那么强大,但非常有用,在某些高并发但又要保证高度一致性的场景下,代替数据库事务非常有效;在Redis的机制中,允许通过流水线一次性发给Redis大量的命令去执行,如此可以提高命令的执行效率并降低网络延迟;此外Redis还支持消息的发布订阅从而支持消息的传播,缓存数据有超时机制从而避免脏数据,并且为了更好的支持运算Redis还支持Lua脚本从而扩展了Redis的灵活性
Redis事务
Redis的事务是使用MULTI-EXEC的命令组合,它保证了两件事情,其一保证事务是一个被隔离的操作,事务中的方法会被Redis序列化并按顺序执行,事务在执行过程中不会被其他客户端的命令打断;其二事务是原子性操作,要么全部执行,要么全部不执行。如下表所示是Redis事务的基础命令
在Redis中开启事务是使用multi命令,执行事务是exec命令,在这两个命令之间的Redis命令都会进入队列,知道exec命令出现,会一次性发送队列里的全部命令去执行,在执行这些事务命令时,其他客户端不能再插入任何命令,这便是Redis事务的机制
> multi
"OK"
> set key1 value1
"QUEUED"
> get key1
"QUEUED"
> exec
1) "OK"
2) "value1"
如果回滚事务,则使用discard命令,它会进入事务队列中的命令中,这样事务中的方法就不会被执行了
> multi
"OK"
> set key1 value1
"QUEUED"
> get key1
"QUEUED"
> discard
"OK"
> exec
"ERR EXEC without MULTI"
使用RedisTemplate并不能保证多个对Redis的操作是同一个链接完成的,更多的时候在使用Spring时,会使用SessionCallback接口处理,下面是借助Spring的SessionCallback接口完成上述Redis命令同样的事情,代码如下
public static void testTransaction() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
try {
// Lambda表达式创建SessionCallback对象
SessionCallback session = (ops) -> {
ops.multi();
// 命令放入队列
ops.boundValueOps("key1").set("value1");
// 注意由于命令只是进入队列,而没有被执行,
// 所以此处采用get命令,而value却返回为null
String value = (String) ops.boundValueOps("key1").get();
System.out.println("事务执行中,命令只入队,没被执行,所以:value=" + value);
// 此时list会保存之前进入队列的所有命令的结果
List list = ops.exec();// 执行事务
// 事务结束后,获取value1
value = (String) redisTemplate.opsForValue().get("key1");
System.out.println("key1->" + value);
return value;
};
redisTemplate.execute(session);
} finally {
applicationContext.close();
}
}
需要特别注意的是,当命令进入队列后,Redis就会检测事务的命令是否正确,如果不正确则会产生错误,之前和之后的命令都会被回滚,就变成了什么都没有执行;而当命令格式正确,被操作的数据结构引起错误时,则之前和之后的命令都会被执行,并不会回滚。这一点和数据库不太一样,需要通过程序去检测数据的正确性从而保证Redis事务的正确执行,避免数据不一致
> multi
"OK"
> set key1 value1
"QUEUED"
> set key2 value2
"QUEUED"
> incr key1
"QUEUED"
> del key2
"QUEUED"
> exec
1) "OK"
2) "OK"
3) "ReplyError: ERR value is not an integer or out of range"
4) "1"
> multi
"OK"
> set key3 value3
"QUEUED"
> incr
"ERR wrong number of arguments for 'incr' command"
> set key4 value4
"QUEUED"
> exec
"EXECABORT Transaction discarded because of previous errors."
> get key3
(nil)
> get key4
(nil)
>
监控Redis事务
在Redis中使用watch命令可以决定事务是执行还是回滚,通常可以在multi命令之前使用watch命令监控某些键值对,然后使用multi命令开启事务,执行各类对数据结构的操作命令,这些命令进入队列,当Redis使用exec命令执行事务时,首先会去对比watch命令监控的键值对,如果没有发生变化,则会执行队列中的命令,提交事务;如果发生变化则不执行任何队列中的命令,将事务回滚,并且无论是否回滚,Redis都会取消执行事务前的watch命令
Redis参考了多线程中使用的比较和交换(Compare and Swap, CAS)的执行,在数据高并发环境的操作中,我们把这样的机制就称为乐观锁,其过程大致是这样的当数据被多个线程共享访问的时候,很容易造成数据不一致的情况,首先会在线程开始时读取这些数据并将其保存到当前线程的副本中,这个时候称这些数据为旧值,watch命令就具备这样的功能,然后开启线程业务逻辑,由multi提供这个功能,在执行更新数据之前,比较当前线程副本保存的旧值和当前线程访问的当前值是否一致,如果不一致如果不一致,说明这个旧值已经被其他线程修改,为了保持数据一致性,则线程不会再修改该数据而是回滚事务,相反的则执行对应的业务逻辑修改数据,exec命令就类似这样的功能
而多线程的CAS原理会产生ABA问题,简单讲就是监控值或者线程副本中的值是A,在改线程对该值进行最终判断是否要修改之前,该值被其他线程修改为B,然后又被其他线程修改为A,这个时候原来的线程进行判断发现值没变,则执行业务逻辑修改数据的场景,这就是CAS原理中的漏洞ABA问题,因此仅仅记录一个旧值还不够,需要通过其他方法规避该漏洞,常见的会为数据增加一个版本号,每次操作都给版本号加1,即便是回退操作也是加1,这样再采用CAS原理探测线程值和其版本号时就可以排除ABA问题的影响,从而保证数据一致性
Redis在执行事务的过程中,并不会阻塞其他链接的并发,只是通过watch监控的键值对保证数据的一致性,所以Redis的多个事务完全可以在非阻塞的的多线程环境中并发执行,而且Redis的不会产生ABA的问题(即便再回到A,Redis还是认为已经被修改了),所以回滚事务
> set key1 value1
"OK"
> watch key1
"OK"
> multi
"OK"
> set key2 value2
"QUEUED"
> exec
1) "OK"
> get key2
"value2"
Redis流水线
在Redis事务中,Redis提供了队列可以批量执行任务,性能比较高,然而multi…exec的模式会检测对应的锁和序列化命令,带来了不小的系统开销;有时候我们希望在没有任何附加条件的场景下使用队列批量执行系列的命令,从而提高系统的性能,这便是Redis流水线(pipelined)的场景
Redis的读写速度非常快,性能的瓶颈往往出现在网络通信中,多条命令逐一到达Redis服务器,没条命令产生的网络延迟是必须考虑的因素,否则无法将Redis的高性能充分发挥,同样Redis流水线技术也可以解决此类问题
使用流水线技术测试性能,代码如下
public static void testPipeline() {
// 连接Redis
Jedis jedis = new Jedis("192.168.3.115", 6379);
// 如果需密码
// jedis.auth("Ms123!@#");
try {
long start = System.currentTimeMillis();
// 开启流水线
Pipeline pipeline = jedis.pipelined();
// 这里测试10万条的读/写2个操作
for (int i = 0; i < 100000; i++) {
int j = i + 1;
pipeline.set("pipeline_key_" + j, "pipeline_value_" + j);
pipeline.get("pipeline_key_" + j);
}
// pipeline.sync();//这里只执行同步,但是不返回结果
// pipeline.syncAndReturnAll();将返回执行过的命令返回的List列表结果
List result = pipeline.syncAndReturnAll();
long end = System.currentTimeMillis();
// 计算耗时
System.err.println("耗时:" + (end - start) + "毫秒");
} finally {// 关闭连接
jedis.close();
}
}
在一个普通PC上执行这段测试代码实际耗时587毫秒,也就是在不到一秒间,完成了10万次对Redis的读写,相比没有使用流水线技术的情况提高了数倍;执行的命令返回值都存在了一个list中,在实际开发中如果需要保存还需要考虑list对象的大小,它会吃内存严重了可能会导致内存溢出,即便不溢出也会导致其他使用内存的程序受到影响
在Spring中执行流水线和执行事务如出一辙,使用RedisTemplate提供的executePipelined方法即可,如下代码所示
public static void testRedisPipeline() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(
RedisConfig.class);
StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
// 使用Java8的Lambda表达式创建SessionCallback
SessionCallback callBack = (ops) -> {
for (int i = 0; i < 100000; i++) {
int j = i + 1;
String key = "pipeline_key_" + j;
String value = "pipeline_value_" + j;
ops.boundValueOps(key).set(value);
ops.boundValueOps(key).get();
}
return null;
};
long start = System.currentTimeMillis();
// 执行Redis的流水线命令,并返回结果
List resultList = redisTemplate.executePipelined(callBack);
long end = System.currentTimeMillis();
System.err.println("耗时:" + (end - start) + "毫秒");
}
Redis发布订阅
这个场景实际上非常常见,当银行卡被扣费的时候,会通知到扣费客户端例如微信,支付宝,并以短信的形式通知到短信终端,这就是一种发布订阅模式,由银行系统发布由消费终端订阅,而Redis支持这样的模式
首先需要有一个消息渠道,订阅消息的终端监听这个渠道,而发布消息的终端想消息渠道发布消息,然后监听消息渠道的终端监听到新消息发布,则处理各自的业务逻辑,观察者模式就是这个模式的典型应用
如图所示,首先打开一个Redis终端,运行 subscribe 命令,客户端会进入订阅模式,并开始监听指定的一个或多个频道。每当有消息发送到这些频道时,客户端就会收到通知;然后再启动另一个Redis终端发布消息到前一个终端订阅的渠道,它就能收到发布的消息
在Spring中,它提供了org.springframework.data.redis.connection.MessageListener
接口,接口中定义了个方法public void onMessage(Message message, byte[] pattern)
用来监听消息,同时Spring还提供了Redis消息监听容器,对应的类为RedisMessageListenerContainer
,该容器中可以设置线程池大小,因此需要创建一个线程任务池,综合下来看就是在使用Spring中使用Redis的消息监听,需要构建3个对象,监听者(MessageListener)、线程任务池、监听容器。如下代码所示
/**
* 创建Redis消息监听者
* @param redisTemplate StringRedisTemplate对象
* @return Redis消息监听者
*/
@Bean("redisMessageListener")
public MessageListener initRedisMessageListener(
@Autowired StringRedisTemplate redisTemplate) {
// 通过Lambda表达式创建MessageListener对象
return (Message message, byte[] bytes) -> {
// 获取channel
byte[] channel = message.getChannel();
// 使用字符串序列化器转换
String channelStr = new String(channel);
System.out.println("渠道:" + channelStr);
// 获取消息
byte[] body = message.getBody();
// 使用值序列化器转换
String msgBody = new String(body);
System.out.println("消息体:" + msgBody);
// 渠道名称转换
String bytesStr = new String(bytes);
System.out.println("渠道名称:" + bytesStr);
};
}
/**
* 构建线程任务池
* @return 线程任务池
*/
@Bean
public ThreadPoolTaskScheduler initTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 设置池大小为20条线程
taskScheduler.setPoolSize(20);
return taskScheduler;
}
/**
* 创建Redis消息监听容器
* @param connectionFactory 连接工厂
* @param taskScheduler 线程任务池
* @param listener 监听器
* @return Redis消息监听容器
*/
@Bean
public RedisMessageListenerContainer initListenerContainer(
@Autowired RedisConnectionFactory connectionFactory,
@Autowired ThreadPoolTaskScheduler taskScheduler,
@Autowired MessageListener listener) {
// 创建消息监听容器
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置Redis连接工厂
container.setConnectionFactory(connectionFactory);
// 设置线程任务池
container.setTaskExecutor(taskScheduler);
// 创建渠道
Topic topic = new ChannelTopic("chat");
// 让监听者和渠道绑定
container.addMessageListener(listener, topic);
return container;
}
测试代码如下
public static void testMessageListener() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
try {
// 渠道
String channel = "chat";
// 将消息发送到“chat”渠道
redisTemplate.convertAndSend(channel, "let's go");
} finally {
applicationContext.close();
}
}
Redis超时机制
Redis的超时机制是非常重要的机制,就像Java虚拟机提供了的垃圾回收机制,来保证Java程序使用过且不再使用的Java对象能被及时销毁,从而回收内存空间,保证内存持续使用,同样Redis是基于内存运行的,也必须要考虑键值的回收和管理问题;和Jvm一样当内存不足时,Redis会触发垃圾回收机制,在Java中程序员可以使用System.gc()
建议Jvm执行一次垃圾回收(只是建议java虚拟机进行垃圾回收,但未必会执行,所以是建议),但这样做可能会导致java虚拟机在执行垃圾回收的时候严重的性能下降,因为在执行垃圾回收的时候同样会消耗系统资源,在本就所剩无几的系统资源中执行垃圾回收也未必可取,因此选择合适的时机进行垃圾回收才有利于系统性能的提升
超时机制对于业务也非常重要,Redis中缓存的数据未必是最新的数据,这取决于实际系统架构的设计,以及在Redis和持久化数据之间的同步机制,当Redis中的数据并非最新的数据时,它所存储的数据就已经不是真实数据了,这样数据称为脏数据,而如果设计了合理的超时机制,一旦超时则无法再在Redis中读取了,就会触发系统访问新的数据源并再次向Redis同步新数据的机制,超时命令如下表所示
> set key10 value10
"OK"
> ttl key10
(integer) -1
> expire key10 120
(integer) 1
> ttl key10
(integer) 118
> persist key10
(integer) 1
> ttl key10
(integer) -1
> expireat key10 1582016553990
(integer) 1
> ttl key10
(integer) 1580293017661
>
使用Spring完成同样的事,代码如下
public static void testExpire() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
SessionCallback session = (ops) -> {
// 清空“key1”
ops.delete("key1");
// 设置“key1”
ops.boundValueOps("key1").set("value1");
String keyValue = (String) ops.boundValueOps("key1").get();
// 获取超时时间
Long expSecond = ops.getExpire("key1");
System.out.println(expSecond);
boolean b =false;
// 设置120秒后超时
b = ops.expire("key1", 120L, TimeUnit.SECONDS);
Long l = 0L;
// 获取超时时间
l = ops.getExpire("key1");
System.out.println(l);
// 设置永不超时
b = ops.persist("key1");
Long now = System.currentTimeMillis();
Date date = new Date();
date.setTime(now + 120000);
// 设置到达某一时间点超时
ops.expireAt("key", date);
return null;
};
redisTemplate.execute(session);
}
这里需要注意的是,Redis的键值对超时不会被自动回收,只会标识该键值对超时,回收有单独的机制,如果超时就回收,那么可能Redis会出现卡顿,进而影响系统性能,垃圾回收也需要消耗系统资源,并不是超时就回收,Redis提供了两种回收机制一种是定时回收,一种是惰性回收,所谓定时回收就是在固定的时间触发一段代码按策略回收键值对,而惰性回收则是当一个超时的键值对被再次访问,例如执行get命令,才触发Redis将其从内存中回收
Redis使用Lua脚本
在Redis2.6之后的版本除了使用Redis命令,还可以使用Lua语言操作Redis,从而弥补Redis命令计算能力不足的情况,且在Redis中执行Lua语言是原子性的,也就是说Redis执行Lua是不会被中断的这个特性有助于并发下的数据一致性;在Redis中执行Lua有两种方式,一种是直接执行脚本代码,一种是执行脚本文件,在执行脚本代码的方式中,Redis可以缓存脚本并使用SHA-1算法对脚本进行签名,然后将SHA-1表示返回,只需要通过标识便可运行,看几个例子
> eval "return 'hello lua'" 0
"hello lua"
> eval "redis.call('set', KEYS[1], ARGV[1])" 1 lua-key lua-value
(nil)
> get lua-key
"lua-value"
EVAL script numkeys key [key ...] arg [arg ...]
其中script: Lua 脚本的内容、numkeys: 脚本中使用的键的数量、key [key …]: 脚本中使用的键、arg [arg …]: 传递给脚本的参数
Lua 脚本中可以使用
redis.call
和redis.pcall
来调用 Redis 命令。redis.call 用于正常调用,而 redis.pcall 用于处理可能出现的错误
有时候需要多次执行相同的脚本,这种情况下可以使用Redis缓存脚本,实例如下
> script load "redis.call('set', KEYS[1], ARGV[1])"
"27d4ba7a9326bdcdabb271c0771037674735bec2"
> EVALSHA 27d4ba7a9326bdcdabb271c0771037674735bec2 1 lua-key1 lua-value1
(nil)
> get lua-key1
"lua-value1"
在java代码中,如果是简单的脚本使用jedis中的API会相对简单,如下代码所示
public static void testEval() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
// 注意StringRedisTemplate是RedisTemplate的子类
// 因此这里使用名称获取RedisTemplate对象
RedisTemplate<String, Object> redisTemplate = applicationContext.getBean("redisTemplate", RedisTemplate.class);
// 如果是简单的操作,使用原来的Jedis会简易些
Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();
// 执行简单的Lua脚本
String helloJava = (String) jedis.eval("return 'hello java'");
System.out.println(helloJava);
// 执行带参数的脚本
jedis.eval("redis.call('set',KEYS[1], ARGV[1])", 1, "lua-key", "lua-value");
String luaValue = (String) jedis.get("lua-key");
System.out.println(luaValue);
// 缓存脚本,返回SHA1签名标识
String sha1 = jedis.scriptLoad("redis.call('set',KEYS[1], ARGV[1])");
// 通过标识执行脚本
jedis.evalsha(sha1, 1, new String[] { "key1", "val1" });
// 获取执行脚本后的数据
String shaVal = jedis.get("key1");
System.out.println(shaVal);
// 关闭连接
jedis.close();
}
这段代码进行了字符串的存储,但实际中更多的是存储对象,这个时候就要考虑使用Spring提供的RedisScript接口,它提供了一个实现类DefaultRedisScript,使用RedisScript接口在Redis中操作java对象,代码如下所示:
public static void testRedisScript() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
// 注意StringRedisTemplate是RedisTemplate的子类
// 因此这里使用名称获取RedisTemplate对象
RedisTemplate<String, Object> redisTemplate = applicationContext.getBean("redisTemplate", RedisTemplate.class);
// 定义默认脚本封装类
DefaultRedisScript<Role> redisScript = new DefaultRedisScript<>();
// 设置脚本
redisScript.setScriptText("redis.call('set'," + "KEYS[1], ARGV[1]) return redis.call('get', KEYS[1])");
// 定义操作的key列表
List<String> keyList = new ArrayList<String>();
keyList.add("role1");
// 需要序列化保存和读取的对象
Role role = new Role();
role.setId(1L);
role.setRoleName("role_name_1");
role.setNote("note_1");
// 获得标识字符串
String sha1 = redisScript.getSha1();
System.out.println(sha1);
// 设置返回结果类型,如果没有这行代码,结果返回为空
redisScript.setResultType(Role.class);
// 定义序列化器
RedisSerializer serializer = RedisSerializer.java();
// 执行脚本
// 第一个是RedisScript接口对象,第二个是参数序列化器
// 第三个是结果序列化器,第四个是Reids的key列表,最后是参数列表
Role result = (Role) redisTemplate.execute(redisScript, serializer, serializer, keyList, role);
// 打印结果
System.out.println(result.getRoleName());
}
如果脚本的工作较多,代码比较长,则通常会放到脚本文件中,然后用执行脚本文件的形式完成,首先创建一个lua脚本文件,将如下代码写入redisscript.lua
redis.call('set', KEYS[1], ARGV[1])
redis.call('set', KEYS[2], ARGV[2])
local n1 = tonumber(redis.call('get', KEYS[1]))
local n2 = tonumber(redis.call('get', KEYS[2]))
if n1 > n2 then
return 1
end
if n1 == n2 then
return 0
end
if n1 < n2 then
return 2
end
Java代码如下所示
public static void testLuaFile() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
// 注意StringRedisTemplate是RedisTemplate的子类
// 因此这里使用名称获取RedisTemplate对象
RedisTemplate<String, Object> redisTemplate = applicationContext.getBean("redisTemplate", RedisTemplate.class);
// 读入Lua文件流
File file = new File("src/main/resources/redisscript.lua");
byte[] bytes = getFileToByte(file);
Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();
// 发送文件二进制给Redis,这样REdis就会返回sha1标识
byte[] sha1 = jedis.scriptLoad(bytes);
// 使用返回的标识执行,其中第二个参数2,表示使用2个键
// 而后面的字符串都转化为了二进制字节进行传输
Object obj = jedis.evalsha(sha1, 2, "key1".getBytes(), "key2".getBytes(), "2".getBytes(), "4".getBytes());
System.out.println(obj);
}
/**
* 把文件转化为二进制数组
* @param file 文件
* @return 二进制数组
*/
public static byte[] getFileToByte(File file) {
byte[] by = new byte[(int) file.length()];
try {
InputStream is = new FileInputStream(file);
ByteArrayOutputStream bytestream = new ByteArrayOutputStream();
byte[] bb = new byte[2048];
int ch;
ch = is.read(bb);
while (ch != -1) {
bytestream.write(bb, 0, ch);
ch = is.read(bb);
}
by = bytestream.toByteArray();
} catch (Exception ex) {
ex.printStackTrace();
}
return by;
}