分布式锁mysql实现方式
方式1:唯一索引
- 创建锁表,内部存在字段表示资源名及资源描述,同一资源名使用数据库唯一性限制。
- 多个进程同时往数据库锁表中写入对某个资源的占有记录,当某个进程成功写入时则表示其获取锁成功
- 其他进程由于资源字段唯一性限制插入失败陷入自旋并且失败重试。
- 当执行完业务后持有该锁的进程则删除该表内的记录,此时回到步骤一。
表数据
create table `database_lock`(
`id` BIGINT NOT NULL AUTO_INCREMENT,
`resource` INT NOT NULL COMMENT '锁资源',
`description` varchar(1024) NOT NULL DEFAULT "" COMMENT '描述',
PRIMARY KEY (`id`),
UNIQUE KEY `resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表';
db.properties
driver=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://localhost:3306/distribute_lock?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=Asia/Shanghai
user=root
password=123456
- PropertiesReader
@Slf4j
public class PropertiesReader {
// Properties缓存文件
private static final Map<String, Properties> propertiesCache = new HashMap<String, Properties>();
public static Properties getProperties(String propertiesName) throws IOException {
if (propertiesCache.containsKey(propertiesName)) {
return propertiesCache.get(propertiesName);
}
loadProperties(propertiesName);
return propertiesCache.get(propertiesName);
}
private synchronized static void loadProperties(String propertiesName) throws IOException {
FileReader fileReader = null;
try {
// 创建Properties集合类
Properties pro = new Properties();
// 获取src路径下的文件--->ClassLoader类加载器
ClassLoader classLoader = PropertiesReader.class.getClassLoader();
URL resource = classLoader.getResource(propertiesName);
// 获取配置路径
String path = resource.getPath();
// 读取文件
fileReader = new FileReader(path);
// 加载文件
pro.load(fileReader);
// 初始化
propertiesCache.put(propertiesName, pro);
} catch (IOException e) {
log.error("读取Properties文件失败,Properties名为:" + propertiesName);
throw e;
} finally {
try {
if (fileReader != null) {
fileReader.close();
}
} catch (IOException e) {
log.error("fileReader关闭失败!", e);
}
}
}
}
- JDBCUtils
@Slf4j
public class JDBCUtils {
private static String url;
private static String user;
private static String password;
static {
//读取文件,获取值
try {
Properties properties = PropertiesReader.getProperties("db.properties");
url = properties.getProperty("url");
user = properties.getProperty("user");
password = properties.getProperty("password");
String driver = properties.getProperty("driver");
//4.注册驱动
Class.forName(driver);
} catch (IOException | ClassNotFoundException e) {
log.error("初始化jdbc连接失败!", e);
}
}
/**
* 获取连接
* @return 连接对象
*/
public static Connection getConnection() throws SQLException {
return DriverManager.getConnection(url, user, password);
}
/**
* 释放资源
* @param rs
* @param st
* @param conn
*/
public static void close(ResultSet rs, Statement st, Connection conn) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (st != null) {
try {
st.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
数据库操作类
/**
* MySQL 锁操作类(加锁+释放锁)
*/
@Slf4j
public class MySQLDistributedLockService {
private static Connection connection;
private static Statement statement;
private static ResultSet resultSet;
static{
try {
connection = JDBCUtils.getConnection();
statement = connection.createStatement();
resultSet = null;
} catch (SQLException e) {
log.error("数据库连接失败!");
}
}
/**
* 锁表 - 获取锁
* @param resource 资源
* @param description 锁描述
* @return 是否操作成功
*/
public static boolean tryLock(int resource,String description){
String sql = "insert into database_lock (resource,description) values (" + resource + ", '" + description + "');";
//获取数据库连接
try {
int stat = statement.executeUpdate(sql);
return stat == 1;
} catch (SQLException e) {
return false;
}
}
/**
* 锁表-释放锁
* @return
*/
public static boolean releaseLock(int resource) throws SQLException {
String sql = "delete from database_lock where resource = " + resource;
//获取数据库连接
int stat = statement.executeUpdate(sql);
return stat == 1;
}
/**
* 关闭连接
*/
public static void close(){
log.info("当前线程: " + ManagementFactory.getRuntimeMXBean().getName().split("@")[0] +
",关闭了数据库连接!");
JDBCUtils.close(resultSet,statement,connection);
}
}
LockTable
/**
* mysql分布式锁
* 执行流程: 多进程抢占数据库某个资源,然后执行业务,执行完释放资源
* 锁机制: 单一进程获取锁时,则其他进程提交失败
*/
@Slf4j
public class LockTable extends Thread {
@Override
public void run() {
super.run();
//获取Java虚拟机的进程ID
String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
try{
while(true){
log.info("当前进程PID:" + pid + ",尝试获取锁资源!");
if(MySQLDistributedLockService.tryLock(1,"测试锁")){
log.info("当前进程PID:" + pid + ",获取锁资源成功!");
//sleep模拟业务处理过程
log.info("开始处理业务!");
Thread.sleep(10*1000);
log.info("业务处理完成!");
MySQLDistributedLockService.releaseLock(1);
log.info("当前进程PID: " + pid + ",释放了锁资源!");
break;
}else{
log.info("当前进程PID: " + pid + ",获取锁资源失败!");
Thread.sleep(2000);
}
}
}catch (Exception e){
log.error("抢占锁发生错误!",e);
}finally {
MySQLDistributedLockService.close();
}
}
// 程序入口
public static void main(String[] args) {
new LockTable().start();
}
}
测试
运行时开启并行执行选项,每次运行三个或三个以上进程. Allow parallel run 运行并行执行
注意事项:
- 该锁为非阻塞的
- 当某进程持有锁并且挂死时候会造成资源一直不释放的情况,造成死锁,因此需要维护一个定时清理任务去清理持有过久的锁
- 要注意数据库的单点问题,最好设置备库,进一步提高可靠性
- 该锁为非可重入锁,如果要设置成可重入锁需要添加数据库字段记录持有该锁的设备信息以及加锁次数
方式二:基于乐观锁
- 每次执行业务前首先进行数据库查询,查询当前的需要修改的资源值(或版本号)。
- 进行资源的修改操作,并且修改前进行资源(或版本号)的比对操作,比较此时数据库中的值是否和上一步查询结果相同。
- 查询结果相同则修改对应资源值,不同则回到第一步。
例子:数据库中设定某商品基本信息(名为外科口罩,数量为10),多进程对该商品进行抢购,当商品数量为0时结束抢购。
代码实现
/**
* 乐观锁-获取资源
* @param id 资源ID
* @return result
*/
public static ResultSet getGoodCount(int id) throws SQLException {
String sql = "select * from database_lock_2 where id = " + id;
//查询数据
resultSet = statement.executeQuery(sql);
return resultSet;
}
/**
* 乐观锁-修改资源
* @param id 资源ID
* @param goodCount 资源
* @return 修改状态
*/
public static boolean setGoodCount(int id, int goodCount) throws SQLException {
String sql = "update database_lock_2 set good_count = good_count - 1 where id =" + id +" and good_count = " + goodCount;
int stat = statement.executeUpdate(sql);
return stat == 1;
}
/**
* 乐观锁-开启事务自动提交
*/
public static void AutoCommit(){
try {
connection.setAutoCommit(true);
} catch (SQLException e) {
log.error("开启自动提交!",e);
}
}
OptimisticLock测试类
/**
* mysql分布式锁-乐观锁
* 执行流程: 多进程抢购同一商品,每次抢购成功商品数量-1,商品数据量为0时退出
* 锁机制: 单一进程获取锁时,则其他进程提交失败
*/
@Slf4j
public class OptimisticLock extends Thread{
@Override
public void run() {
super.run();
String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
ResultSet resultSet = null;
String goodName = null;
int goodCount = 0;
try {
while(true){
log.info("当前线程:" + pid + ",开始抢购商品!");
//获取当前商品信息
resultSet = MySQLDistributedLockService.getGoodCount(1);
while (resultSet.next()){
goodName = resultSet.getString("good_name");
goodCount = resultSet.getInt("good_count");
}
log.info("获取库存成功,当前商品名为:" + goodName + ",当前库存剩余量为:" + goodCount);
//模拟执行业务操作
Thread.sleep(2*3000);
if(0 == goodCount){
log.info("抢购失败,当前库存为0! ");
break;
}
//修改库存信息,库存量-1
if(MySQLDistributedLockService.setGoodCount(1,goodCount)){
log.info("当前线程:" + pid + " 抢购商品:" + goodName + "成功,剩余库存为:" + (goodCount -1));
//模拟延迟,防止锁每次被同一进程获取
Thread.sleep(2 * 1000);
}else{
log.error("抢购商品:" + goodName +"失败,商品数量已被修改");
}
}
}catch (Exception e){
log.error("抢购商品发生错误!",e);
}finally {
if(resultSet != null){
try {
resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
log.error("关闭Result失败!" , e);
}
}
MySQLDistributedLockService.close();
}
}
public static void main(String[] args) {
new OptimisticLock().start();
}
}
代码测试
开启三个进程,查看执行情况
注意事项:
- 该锁为非阻塞的
- 该锁对于业务具有侵入式,如果设置版本号校验则增加的额外的字段,增加了数据库冗余
- 当并发量过高时会有大量请求访问数据库的某行记录,对数据库造成很大的写压力
- 因此乐观锁适用于并发量不高,并且写操作不频繁的场景
方式三:悲观锁实现方式(利用事务加上行/表锁)
实现思路
- 关闭jdbc连接自动commit属性
- 每次执行业务前先使用查询语句后接for update表示锁定该行数据(注意查询条件如果未命中主键或索引,此时将会从行锁变为表锁)
- 执行业务流程修改表资源
- 执行commit操作
代码实现
MySQLDistributedLockService
/**
* 悲观锁-获取资源
* @param id 资源ID
* @return result
*/
public static ResultSet getGoodCount2(int id) throws SQLException {
String sql = "select * from database_lock_2 where id = " + id + "for update";
//查询数据
resultSet = statement.executeQuery(sql);
return resultSet;
}
/**
* 悲观锁-修改资源
* @param id 资源ID
* @return 修改状态
*/
public static boolean setGoodCount2(int id) throws SQLException {
String sql = "update database_lock_2 set good_count = good_count - 1 where id =" + id;
int stat = statement.executeUpdate(sql);
return stat == 1;
}
/**
* 悲观锁-关闭事务自动提交
*/
public static void closeAutoCommit(){
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
log.error("关闭自动提交失败!",e);
}
}
/**
* 悲观锁-提交事务
*/
public static void commit(String pid,String goodName,int goodCount) throws SQLException {
connection.commit();
log.info("当前线程:" + pid + "抢购商品: " + goodName + "成功,剩余库存为:" + (goodCount-1));
}
/**
* 悲观锁-回滚
*/
public static void rollBack() throws SQLException {
connection.rollback();
}
PessimisticLock
/**
* mysql 分布式锁-悲观锁
* 执行流程:多个进程抢占同一个商品,执行业务完毕则通过connection.commit() 释放锁
* 锁机制:单一进程获取锁时,则其他进程将阻塞等待
*/
@Slf4j
public class PessimisticLock extends Thread {
@Override
public void run() {
super.run();
ResultSet resultSet = null;
String goodName = null;
int goodCount = 0;
String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
//关闭自动提交
MySQLDistributedLockService.closeAutoCommit();
try{
while(true){
log.info("当前线程:" + pid + "");
//获取库存
resultSet = MySQLDistributedLockService.getGoodCount2(1);
while (resultSet.next()) {
goodName = resultSet.getString("good_name");
goodCount = resultSet.getInt("good_count");
}
log.info("获取库存成功,当前商品名称为:" + goodName + ",当前库存剩余量为:" + goodCount);
// 模拟执行业务事件
Thread.sleep(2 * 1000);
if (0 == goodCount) {
log.info("抢购失败,当前库存为0!");
break;
}
// 抢购商品
if (MySQLDistributedLockService.setGoodCount2(1)) {
// 模拟延时,防止锁每次被同一进程获取
MySQLDistributedLockService.commit(pid, goodName, goodCount);
Thread.sleep(2 * 1000);
} else {
log.error("抢购商品:" + goodName + "失败!");
}
}
}catch (Exception e){
//抢购失败
log.error("抢购商品发生错误!",e);
try {
MySQLDistributedLockService.rollBack();
} catch (SQLException ex) {
log.error("回滚失败! ",e);
}
}finally {
if(resultSet != null){
try {
resultSet.close();
} catch (SQLException e) {
log.error("Result关闭失败!",e);
}
}
MySQLDistributedLockService.close();
}
}
public static void main(String[] args) {
new PessimisticLock().start();
}
}
测试结果
注意事项:
- 该锁为阻塞锁
- 每次请求存在额外加锁的开销
- 在并发量很高的情况下会造成系统中存在大量阻塞的请求,影响系统的可用性
- 因此悲观锁适用于并发量不高,读操作不频繁的写场景
总结:
- 在实际使用中,由于受到性能以及稳定性约束,对于关系型数据库实现的分布式锁一般很少被用到。但是对于一些并发量不高、系统仅提供给内部人员使用的单一业务场景可以考虑使用关系型数据库分布式锁,因为其复杂度较低,可靠性也能够得到保证。