分布式锁的实现
- 操作共享资源:例如操作数据库中的唯一用户数据、订单系统、优惠券系统、积分系统等,这些系统需要修改用户数据,而多个系统可能同时修改同一份数据,这时就需要使用分布式锁来控制访问,防止数据不一致。
- 在电商系统中,如果多个用户同时购买同一商品,可能会出现超卖现象。通过使用分布式锁,可以确保在同一时间只有一个用户能够进行购买操作,从而避免库存超卖的问题。
- 防止重复调用第三方接口:在分布式系统中,如果多个节点同时调用同一个第三方接口,可能会导致接口调用失败或数据错误。使用分布式锁可以确保在同一时间只有一个节点进行接口调用,避免重复调用问题。
转自:https://lingkang.top/archives/lock333
Redis
当客户端需要获取锁时,向Redis发送SETNX命令,如果返回1,说明客户端获得了锁;如果返回0,则说明锁已被其他客户端占用。当客户端释放锁时,使用DEL命令删除对应的键即可。
1、Maven中添加依赖
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.27</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.3</version>
</dependency>
2、启动好Redis
window可以在这里下载一个:https://gitee.com/lingkang_top/redis-window
3、编写java代码
package redis;
import cn.hutool.core.thread.ThreadUtil;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
/**
* @author lingkang
* @create by 2024/7/25 15:28
*/
public class Demo01 {
/**
* 假设是库存
*/
private static int number = 100;
public static void main(String[] args) {
// 提前初始化好redis连接
List<Jedis> redis = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Jedis jedis = new Jedis("localhost", 6379);
redis.add(jedis);
}
// 操作前先清空锁
redis.get(0).del("lock");
// 假设有10个线程进行锁操作
for (int i = 0; i < 10; i++) {
final Jedis jedis = redis.get(i);
new Thread(() -> {
// 每隔线程减扣 5次
for (int j = 0; j < 5; j++)
dd(jedis);
jedis.close();
}, "t-" + i).start();
}
ThreadUtil.sleep(15000);
// 应该输出 100-50=50
System.out.println("库存: " + number);
}
/**
* 业务处理
*/
private static void dd(Jedis jedis) {
for (; ; ) {
long lock = jedis.setnx("lock", "1");
if (lock == 1) {
// 给key设置一个过期时间,防止死锁
jedis.expire("lock", 20);
// 获取到锁
System.out.println("当前线程获得锁:" + Thread.currentThread().getName());
// 进行减扣库存等一系列操作....
number--;
// 假设处理业务延迟一下
ThreadUtil.sleep(200);
// 处理完毕要移除锁
jedis.del("lock");
break;
}
// 等待一下
ThreadUtil.sleep(200);
}
}
}
4、结果正确
基于数据库
基于数据库的分布式锁主要依赖于数据库的唯一索引或主键约束。具体实现时,当客户端需要获取锁时,向数据库中插入一条记录,该记录的唯一键表示锁。如果插入成功,说明客户端获得了锁;如果插入失败(如主键冲突),则说明锁已被其他客户端占用。当客户端释放锁时,删除该记录即可。
1、Maven中添加依赖
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.27</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>9.0.0</version>
</dependency>
2、添加表格
CREATE TABLE `mylock` (
`id` char(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`create_time` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
3、编写java代码
package mysql;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.thread.ThreadUtil;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author lingkang
* @create by 2024/7/25 16:58
*/
public class Demo01 {
/**
* 假设是库存
*/
private static int number = 100;
public static void main(String[] args) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
String URL = "jdbc:mysql://localhost:3306/mylock?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai";
// 提前初始化好redis连接
List<Connection> connectionList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Connection conn = DriverManager.getConnection(URL, "root", "123456");
connectionList.add(conn);
}
// 开始前,先将表表的锁数据清理
connectionList.get(0).prepareStatement("delete from mylock where id='1'").executeUpdate();
// 假设有10个线程进行锁操作
for (int i = 0; i < 10; i++) {
final Connection conn = connectionList.get(i);
new Thread(() -> {
// 每隔线程减扣 5次
for (int j = 0; j < 5; j++) {
try {
dd(conn);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
IoUtil.close(conn);
}, "t-" + i).start();
}
ThreadUtil.sleep(15000);
// 应该输出 100-50=50
System.out.println("库存: " + number);
}
/**
* 业务处理
*/
private static void dd(Connection conn) throws Exception {
for (; ; ) {
int success = 0;
try {
PreparedStatement statement = conn.prepareStatement("insert into mylock(id,create_time) values('1',now())");
success = statement.executeUpdate();
} catch (Exception e) {
}
if (success == 1) {
// 获取到锁
System.out.println("当前线程获得锁:" + Thread.currentThread().getName());
// 进行减扣库存等一系列操作....
number--;
// 假设处理业务延迟一下
ThreadUtil.sleep(200);
// 处理完毕要移除锁
PreparedStatement statement = conn.prepareStatement("delete from mylock where id='1'");
statement.executeUpdate();
statement.close();
break;
} else {
ResultSet query = conn.prepareStatement("select create_time from mylock where id='1'").executeQuery();
if (query.next()) {
Date date = query.getDate(1);
// 防止死锁,超过20秒删除
if (date.getTime() + 20000L > System.currentTimeMillis()) {
conn.prepareStatement(
"delete from mylock where id='1' and create_time='" + date.getTime() + "'"
).executeUpdate();
// 等待一下
ThreadUtil.sleep(1000);
}
}
query.close();
}
// 等待一下
ThreadUtil.sleep(200);
}
}
}
4、执行结果正确
Zookeeper
使用zookeeper有多钟方案,如下:
- 每个客户端在zookeeper的一个指定目录下创建一个临时节点,通过判断当前目录下的所有节点与自己的节点的顺序,就可以确定自己是否获取到锁。
- 每个客户端在zookeeper的一个指定目录下创建一个有序节点,通过判断自己节点在所有子节点中的顺序,就可以确定自己是否获取到锁。
第一种方案与数据库类似
Zookeeper 方案一
每个客户端在zookeeper的一个指定目录下创建一个临时节点,通过判断当前目录下的所有节点与自己的节点的顺序,就可以确定自己是否获取到锁。
1、Maven中添加依赖
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.27</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.9.2</version>
</dependency>
2、找个Zookeeper启动好
3、java实现代码
package zk;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.thread.ThreadUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* @author lingkang
* @create by 2024/7/25 18:17
*/
public class Demo01 {
/**
* 假设是库存
*/
private static int number = 100;
private static final String lockPath = "/lock";
public static void main(String[] args) throws Exception {
// 提前初始化好redis连接
List<ZooKeeper> zooKeeperList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
ZooKeeper zooKeeper = new ZooKeeper("10.8.4.191:2181", 20000, null);
// 用来做一个初始化调用
zooKeeper.exists(lockPath, false);
zooKeeperList.add(zooKeeper);
}
// 操作前先清空锁
ZooKeeper zk = zooKeeperList.get(0);
Stat exists = zk.exists(lockPath, false);
if (exists != null) {
zk.delete(lockPath, exists.getVersion());
}
// 假设有2个线程进行锁操作
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
final ZooKeeper zooKeeper = zooKeeperList.get(i);
Thread thread = new Thread(() -> {
// 每隔线程减扣 5次
for (int j = 0; j < 5; j++) {
try {
dd(zooKeeper);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
IoUtil.close(zooKeeper);
// 注意此线程id,此线程id用于模拟全局唯一业务处理id,实际开发可以用具体业务id替代,必须全局唯一
}, "t-" + i);
thread.start();
threadList.add(thread);
}
for (Thread thread : threadList)
thread.join();
ThreadUtil.sleep(1000);
// 应该输出 100-5*2 --> 90
System.out.println("库存: " + number);
}
/**
* 业务处理
*/
private static void dd(ZooKeeper zooKeeper) throws Exception {
String threadName = Thread.currentThread().getName();
for (; ; ) {
Stat stat = zooKeeper.exists(lockPath, false);
if (stat != null) {
byte[] data = zooKeeper.getData(lockPath, null, stat);
if (!threadName.equals(new String(data))) {
// 说明已经被其他服务获取锁了,等待一下跳过此次锁创建
ThreadUtil.sleep(300);// 等待一下
continue;
}
}
try {
// 创建者将会获得锁,注意:注意此线程id,此线程id用于模拟全局唯一业务处理id,
// 实际开发可以用具体业务id替代,必须全局唯一
zooKeeper.create(lockPath, threadName.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
} catch (Exception e) {
// 创建锁失败
}
// 等待一下
ThreadUtil.sleep(200);
stat = zooKeeper.exists(lockPath, false);
// 如果是当前session创建的,就获得锁
if (stat == null) {
continue;
} else {
byte[] data = zooKeeper.getData(lockPath, null, stat);
if (!threadName.equals(new String(data))) {
// 说明已经被其他服务获取锁了,等待一下跳过此次锁创建
ThreadUtil.sleep(200);// 等待一下
continue;
}
}
// 获取到锁
System.out.println("当前线程获得锁:" + threadName);
// 进行减扣库存等一系列操作....
number--;
// 假设处理业务延迟一下
ThreadUtil.sleep(200);
// 处理完毕要移除锁
zooKeeper.delete(lockPath, stat.getVersion());
// 处理完成
break;
}
}
}
4、执行结果正确
Zookeeper 方案二(推荐)
每个客户端在zookeeper的一个指定目录下创建一个有序节点,通过判断自己节点在所有子节点中的顺序,就可以确定自己是否获取到锁。
1、Maven中添加依赖
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.27</version>
</dependency>
<!-- 由于我的zk版本是3.4.8 版本较低,所以使用低版本curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.13.0</version>
</dependency>
2、找个Zookeeper启动好
v3.4.8
3、java实现代码
package zk;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.thread.ThreadUtil;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.List;
/**
* @author lingkang
* @create by 2024/7/26 10:43
*/
public class Demo02 {
/**
* 假设是库存
*/
private static int number = 100;
private static final String lockPath = "/lock";
public static void main(String[] args) throws Exception {
// 提前初始化好zk连接
List<CuratorFramework> curatorFrameworks = new ArrayList<>();
for (int i = 0; i < 2; i++) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.8.4.191:2181", retryPolicy);
client.start();
curatorFrameworks.add(client);
}
// 操作前先清空锁
CuratorFramework curator = curatorFrameworks.get(0);
Stat stat = curator.checkExists().forPath(lockPath);
if (stat != null)
curator.delete().withVersion(stat.getVersion()).forPath(lockPath);
// 假设有JVM进行锁操作
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
final CuratorFramework curatorFramework = curatorFrameworks.get(i);
Thread thread = new Thread(() -> {
// 每个线程减扣 5次
for (int j = 0; j < 5; j++) {
try {
dd(curatorFramework);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}, "t-" + i);
threadList.add(thread);
thread.start();
}
for (Thread thread : threadList)
thread.join();
ThreadUtil.sleep(1000);
for (CuratorFramework curatorFramework : curatorFrameworks)
IoUtil.close(curatorFramework);
System.out.println("-------------------------------------------------------------------");
// 应该输出 100-5*2 --> 90
System.out.println("库存: " + number);
}
/**
* 业务处理
*/
private static void dd(CuratorFramework curatorFramework) {
String threadName = Thread.currentThread().getName();
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
try {
lock.acquire();
// 获取到锁,执行业务逻辑
System.out.println("当前线程获得锁:" + threadName);
// 进行减扣库存等一系列操作....
number--;
// 假设处理业务延迟一下
ThreadUtil.sleep(200);
} catch (Exception e) {
// 处理异常
e.printStackTrace();
} finally {
try {
lock.release();
// 释放锁
} catch (Exception e) {
// 处理释放锁时的异常
e.printStackTrace();
}
}
}
}