Redis简介
Redis是一个开源的使用C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value的NoSQL数据库。特点如下:
- 读写速度快:Redis官网测试读写能到10万左右每秒。速度快的原因这里简单说一下,第一是因为数据存储在内存中,我们知道机器访问内存的速度是远远大于访问磁盘的,其次是Redis采用单线程的架构,避免了上下文的切换和多线程带来的竞争,也就不存在加锁释放锁的操作,减少了CPU的消耗,第三点是采用了非阻塞IO多路复用机制。
- 数据结构丰富: Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构
- 支持持久化:Redis提供了RDB和AOF两种持久化策略,能最大限度地保证Redis服务器宕机重启后数据不会丢失。
- 支持高可用:可以使用主从复制,并且提供哨兵机制,保证服务器的高可用。
- 客户端语言多:因为Redis受到社区和各大公司的广泛认可,所以客户端语言涵盖了所有的主流编程语言,比如Java,C,C++,PHP,NodeJS等等
数据类型
string类型
简单的key-value模型。string数据结构是一种简单动态字符串(simple dynamic string, SDS)
。
应用场景
- 计数器(全局,当服务是多个实例时)
- 可以做缓存(将对象转换为JSON字符串)
- 使用setnx(set if not exist)命令做分布式锁
> set key value
OK
> get key
value
> exists key
1
> strlen key
5
> del key
1
> get key
null
@Test
public void testString() {
redisTemplate.opsForValue().set("key", 1, 1, TimeUnit.MINUTES);
Object value = redisTemplate.opsForValue().get("key");
System.out.println(value); // 1
Long key = redisTemplate.opsForValue().increment("key");
System.out.println(key); // 2
Object value1 = redisTemplate.opsForValue().get("key");
System.out.println(value1); // 2
}
list类型
list是一个双向链表,是有序的,可以做列表,用lrange实现读取,高性能分页
rpush myList value1 # 向list的头部(右边)添加元素
rpush myList value2 value3 # 向list的头部(右边)添加多个元素
lpop myList # 将list的尾部(左边)元素取出
lrange myList 0 1 # 查看对应下标的list列表
lrange myList 0 -1 # 查看列表中的所有元素
llen myList # 查看链表长度
@Test
public void testList() {
redisTemplate.opsForList().rightPush("list", "value1");
redisTemplate.opsForList().rightPush("list", "value2");
redisTemplate.opsForList().rightPush("list", "value3");
Object value = redisTemplate.opsForList().leftPop("list");
System.out.println(value); // value1
List list1 = redisTemplate.opsForList().range("list", 0, 1);
System.out.println(list1); // [value2, value3]
List list2 = redisTemplate.opsForList().range("list", 0, -1);
System.out.println(list2); // [value2, value3]
Long size = redisTemplate.opsForList().size("list");
System.out.println(size); // 2
}
hash类型
hash类似Java的HashMap,内部都是数组加链表的形式。
> hset userInfoKey name "tong" description "dev" age "24"
> hexists userInfoKey name # 查看key对应的value中的字段是否存在
> hget userInfoKey name # 获取存储在哈希表中指定字段的值
> hgetall userInfoKey # 获取在哈希表中指定key的所有字段和值
> hkeys userInfoKey # 获取key列表
> hvals userInfoKey # 获取value列表
> hset userInfoKey name "aloneness" # 修改某个字段对应的值
@Test
public void testHash() {
redisTemplate.opsForHash().put("userInfoKey", "name", "tong");
redisTemplate.opsForHash().put("userInfoKey", "description", "dev");
redisTemplate.opsForHash().put("userInfoKey", "age", "24");
Object value = redisTemplate.opsForHash().get("userInfoKey", "name");
System.out.println(value); // tong
Map<String, String> userInfoKey = redisTemplate.opsForHash().entries("userInfoKey");
for (Map.Entry<String, String> entry : userInfoKey.entrySet()) {
System.out.println(entry.getKey() + " " + entry.getValue());
}
}
set类型
set类似Java的HashSet。set类型是一种无序集合,不允许重复的数据。
> sadd mySet value1 value2 # 添加元素进去
> smembers mySet # 查看set中的所有元素
> scard mySet # 查看set的长度
> sismember mySet value1 # 检查某个元素是否存在set中,只能接受单个元素
@Test
public void testSet() {
redisTemplate.opsForSet().add("mySet", "value1");
redisTemplate.opsForSet().add("mySet", "value2");
redisTemplate.opsForSet().add("mySet", "value1");
Object value = redisTemplate.opsForSet().isMember("mySet", "value1");
System.out.println(value); // true
Set mySet = redisTemplate.opsForSet().members("mySet");
System.out.println(mySet); // [value1, value2]
}
zset类型
和set相对比,zset增加了一个权重参数score,集合的元素可以按照score进行有序排列,还可以通过score的范围来获取元素的列表。最主要的应用场景是排行榜,可以根据某个权重进行排序。
> zadd myZset 3.0 value1 # 添加元素到sorted set中,3.0为权重
> zadd myZset 2.0 value2 1.0 value3 # 一次添加多个元素
> zcard myZset # 查看元素数量
> zscore myZset value1 # 查看某个value的权重
> zrange myZset 0 -1 # 顺序输出某个范围内的元素
> zrevrange myZset 0 1 # 逆序输出某个范围区间的元素
@Test
public void testZSet() {
redisTemplate.opsForZSet().add("myZSet", "value1", 3.0);
redisTemplate.opsForZSet().add("myZSet", "value2", 2.0);
redisTemplate.opsForZSet().add("myZSet", "value3", 1.0);
redisTemplate.opsForZSet().add("myZSet", "value2", 5.0);
Set mySet = redisTemplate.opsForZSet().range("myZSet", 0, 10);
System.out.println(mySet); // [value3, value1, value2]
Set reverseRange = redisTemplate.opsForZSet().reverseRange("myZSet", 0, 10);
System.out.println(reverseRange); // [value2, value1, value3]
}
使用场景
缓存热点数据
热点数据:经常被查询,但未被修改或删除的数据,一般有两种数据保存方式
- 读取前,先去读Redis,如果没有数据,读取数据库,将数据拉入Redis。这种方案针对实时性不高的数据。还需要注意缓存击穿的问题。(提升查询性能,一般将数据写到ES)
- 数据增删改的时候,同时对Redis增删改。实时性强,适用于数据量不大的场景,例如:系统配置,数据字典,等等一些配置。
限时业务的应用
可以使用Redis的expire命令设置键的有效时间,到时间后Redis会删除它。利用这一特性可以运用在限时的优惠活动信息,手机验证码,session有效期等业务场景。
@Test
public void testExpire() throws InterruptedException {
redisTemplate.opsForValue().set("key", "value");
redisTemplate.expire("key", 1, TimeUnit.SECONDS);
System.out.println(redisTemplate.opsForValue().get("key")); // value
Thread.sleep(2000);
System.out.println(redisTemplate.opsForValue().get("key")); // null
}
计数器、流水号生成
Redis由于incrby命令可以实现原子性的递增,所以可以运用于高并发的秒杀活动、分布式序列号的生成、具体业务还体现在比如限制一个手机号发多少条短信、一个接口一分钟限制多少请求、一个接口一天限制调用多少次等等。
流水号的生成
@Component
public class SeriesCode {
private static final int ID_LENGTH = 6;
/**
* SC2210-000011
*/
private static final String SERIES_CODE_FORMAT = "SC%s-%s";
/**
* 32天 一个月
*/
private static final long EXPIRE_DAYS = 32;
@Autowired
private RedisTemplate redisTemplate;
public String getSeriesCode() {
String monthStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyMM"));
String monthRedisKey = String.format("month:key:%s", monthStr);
if (!redisTemplate.hasKey(monthRedisKey)) {
redisTemplate.opsForValue().set(monthRedisKey, 0, EXPIRE_DAYS, TimeUnit.DAYS);
}
long incr = redisTemplate.opsForValue().increment(monthRedisKey);
String code = String.format(SERIES_CODE_FORMAT, monthStr, formatChar(incr, ID_LENGTH));
String cacheKey = String.format("month:key:%s", code);
if (redisTemplate.hasKey(cacheKey)) {
return getSeriesCode();
} else {
redisTemplate.opsForValue().set(cacheKey, 1, EXPIRE_DAYS, TimeUnit.DAYS);
return code;
}
}
private static String formatChar(long incr, int length) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
sb.append("0");
}
DecimalFormat df = new DecimalFormat(sb.toString());
return df.format(incr);
}
}
分布式锁
利用了Redis的setnx(set if not exist)命令,如果不存在设置成功返回1,否则返回0。可以应用在表单防重,幂等性处理。
@Component
public class RedisLock {
@Resource
private RedisTemplate<String, String> redisTemplate;
public boolean lock(String key, Integer delayTime) {
long currentTime = System.currentTimeMillis();
if (redisTemplate.opsForValue().setIfAbsent(key, String.valueOf(currentTime))) {
return true;
}
String keyTime = redisTemplate.opsForValue().get(key);
if (StringUtils.isEmpty(keyTime)) {
if (redisTemplate.opsForValue().setIfAbsent(key, String.valueOf(currentTime))) {
return true;
} else {
return false;
}
}
long keyCurrentTime = Long.parseLong(keyTime) + delayTime;
/**
* 判断是否死锁
*/
if (keyCurrentTime < currentTime) {
redisTemplate.opsForValue().getOperations().delete(key);
if (redisTemplate.opsForValue().setIfAbsent(key, String.valueOf(currentTime))) {
return true;
}
}
return false;
}
public void unlock(String key) {
redisTemplate.opsForValue().getOperations().delete(key);
}
}
@Test
public void testRedisLock() {
String lockKey = "lock:redis";
boolean lock = redisLock.lock(lockKey, 3000);
System.out.println(lock); // true
boolean lock1 = redisLock.lock(lockKey, 3000);
System.out.println(lock1); // false
redisLock.unlock(lockKey);
boolean lock2 = redisLock.lock(lockKey, 3000);
System.out.println(lock2); // true
}
延时操作
可以使用Redisson组件,构建一个延迟队列。
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.0</version>
</dependency>
构建延时队列
@Slf4j
@Component
public class RedisDelayQueue {
@Autowired
private RedissonClient redissonClient;
/**
* 入队
*/
public <T> void addQueue(T t, long delay, TimeUnit timeUnit) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(t.getClass().getName());
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
/**
* 出队
*/
public <T> void getQueue(Class zClass, TaskEventListener taskEventListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(zClass.getName());
// 由于此线程需要常驻,可以新建线程,不用交给线程池管理
Thread thread = new Thread(() -> {
while (true) {
try {
T t = blockingFairQueue.take();
log.info(Thread.currentThread().getName() + "取得数据后,开始处理");
taskEventListener.invoke(t);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
});
thread.start();
}
/**
* 延时回调监听
*/
public abstract static class TaskEventListener<T> {
/**
* 执行方法
*/
public abstract void invoke(T t);
}
}
延时队列出队,并调用自定义的回调方法
@Component
@Slf4j
public class GetQueue {
@Autowired
private RedisDelayQueue redisDelayQueue;
@PostConstruct
public void init() {
redisDelayQueue.getQueue(Meta.class, new TaskEventListenerDemo());
}
public static class Meta {
String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "Meta{" +
"value='" + value + '\'' +
'}';
}
}
}
@Slf4j
public class TaskEventListenerDemo extends RedisDelayQueue.TaskEventListener {
@Override
public void invoke(Object o) {
// 延迟队列到期了
log.info("延迟队列到期了,出队:{}", o);
}
}
测试延时队列
@Test
public void test() throws InterruptedException {
GetQueue.Meta meta = new GetQueue.Meta();
meta.setValue("hello, redission");
log.info("开始添加到队列中...");
redisDelayQueue.addQueue(meta, 30, TimeUnit.SECONDS);
Thread.sleep(1000 * 60);
}
Redisson组件的简单使用
Redisson是一个分布式的Redis客户端,支持很多分布式的操作。
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.0</version>
</dependency>
创建Redisson客户端
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.database:0}")
private Integer database;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
//单机配置
config.useSingleServer().setAddress("redis://"+host+":"+port);
config.useSingleServer().setPassword(password);
//集群配置
//config.useClusterServers().addNodeAddress(".... 可变参数 .");
// //主从
// config.useMasterSlaveServers()
// .setMasterAddress("主节点配置")
// .addSlaveAddress("从节点配置 可变参数");
// //哨兵配置
// config.useSentinelServers().addSentinelAddress("哨兵配置地址 可变参数")
// .setMasterName("主库地址")
// .setTimeout(50000)
// .setMasterConnectionPoolSize(10)
// .setSlaveConnectionPoolSize(5);
//
//
config.setEventLoopGroup(new NioEventLoopGroup());
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
分布式锁的简易使用
@Test
public void test() throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
scheduledExecutorService.execute(new Worker(i, countDownLatch));
}
countDownLatch.await();
scheduledExecutorService.shutdown();
}
public class Worker implements Runnable {
private int i;
private CountDownLatch countDownLatch;
public Worker(int i, CountDownLatch countDownLatch) {
this.i = i;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
RLock lock = redissonClient.getLock("redisson:lock");
try {
// 1. 最常见的使用方法
lock.lock();
// 2. 支持过期解锁功能,10秒钟以后自动解锁,无需调用unlock方法手动解锁
// lock.lock(10, TimeUnit.SECONDS);
// 3. 尝试加锁,最多等待2秒,上锁以后5秒自动解锁
// boolean res = lock.tryLock(2, 5, TimeUnit.SECONDS);
// if (res) {
doTask(i);
// }
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
countDownLatch.countDown();
}
}
void doTask(int i) {
System.out.println(Thread.currentThread().getName() + " sleep " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意事项
Key、Value序列化
引入spring-boot-starter-data-redis依赖会自动注入两个RedisTemplate,RedisTemplate默认使用的是JdkSerializationRedisSerializer,StringRedisTemplate默认使用的是StringRedisSerializer,当使用RedisTemplate做自增操作会报错,无法序列化。这需要我们自定义RedisTemplate并设置序列化。
我们可以将key设置为StringRedisSerializer,value设置为Jackson2JsonRedisSerializer
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 设置键(key)的序列化采用StringRedisSerializer。
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// 设置值(value)的序列化采用Jackson2JsonRedisSerializer
Jackson2JsonRedisSerializer<Object> redisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
redisTemplate.setValueSerializer(redisSerializer);
redisTemplate.setHashValueSerializer(redisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}