RedisLockRegistry分布式锁
介绍
RedisLockRegistry是Spring框架提供的一种分布式锁机制,它基于Redis来实现对共享资源的保护,防止多个进程同时对同一资源进行修改,从而避免数据不一致或其他问题
基本原理
RedisLockRegistry通过Redis的原子操作来实现分布式锁。其主要原理包括:
- 独占性:同一时刻只能被一个客户端持有,确保互斥性。
- 健壮性:通过设置锁的过期时间来防止死锁,确保锁能够在一定时间内自动释放,避免资源长时间被占用
- 对称性:加锁和解锁必须由同一客户端执行,防止非法释放他人持有的锁
- 高可用性:当部分节点故障时,不影响分布式锁服务的稳定性
核心特点
-
互斥性:同一时刻只有一个客户端能持有锁
-
可重入性:同一个客户端可以多次获取同一个锁
-
超时机制:防止死锁,锁会自动释放
-
高可用:基于 Redis,性能高且可靠
应用场景
-
防止重复处理:如定时任务在集群环境下的执行控制
-
资源争用:如库存扣减、秒杀系统
-
关键业务流程:如支付订单处理
-
分布式系统协调:如主节点选举
使用方法
1.添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
2.配置分布式锁
@Configuration
public class RedisLockConfig {
/**
* 锁过期毫秒数
*/
private static final long EXPIRE_AFTER_MILLS = 600000L;
@Bean(destroyMethod = "destroy")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "redis-lock", EXPIRE_AFTER_MILLS);
}
}
默认有效时间是60秒,如果是默认的时间,则修改为
@Configuration
public class RedisLockConfig {
/**
* 锁过期毫秒数
*/
private static final long EXPIRE_AFTER_MILLS = 600000L;
@Bean(destroyMethod = "destroy")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "redis-lock");
}
}
3.基本使用示例
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@Service
public class OrderService {
private final RedisLockRegistry redisLockRegistry;
public OrderService(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
public void processOrder(String orderId) {
// 获取锁对象,orderId作为锁的key
Lock lock = redisLockRegistry.obtain(orderId);
try {
// 尝试获取锁,等待最多3秒,锁持有30秒(与配置一致)
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// 获取锁成功,执行业务逻辑
System.out.println("处理订单: " + orderId + ", 线程: " + Thread.currentThread().getName());
// 模拟业务处理
Thread.sleep(1000);
} finally {
// 释放锁
lock.unlock();
}
} else {
// 获取锁失败
System.out.println("获取锁失败,订单: " + orderId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("获取锁被中断");
}
}
}
4.高级用法-可重入锁
public void reentrantMethod(String resourceId) {
Lock lock = redisLockRegistry.obtain(resourceId);
try {
if (lock.tryLock()) {
try {
System.out.println("外层方法获取锁");
// 调用另一个也需要相同锁的方法
nestedMethod(resourceId);
} finally {
lock.unlock();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void nestedMethod(String resourceId) {
Lock lock = redisLockRegistry.obtain(resourceId);
try {
if (lock.tryLock()) {
try {
System.out.println("内层方法获取锁");
// 业务逻辑
} finally {
lock.unlock();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
5.定时任务分布式锁示例
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ScheduledTasks {
private final RedisLockRegistry redisLockRegistry;
public ScheduledTasks(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
@Scheduled(cron = "0 */5 * * * ?") // 每5分钟执行一次
public void distributedCronJob() {
Lock lock = redisLockRegistry.obtain("report-generation");
try {
if (lock.tryLock(10, TimeUnit.SECONDS)) {
try {
// 生成报表的业务逻辑
System.out.println("开始生成报表...");
Thread.sleep(5000); // 模拟耗时操作
System.out.println("报表生成完成");
} finally {
lock.unlock();
}
} else {
System.out.println("其他节点正在生成报表,本节点跳过");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
RedisLockRegistry 分布式锁工具类
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* Redis分布式锁工具类
*/
@Component
public class RedisDistributedLock {
private final RedisLockRegistry redisLockRegistry;
public RedisDistributedLock(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
/**
* 尝试获取锁(立即返回)
*
* @param lockKey 锁的key
* @return 是否获取成功
*/
public boolean tryLock(String lockKey) {
Lock lock = redisLockRegistry.obtain(lockKey);
return lock.tryLock();
}
/**
* 尝试获取锁(带超时时间)
*
* @param lockKey 锁的key
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否获取成功
*/
public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {
Lock lock = redisLockRegistry.obtain(lockKey);
try {
return lock.tryLock(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 释放锁
*
* @param lockKey 锁的key
*/
public void unlock(String lockKey) {
Lock lock = redisLockRegistry.obtain(lockKey);
lock.unlock();
}
/**
* 执行带锁的业务逻辑(立即返回)
*
* @param lockKey 锁的key
* @param processor 业务处理器
* @param <T> 返回值类型
* @return 业务处理结果,如果获取锁失败返回null
*/
public <T> T executeWithLock(String lockKey, LockProcessor<T> processor) {
if (tryLock(lockKey)) {
try {
return processor.process();
} finally {
unlock(lockKey);
}
}
return null;
}
/**
* 执行带锁的业务逻辑(带超时时间)
*
* @param lockKey 锁的key
* @param timeout 超时时间
* @param unit 时间单位
* @param processor 业务处理器
* @param <T> 返回值类型
* @return 业务处理结果,如果获取锁失败返回null
*/
public <T> T executeWithLock(String lockKey, long timeout, TimeUnit unit, LockProcessor<T> processor) {
if (tryLock(lockKey, timeout, unit)) {
try {
return processor.process();
} finally {
unlock(lockKey);
}
}
return null;
}
/**
* 执行带锁的业务逻辑(无返回值,立即返回)
*
* @param lockKey 锁的key
* @param processor 无返回值的业务处理器
* @return 是否成功获取锁并执行
*/
public boolean executeWithLock(String lockKey, VoidLockProcessor processor) {
if (tryLock(lockKey)) {
try {
processor.process();
return true;
} finally {
unlock(lockKey);
}
}
return false;
}
/**
* 执行带锁的业务逻辑(无返回值,带超时时间)
*
* @param lockKey 锁的key
* @param timeout 超时时间
* @param unit 时间单位
* @param processor 无返回值的业务处理器
* @return 是否成功获取锁并执行
*/
public boolean executeWithLock(String lockKey, long timeout, TimeUnit unit, VoidLockProcessor processor) {
if (tryLock(lockKey, timeout, unit)) {
try {
processor.process();
return true;
} finally {
unlock(lockKey);
}
}
return false;
}
/**
* 业务处理器接口(有返回值)
*/
@FunctionalInterface
public interface LockProcessor<T> {
T process();
}
/**
* 业务处理器接口(无返回值)
*/
@FunctionalInterface
public interface VoidLockProcessor {
void process();
}
}
使用示例
1.基本用法
@Service
public class OrderService {
private final RedisDistributedLock redisDistributedLock;
public OrderService(RedisDistributedLock redisDistributedLock) {
this.redisDistributedLock = redisDistributedLock;
}
public void processOrder(String orderId) {
if (redisDistributedLock.tryLock(orderId, 3, TimeUnit.SECONDS)) {
try {
// 处理订单业务逻辑
System.out.println("处理订单: " + orderId);
} finally {
redisDistributedLock.unlock(orderId);
}
} else {
System.out.println("获取锁失败,订单: " + orderId);
}
}
}
2.使用函数式接口(推荐)
@Service
public class InventoryService {
private final RedisDistributedLock redisDistributedLock;
public InventoryService(RedisDistributedLock redisDistributedLock) {
this.redisDistributedLock = redisDistributedLock;
}
public boolean deductStock(String productId, int quantity) {
return redisDistributedLock.executeWithLock(
"stock:" + productId,
2,
TimeUnit.SECONDS,
() -> {
// 在这里写扣减库存的业务逻辑
System.out.println("扣减商品库存: " + productId + ", 数量: " + quantity);
return true; // 返回业务处理结果
}
) != null;
}
}
3.无返回值的使用方式
public void generateReport() {
boolean executed = redisDistributedLock.executeWithLock(
"report-generation",
5,
TimeUnit.SECONDS,
() -> {
// 生成报表的业务逻辑
System.out.println("开始生成报表...");
Thread.sleep(3000);
System.out.println("报表生成完成");
}
);
if (!executed) {
System.out.println("其他节点正在生成报表,本次跳过");
}
}
注意:
- 确保锁的key具有唯一性,不同业务使用不同的key前缀
注意事项
-
锁过期时间:设置合理的过期时间,太短可能导致业务未完成锁就释放,太长可能导致其他客户端等待过久
-
异常处理:确保锁在finally块中释放,避免死锁
-
Redis可用性:Redis集群的高可用配置很重要,避免单点故障
-
时钟同步:确保所有使用锁的服务器的系统时钟同步
锁粒度:根据业务需求选择合适的锁粒度,太粗影响并发,太细增加复杂度
参考博客
分布式锁之RedisLockRegistry