问题场景
在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:
- 强一致性:写入何值,读出何值,但在实现中,性能较差。
- 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
- 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。
解决方案
强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。
弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。
最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。
在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。
Cache Aside
- 该方案在读取数据库时,首先从缓存中查询数据库:
-
- 如果缓存中存在数据,则直接返回给应用程序。
- 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
- 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。
Cache Aside注意事项
- 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
- 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。
综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。
Cache Aside demo
基于Cache Aside实现点赞功能。
实体类信息
public class Like {
private String postId;
private int likeCount;
// 构造函数、getter和setter方法
}
逻辑层
@Service
public class LikeService {
private final LikeRepository likeRepository;
private final RedisUtils redisUtils;
public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {
this.likeRepository = likeRepository;
this.redisUtils = redisUtils;
}
public Like getLikeInfo(String postId) {
String cacheKey = "like:" + postId;
// 从缓存中获取点赞信息
Like like = (Like) redisUtils.get(cacheKey);
// 如果缓存中不存在,则从持久层(数据库)获取
if (like == null) {
like = likeRepository.findByPostId(postId);
// 如果数据库中存在数据,则保存到缓存中
if (like != null) {
redisUtils.set(cacheKey, like);
}
}
// 如果点赞信息为空,则初始化为0
if (like == null) {
like = new Like(postId, 0);
}
return like;
}
public void addLike(String postId) {
String cacheKey = "like:" + postId;
// 在持久层(数据库)新增点赞信息
Like like = likeRepository.findByPostId(postId);
if (like == null) {
like = new Like(postId, 1);
} else {
like.setLikeCount(like.getLikeCount() + 1);
}
likeRepository.save(like);
// 更新缓存中的数据
redisUtils.set(cacheKey, like);
}
}
canal
引用canal官方说明:
canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
前置知识:MySQL主从复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
环境搭建
需要的开发环境:
- MySQL
- Redis
- Canal
特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。
那么,麻烦ChatGPT写一个Docker-Compose文件吧:
- version请按本地安装的Docker-Compose版本定义。
- Docker-Compose安装请自行查询。
version: '2.4'
services:
mysql:
image: mysql:8.0
container_name: mysql
restart: false
environment:
MYSQL_ROOT_PASSWORD: root
ports:
- "33060:3306"
volumes:
- ./mysql-data:/var/lib/mysql
canal:
image: canal/canal-server:v1.1.5
container_name: canal
restart: false
ports:
- "11111:11111"
- "11112:11112"
depends_on:
- mysql
environment:
- canal.destinations=example
- canal.instance.mysql.slaveId=1234
- canal.instance.master.address=mysql:3306
- canal.instance.dbUsername=root
- canal.instance.dbPassword=root
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=false
- canal.instance.gtidon=false
- canal.instance.filter.regex=.*
- canal.instance.filter.black.regex=mysql\.slave_.*
redis:
image: redis:latest
restart: always
ports:
- 6379:6379
volumes:
- ./redis_data:/data
将文件命名为:docker-compose.yml,开始安装。
docker-compose up -d
本案例使用balance余额表来演示,数据库表设计如下:
CREATE TABLE `balance` (
`id` varchar(50) NOT NULL COMMENT '主键',
`account` varchar(50) NOT NULL COMMENT '账户',
`amount` decimal(10,2) NOT NULL COMMENT '金额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
COMMENT='余额表';
开发环境
- JDK 17
- SpringBoot 3.1.2
- MyBatis-Plus 3.5.3.1
- druid
- lettuce
开发环境根据您的实际需要选择即可。
环境启动后,进入编码阶段。
/**
* @author: liu_pc
* Date: 2023/8/25
* Description: 余额信息变更Redis变成处理类
* Version: 1.0
*/
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {
private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);
private final RedisUtils redisUtils;
private final CanalConfig canalConfig;
private final Executor executor;
@Value("${canal.server.open}")
private boolean open;
@Autowired
public BalanceRedisProcessorService(RedisUtils redisUtils,
CanalConfig canalConfig,
@Qualifier("ownThreadPoolExecutor") Executor executor) {
this.redisUtils = redisUtils;
this.canalConfig = canalConfig;
this.executor = executor;
}
@PostConstruct
public void init() {
Map<String, String> mainMdcContext = Maps.newHashMap();
mainMdcContext.put("canal-thread", "balance-redis-processor-service");
MDC.setContextMap(mainMdcContext);
executor.execute(this);
logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");
}
@Override
public void run() {
CanalConnector canalConnector = canalConfig.canalConnector();
canalConnector.connect();
// 回滚到未进行ack的地方
canalConnector.rollback();
try {
while (open) {
// 获取数据 每次获取一百条改变数据
Message message = canalConnector.getWithoutAck(100);
//获取这条消息的id
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}
// 处理数据
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
String tableName = entry.getHeader().getTableName();
// 判断是否是 Balance 表的 amount 字段变更
if ("balance".equals(tableName)) {
StringBuilder redisKey = new StringBuilder("balance:");
for (CanalEntry.Column column : columns) {
logger.info("Balance changed in 'balance' dataInfo: {}", column);
if ("id".equals(column.getName())) {
String changeId = column.getValue();
logger.info("当前变更id为:{}", changeId);
redisKey.append(changeId);
}
if ("amount".equals(column.getName())) {
String changeValue = column.getValue();
logger.info(changeValue);
redisUtils.set(redisKey.toString(), changeValue);
}
}
}
}
}
}
}
// 确认消费完成这条消息
canalConnector.ack(message.getId());
logger.info("消费成功");
}
} catch (Exception e) {
logger.warn("canal-消费失败");
} finally {
// 关闭连接
canalConnector.disconnect();
}
}
}
测试
使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:
Redis数据:
完成。
我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。
mdc_logback
同时,如果您对笔者其他文章感兴趣,可以扫一扫关注笔者的公众号:种颗代码技术树
公众号文章更新更及时,以及一些程序员周边相关更新。
感谢您阅读到这里,不胜感激。