缓存数据一致性
引入缓存会导致一些比如修改/删除内容后缓存还是之前的数据,这会导致缓存和数据库数据不一致的情况,本文将提到相关的解决方案,而且还提供了canal去实现每次在更新数据库的时候自动同步缓存,而无需将代码都写在后端造成冗余。
缓存的更新方案(传统)
先更新DB还是先更新缓存?是更新缓存还是删除缓存?在常规情况下,怎么操作都可以,但一旦存在高并发场景,就需要采用合适的方案。
1、先更新数据库再更新缓存(双写策略1)
线程A:更新数据库(第1s)——> 更新缓存(第10s)
线程B:更新数据库 (第3s)——> 更新缓存(第5s)
并发场景下,这样的情况是很容易出现的,每个线程的操作先后顺序不同,这样就导致请求B的缓存值被请求A给覆盖了,数据库中是线程B的新值,缓存中是线程A的旧值,并且会一直这么脏下去直到缓存失效(设置了过期时间)
2、先更新缓存再更新数据库(双写策略2)
线程A:更新缓存(第1s)——> 更新数据库(第10s)
线程B: 更新缓存(第3s)——> 更新数据库(第5s)
和前面一种情况相反,缓存中是线程B的新值,而数据库中是线程A的旧值。
前两种方式之所以会在并发场景下出现异常,本质上是因为更新缓存和更新数据库是两个操作,我们没有办法控制并发场景下两个操作之间先后顺序,也就是先开始操作的线程先完成自己的工作。
3、先删除缓存再更新数据库
通过这种方式,我们很惊喜地发现,前面困扰我们的并发场景的问题确实被解决了!两个线程都只修改数据库,不管谁先,数据库以之后修改的线程为准。
但这个时候,我们来思考另一个场景:两个并发操作,一个是更新操作,另一个是查询操作,更新操作删除缓存后,查询操作没有命中缓存,先把老数据读出来后放到缓存中,然后更新操作更新了数据库。于是,在缓存中的数据还是老的数据,导致缓存中的数据是脏的。很显然,这种状况也不是我们想要的。
延时双删方案(大多数)
传统方案无论怎么样似乎都会出现漏洞导致缓存数据不一致的问题。大部分企业都是采用的延时双删方案,这种方案很简单而且高效。
1.删除缓存
2.更新数据库
3.睡眠一段时间(500ms)
4.再次删除缓存
加了个睡眠时间,主要是为了确保请求 A 在睡眠的时候,请求 B 能够在这这一段时间完成「从数据库读取数据,再把缺失的缓存写入缓存」的操作,然后请求 A 睡眠完,再删除缓存。
所以,请求 A 的睡眠时间就需要大于请求 B 「从数据库读取数据 + 写入缓存」的时间。
但是具体睡眠多久其实是个玄学,很难评估出来,所以这个方案也只是尽可能保证一致性而已,极端情况下,依然也会出现缓存不一致的现象。因此,还是不太建议这种方案。
对于蓝色的文字,“删除缓存 10”必须在“回写缓存10”后面,那如何才能保证一定是在后面呢?让请求 A 的最后一次删除,等待 500ms。
弊端: 在读写模式(MySQL主从复制)同时存在的情况下,会有数据一致性问题
分布式读写锁
- 读读允许并发
- 读写不允许并发
- 写读不允许并发
- 写写不允许并发
依赖“锁”的机制,避免出现并发读写。弊端:性能低
读数据方法(查询操作)
@Autowired
private RedissonClient redissonClient;
/**
* 读取数据方法 允许并发读。 但不允许进行并发读写,写读
*
* @return
*/
@Override
public String read() {
System.out.println("read当前节点被调用:" + port);
//1.创建读写锁对象
RReadWriteLock rwlock = redissonClient.getReadWriteLock("myLock");
//2.获取读锁对象
RLock lock = rwlock.readLock();
//3.获取读锁
lock.lock(5, TimeUnit.SECONDS);
//优先从缓存中获取数据
String data = redisTemplate.opsForValue().get("data");
if (StringUtils.isNotBlank(data)) {
//TODO 模拟查询数据库
data = "dbData";
//将查询结果放入缓存
redisTemplate.opsForValue().set("daa", data);
}
//todo 故意不释放读锁 - 5s后自动释放
lock.unlock();
return data;
}
修改数据方法(修改操作)
/**
* 写数据方法 不允许并发读写,写读。 先哪个操作来的先执行谁,等待写锁或者读锁释放,写数据方法才能继续执行
*/
@Override
public void write() {
System.out.println("write当前节点被调用:" + port);
//1.创建读写锁对象
RReadWriteLock rwlock = redissonClient.getReadWriteLock("myLock");
//2.获取写锁对象
RLock lock = rwlock.writeLock();
//3.获取写锁
lock.lock(5, TimeUnit.SECONDS);
//删除缓存
redisTemplate.delete("data");
//模拟修改数据库
String data = "writeData";
//todo 故意释放写锁 -- 5s后自动释放
lock.unlock();
}
监听 MySQL binlog方案(推荐)
「先更新数据库,再删缓存」的策略的第一步是更新数据库,那么更新数据库成功,就会产生一条变更日志,记录在 binlog 里。
于是我们就可以通过订阅 binlog 日志,拿到具体要操作的数据,然后再执行缓存删除,阿里巴巴开源的 Canal 中间件就是基于这个实现的。
alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)
Canal 模拟 MySQL 主从复制的交互协议,把自己伪装成一个 MySQL 的从节点,向 MySQL 主节点发送 dump 请求,MySQL 收到请求后,就会开始推送 Binlog 给 Canal,Canal 解析 Binlog 字节流之后,转换为便于读取的结构化数据,供下游程序订阅使用。
下图是 Canal 的工作原理:
所以,如果要想保证「先更新数据库,再删缓存」策略第二个操作能执行成功,我们可以使用「消息队列来重试缓存的删除」,或者「订阅 MySQL binlog 再操作缓存」,这两种方法有一个共同的特点,都是采用异步操作缓存。
验证MySQL是否开启BinLog确保Value为 ON
show variables like '%log_bin%';
在MySQL新增用户用于监听Binlog日志,在Canal容器中要使用该用户
#创建用户
CREATE USER canal IDENTIFIED BY '123456';
#给用户授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
#如果是MySQL8.X以上需要对加密方式进行设置
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
#刷新生效
FLUSH PRIVILEGES;
采用Docker方式创建Canal服务端。以下为创建Canal容器命令,要修改要监听主MySQL数据库IP跟端口用户名以及密码确保正确,别忘了把ip改成自己的。这里用的是1.1.5版本的canal
docker run -p 11111:11111 --name canal \
-e canal.destinations=tingshuTopic \
-e canal.instance.master.address=192.168.200.6:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=123456 \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=.*\\..* \
-d canal/canal-server:v1.1.5
在java项目中引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0</version>
</dependency>
在application.yml中配置canal的相关信息
#canal配置
canal:
destination: tingshuTopic #Canal服务端发送数据的话题名称跟上面容器里参数destinations的一样
server: 192.168.200.6:11111
提供Java实体类类监听变更后的数据,注意属性上使用**@Column**注解进行映射(这只是示例)。这里不需要提供表中全部的字段,只要与修改相关的即可。
//监听变更表
@Data
public class UserCDC {
@Column(name = "id")
private Long id;
@Schema(description = "nickname")
@TableField("nickname")
@Column(name = "nickname")
private String nickname;
@Schema(description = "头像图片")
@TableField("avatar_url")
@Column(name = "avatar_url")
private String avatarUrl;
@Schema(description = "性别")
@TableField("gender")
@Column(name = "gender")
private Integer gender;
@Schema(description = "出生年月")
@TableField("birthday")
@Column(name = "birthday")
private Date birthday;
}
监听到变更业务处理类(这只是测试)
/**
*
*/
@CanalTable("user") //监控指定表变更操作
@Component
public class SkuInfoHandler implements EntryHandler<SkuInfoCDC> {
@Autowired
private RedisTemplate redisTemplate;
/**
* 监听到新增操作
*
* @param skuInfo
*/
@Override
public void insert(UserCDC user) {
System.out.println("新增用户");
System.out.println("user = " + user);
}
/**
* 监听到修改操作
*
* @param before
* @param after
*/
@Override
public void update(UserCDC before, UserCDC after) {
System.out.println("修改用户");
System.out.println("修改用户before:" + before);
System.out.println("修改用户after:" + after);
try {
//删除缓存
String dataKey = RedisConst.SKUKEY_PREFIX + after.getId() + RedisConst.SKUKEY_SUFFIX;
redisTemplate.delete(dataKey);
} catch (Exception e) {
throw new RuntimeException(e);
//todo 将删除缓存失败消息发送MQ-将来MQ消费者监听后进行重试
}
}
/**
* 监听到删除操作
*
* @param skuInfo
*/
@Override
public void delete(UserCDC usercdc) {
}
}
注意:canal版本可能还不支持springboot3(至少在现在的1.1.5版本是不支持的),所以无法在本项目中使用canal。只能新建一个专门的项目去使用。还有就是如果CDC类里面有字段是null会造成封装失败。
ps:如果在springboot3环境下去用这个,请注意一定要将设置里java编译器给cdc模块设置专门的jdk版本(并且不要填默认值!),项目结构也要设置专门的8版本。