目录
1.ab工具(压测工具)的安装
2.前置
3.优化
3.1synchronized修饰代码方法/代码块
3.2分布式锁事务的解决方案
3.3Redis实现锁问题
3.3.1 set ex方式
3.3.2 set ex方式+设置过期时间
3.3.3单redis结点的解决UUID和LUA脚本
3.3.4redission解决分布式锁
4.Redission解决分布式锁
4.1添加依赖
4.2相关配置类
4.3注解实现分布式锁
前置补充:
1.并发和事务区别:
并发的理解:Java 并发问题、产生的原因及解决方法 - 掘金 (juejin.cn)
在系统接受请求,先做并发处理,再事务处理。
每个人对资源的获取都相当于在一线程中,如果大量请求同时发生会导致磁盘资源的过度抢占,做不了别的事而导致宕机或变慢。然后在数据库的多表操作要考虑事务。
补充:修改mysql默认隔离级别
SELECT @@transaction_isolation; #8.0查看数据库事务
SET SESSION TRANSACTION ISOLATION LEVEL <isolation_level>; #设置事务隔离级别
isolation_level:
#读取未提交
READ UNCOMMITTED
#允许读取已提交的数据
READ COMMITTED
#可重复读 8.0默认数据库事务
REPEATABLE READ
#可串行化 最严格的
SERIALIZABLE
2.事务的锁和并发的锁区别:
事务的锁,在事务内部进行,保障事务的原子性、一致性、隔离性、持久性。当事务提交或回滚就会释放。
并发的锁:防止cpu切换时候指令重排,保障多个并发操作同时进行数据的一致性和完整性。并发加的锁在整个变更发操作期间都有效,直到手动释放或添加事务结束。
1.ab工具(压测工具)的安装
yum install -y httpd-tools
语法: ab n(一次发送的请求数量) -c(请求的并发数) 访问路径
例子:5000个请求,100的并发 ps:注意关闭本地windows防火墙
ab -n 5000 -c 100 http://192.168.200.1:8206/admin/product/test/testLock
set key value nx|xx ex|px
nx:表示不存在则进行操作
xx:存在再进行操作
ex:表示过期时间,秒
px:过期时间 毫秒
2.前置
提前再redis设置num=0;
ab -n 5000 -c 100 http://192.168.34.93:8206/admin/product/test/testLock
代码片段:
3.优化
3.1synchronized修饰代码方法/代码块
此方法只有在单个服务的时候有效,如果分布式事务时,会导致失效
3.2分布式锁事务的解决方案
分布式锁的关键是:多线程共享的内存标记(锁)
三种:
1.基于数据库的分布式锁
2.基于缓存(redis等) 性能最高
3.基于zookeeper实现 最可靠
补充:数据库实现分布式锁过程(从性能上不考虑)
3.3Redis实现锁问题
使用需注意点:
1.多线程可见: 多线程可见
2.死锁的情况:要保障锁的释放
3.排他:同一时刻,只能由一个进程获取锁
4.高可用:避免服务宕机(redis集群搭建:1.主从复制 2.哨兵3.cluster集群)
3.3.1 set ex方式
通过 set ex key value 如果该字段不存在,判断为真则进行业务代码,如果存在则再次获取,直到获取锁为止。但是这种情况容易出现死锁的问题,如果发生异常,流程被打断了,就会发生死锁问题。
public void testLock() {
//0.先尝试获取锁 setnx key val
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
redisTemplate.delete("lock");
}else{
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.3.2 set ex方式+设置过期时间
设置key的过期时间有两种方式
1.expire key timeout 此方法设置会在异常之后,永远不执行,
2.setex key timeout value ;开始就对lock设置过期时间 ps:set lock value ex 10 nx
情况:
1.设置锁的过期时间为3秒,但业务的执行实现为7秒。当锁的时间过期剩下4秒则有新的线程执行。
2.当业务执行完了,开始释放锁,但在他的前一秒刚有一个线程进入池子中,加锁进行操作。此时锁也被删掉了。没有拦住。
3.3.3单redis结点的解决UUID和LUA脚本
1.优化UUID防止误删:
public void testLock() {
//设置uuid
String uuid = UUID.randomUUID().toString().replace("-","");
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
if (flag) {
String value = this.redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(value)) {
return;
}
int num = Integer.parseInt(value);
this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
while (uuid.equals(this.redisTemplate.opsForValue().get("lock"))) {
redisTemplate.delete("lock");
}
} else {
try {
Thread.sleep(10);
this.testLock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
使用uuid防止误删锁后还有问题:
1.还是会发生多个线程获取到资源(时间过期释放锁),实现锁的续期。
守护线程---->expire key timeout; set key value ex timeout nx;
代码:
Thread thread = new Thread(() -> {
this.redisTemplate.expire("lock", 3, TimeUnit.SECONDS);
});
thread.setDaemon(true);
thread.start();
2.优化LUA脚本保证删除的原子性
SET — Redis 命令参考
public void testLock() {
String uuid = UUID.randomUUID().toString().replace("-", "");
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
if (flag) {
String value = this.redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(value)) {
return;
}
int num = Integer.parseInt(value);
this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
redisScript.setScriptText(script);
//设置响应类型
redisScript.setResultType(Long.class);
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
} else {
try {
Thread.sleep(10);
this.testLock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
3.3.4redission解决分布式锁
以上方法适合redis的单结点适合解决,但如果redis搭建集群则不能解决
为什么?
redis集群中,分片和结点之间的复制,使用lua脚本无法确保原子性操作,当使用Lua脚本时候,他将在一个结点上执行,而数据可能被分配在多个结点,在执行脚本的期间,节点通信不一致。所以lua脚本无法在集群中锁住资源
解决方案:
redisson-redLock:大部分结点加锁成功,我就判断加锁成功[过半机制]
为了满足分布式锁可用:
1.互斥性:任意时刻,只能由一个客户持有锁
2.不会发生死锁,即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保障其他用户加锁
3.加锁和解锁必须同一个用户
4.加锁和解锁必须有原子性
4.Redission解决分布式锁
Redisson是一个在Redis的基础上实现的Java驻内存数据网格,提供多种数据类型,促进使用者对Redis的关注分离。
官网地址:Home · redisson/redisson Wiki · GitHub
github:GitHub - redisson/redisson: Redisson - Easy Redis Java client with features of In-Memory Data Grid. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Publish / Subscribe, Bloom filter, Spring Cache, Tomcat, Scheduler, JCache API, Hibernate, MyBatis, RPC, local cache ...
4.1添加依赖
<!-- Redisson -->
<dependency>
<groupId>org.Redisson</groupId>
<artifactId>Redisson</artifactId>
<version>3.15.3</version>
</dependency>
4.2相关配置类
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
private String host;
private String addresses;
private String password;
private String port;
private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize = 10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";
/**
* 自动装配
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
if (StringUtils.isEmpty(host)) {
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
//redis://127.0.0.1:7181
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
if (!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
config.useClusterServers().addNodeAddress().addNodeAddress();
/*
集群版:
config.useClusterServers().addNodeAddress().addNodeAddress();*/
// RedissonClient redisson = Redisson.create(config);
return Redisson.create(config);
}
}
4.3注解实现分布式锁
Core Technologies
添加注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface GmallCache {
/*
* 缓存数据前缀
* */
String prefex() default "cache:";
/*
缓存数据后缀
*/
String suffix() default ":info";
}
使用aop的方式环向切入
@Component
@Aspect
public class GmallCacheAspect {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* @param proceedingJoinPoint 能够获取请求之前的参数,请求的方法体,返回值等信息
* @return
*/
@Around("@annotation(com.atguigu.gmall.cache.GmallCache)")
@SneakyThrows
public Object cacheAspect(ProceedingJoinPoint proceedingJoinPoint) {
// 声明一个对象
Object obj = new Object();
/*
* 1.实现分布式锁的逻辑
* 获取到缓存的key;注解前缀+参数+注解后缀
* 获取到方法签名
* */
//获取签名
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
GmallCache annotation = signature.getMethod().getAnnotation(GmallCache.class);
//获取参数
Object[] args = proceedingJoinPoint.getArgs();
//获取前缀
String prefex = annotation.prefex();
//获取后缀
String suffix = annotation.suffix();
// 组成缓存的key
String skuKey = prefex + Arrays.asList(args) + suffix;
try {
obj = this.redisTemplate.opsForValue().get(skuKey);
if (obj == null) {
// 查询数据库,加一把锁
String lockKey = prefex + ":lock";
RLock lock = this.redissonClient.getLock(lockKey);
lock.lock();
try {
// 查询数据库
obj = proceedingJoinPoint.proceed(args);
if (obj == null) {
Object o = new Object();
// 放入缓存
this.redisTemplate.opsForValue().set(skuKey, o, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
return o;
}
this.redisTemplate.opsForValue().set(skuKey, obj, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
return obj;
} catch (Throwable e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} else {
return obj;
}
} catch (RuntimeException e) {
throw new RuntimeException(e);
}
return proceedingJoinPoint.proceed(args);
}
}