文章目录
- 1.Schedule(定时任务)
- 2.高并发线程安全的解决方案
- 2.1为什么不适用同步锁(Synchronized)?
- 2.2 Redis的分布式锁setnx
- 2.3 redisson分布式锁(看门狗机制)
- 2.3.1 Redis的分布式锁setnx产生的问题
- 2.3.2 redisson 实现锁续命
- 2.3.3 redisson 的代码实现
- 3.限流处理操作(并发量过大)
- 3.1 令牌桶的实现
- 3.1 自定义注解实现接口限流
1.Schedule(定时任务)
-
Spring的Schedule依赖包含在spring-boot-starter模块中,无需引入其他依赖。
-
在启动类增加注解(开启定时任务):@EnableScheduling
-
Cron表达式,当方法的执行时间超过任务调度频率时,调度器会在下个周期执行
-
注意: Spring的Schecule默认是单线程执行的,如果你定义了多个任务,那么他们将会被串行执行,会严重不满足你的预期。(如果要解决可以通过线程池的方式解决)
-
示例: 每秒获取Redis中存储的消息(消息队列确认机制的保证,可以通过Redis保证实现),将失败的消息重新投递
@Component
public class MySchedul {
@Autowired
RedisTemplate redisTemplate;
@Autowired
RabbitMessageUtils rabbitMessageUtils;
private static Logger logger = LoggerFactory.getLogger(MySchedul.class);
//该定时任务就是从redis 中获取到未投递成功的消息,并且进行重新投递
@Scheduled(cron = "* * * * * ?")
public void getMessageAndSender() {
logger.debug("===进入消息重新投递的定时任务===");
//进入定时任务,获取到redis中所有的信息
Set keys = redisTemplate.boundHashOps(RabbitKey.MESSAGE_KEY).keys();
//获取到redis中所有的消息的key,循环所有的key
if (keys != null && keys.size() > 0) {
for (Object id : keys) {
//通过该key获取到 消息本身
Object o = redisTemplate.boundHashOps(RabbitKey.MESSAGE_KEY).get(id.toString());
//将o转为message对象
MyMessage message = JSONObject.parseObject(JSON.toJSONString(o), MyMessage.class);
//判断message中的status是否为fail状态
if (message.getStatus().equals("fail")) {
//调用工具类重新投递
rabbitMessageUtils.sendMessage(message);
}
}
}
}
}
2.高并发线程安全的解决方案
2.1为什么不适用同步锁(Synchronized)?
- 对代码块添加同步锁(Synchronized)可以保证单个服务,在多线程状况下不能同时执行,如果项目部署集群,同一台服务器会部署多个服务,则同步锁会失效,多个服务间会出现并发
2.2 Redis的分布式锁setnx
“锁”就是一个存储在redis里的key-value对,key是把一组操作用字符串来形成唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个操作已经上锁,redis的分布式锁技术,使用的就是redis的setnx操作(在java中表现为setIfAbsent),如果有key 则加锁失败,如果没有则加锁成功,需要注意的是在加分布式锁时需要设置超时时间,用来防止死锁的产生
(1)加锁:
- “锁”就是一个存储在redis里的key-value对,key是把一组操作用字符串来形成唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个操作已经上锁。
- redis的分布式锁技术,使用的就是redis的setnx操作,如果有key 则加锁失败,如果没有则加锁成功
- setIfAbsent 是java中的方法
- setnx 是 redis命令中的方法
(2)解锁:
- 既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对
(3)阻塞、非阻塞:
- 阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。
- 非阻塞式的实现,若发现线程已经上锁,则直接返回。
(4)处理异常情况(防止死锁的产生):
- 假设当投资操作调用其他平台接口出现等待时,自然没有释放锁,这种情况下加入锁超时机制,用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁,进而防止死锁的产生
(5)示例 (以秒杀防止超卖问题为例)
//为了安全性继续加入redis的分布式锁 使用的就是redis的setnx操作,如果有key 则加锁失败,如果没有则加锁成功
//设置锁的失效时间,防止死锁产生,例如设置10s的过期时间,如果到期还未释放锁,则直接将该锁进行移除
//解决的是超卖问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1", 10, TimeUnit.SECONDS);
if(lock) {
logger.debug("当前用户抢到了该锁,进行扣减库存操作!");
//如果有库存,则需要进行扣减库存操作,注意要保证原子性 会返回当前减1后的剩余库存量
Long stockConut = redisTemplate.opsForValue().decrement(RedisKey.SECKILL_KEY + goodsId);
if (stockConut <= 0) {
//库存没有了 删除商品信息
redisTemplate.boundHashOps(RedisKey.SECKILL_KEY + redisKey).delete(goodsId.toString());
//移除掉对应的库存信息
redisTemplate.delete(RedisKey.SECKILL_KEY + goodsId);
return new BaseResp().FAIL("商品已被抢购完!");
} else {
//修改剩余库存量设置到 redis中
TbSeckillGoods tbSeckillGoods = JSONObject.parseObject(JSON.toJSONString(obj), TbSeckillGoods.class);
tbSeckillGoods.setStockCount(stockConut.intValue());
redisTemplate.boundHashOps(RedisKey.SECKILL_KEY + redisKey).put(goodsId.toString(), tbSeckillGoods);
//将数据库修改为0 但是这个逻辑是错误的,因为用户点击完抢购,如果没有付款,则该商品的数量是不能被真正的排除掉的。就会出现少卖的情况,所以真正的扣减数据库的库存,需要用户支付完成后,才能真正的扣钱
//在这里不能直接释放库存,因为用户一直不付钱,那么我们就需要进行 后续业务处理,如果用户30S不付款,则直接将该用户抢购的商品进行移除操作,预扣减的库存+1,可以通过RabbitMQ的死信队列来实现
//防止少卖问题
}
//执行完逻辑后要将该锁释放
logger.debug("抢购完成!");
//释放分布式锁
redisTemplate.delete("lock");
}else{
logger.debug("没有抢到锁,抢购失败!");
}
2.3 redisson分布式锁(看门狗机制)
2.3.1 Redis的分布式锁setnx产生的问题
- 使用Redis的分布式锁setnx,需要设置超时时间来防止死锁的产生,若某个线程抢到该锁但由于调用其他接口出现了等待,导致其业务执行总时间超过了setnx设置的超时时间,此时锁就会被释放,就会出现并发问题(例如秒杀时出现超卖问题)
2.3.2 redisson 实现锁续命
- redisson 通过 watch dog(看门狗机制),当业务执行超时,会进行锁续命
- redisson实现锁续命的原理: 设置了超时时间,如果当前业务代码超过了锁的失效时间,会进行锁续命,但不是无限次的续命,而是达到一定的次数、一定的时间,redisson会认为当前出现了死锁状况,会自动将该锁进行释放。(第一次默认延长30S时间)
2.3.3 redisson 的代码实现
(1)依赖
<!-- 加入redisson依赖,解决分布式锁问题-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.7</version>
</dependency>
(2)通过配置文件,将redisson交由Spring容器进行管理
- 需要结合redis
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.database}")
private Integer database;
//创建redission连接客户端 并且交给spring管理
@Bean
public RedissonClient createRedisson(){
//1.声明Redisson的配置
Config config = new Config();
//2.使用单机模式 其中也有集群模式
config.useSingleServer()
//redisson的连接 必须以redis://开头
.setAddress("redis://"+host+":"+port)
//设置使用的redis的库
.setDatabase(database);
//3.使用redisson的config创建出redissonClient客户端
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
(3)加锁
@RestController
@RequestMapping("/lock")
public class TestLockController {
@Autowired
RedisTemplate redisTemplate;
@Autowired
ShopRepository shopRepository;
//获取到配置文件中的Bean对象
@Autowired
RedissonClient redissonClient;
//1.加锁
//2.判断数据库的数量是否大于1
//3.如果大于等于1 则修改数量 进行减1操作
//4.释放锁资源
//使用自定义注解 ,我们需要定义该注解的作用是什么
@AccessLimit
@RequestMapping("/sekill/{id}")
@Transactional
public String testRedis(@PathVariable("id")Integer id) throws InterruptedException {
// 进行加锁,并且修改数据库的库存 模拟秒杀
//1.key value setIfAbsent 特点为:如果已经存在该key值,则不会设置成功
// Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
//2.使用redisson进行加锁的操作
RLock lock = redissonClient.getLock("lock");
//3.判断锁是否可用尝试加锁 并且设置1秒钟的失效时间 如果加锁成功则设置为true
try {
boolean b = lock.tryLock(1, TimeUnit.SECONDS);
if (b) {
System.out.println("获取锁开始执行");
//2.如果获取到这把锁,设置失效时间 失效时间设置为5秒时间
//redisTemplate.expire("lock", 1, TimeUnit.SECONDS);
//3.设置完成后执行自己的业务逻辑。从数据库查询该商品的数量是否大于等于1
Optional<TbShop> byId = shopRepository.findById(id);
if (byId.isPresent()) {
//在执行当前逻辑是,超过了默认的锁失效时间,那么就会出现超卖
//将当前的线程休眠两秒钟的时间 业务执行的时间越长,则超卖的商品就越多
//当使用到Ression后,休眠两秒钟的时间,默认设置的该锁的时间为1秒,那么redssion
//会自动的将该锁进行续命操作,防止出现并发操作。只有当前线程执行完成,才会讲该锁进行释放,不管
//线程执行的时间长短,当达到固定时间时,才会释放防止死锁的产生
Thread.sleep(2000);
if (byId.get().getNum() >= 1) {
shopRepository.updateNum(id);
}
}
//释放锁
//redisTemplate.delete("lock");
//使用Redisson释放锁.先获取到锁,才能进行解锁操作
lock.unlock();
}
}catch (Exception e){
return "秒杀失败!";
}
return "秒杀失败";
}
}
3.限流处理操作(并发量过大)
- 使用RabbitMQ设置队列的最大存储量,对请求进行限流
- 使用限流技术,比如令牌桶
3.1 令牌桶的实现
- 推荐使用google提供的guava工具包中的RateLimiter进行实现,其内部是基于令牌桶算法进行限流计算
(1)依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
(2)测试
public static void main(String[] args) {
//允许每秒通过三个请求
RateLimiter rateLimiter = RateLimiter.create(3.0);
//获取令牌,如果获取到则为true 否则为false
boolean b = rateLimiter.tryAcquire();
//开启线程池进行测试
ExecutorService executor = Executors.newFixedThreadPool(100);
for (int i = 0; i < 10; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
//获取令牌桶中一个令牌,最多等待10秒
if (rateLimiter.tryAcquire(1, 10, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName()+" "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
});
}
executor.shutdown();
}
3.1 自定义注解实现接口限流
-
基于令牌桶
-
实现方式: 自定义注解 + AOP
(1)依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
(2)自定义限流注解
注解定义用
@interface
关键字修饰
@Target
注解,是专门用来限定某个自定义注解能够被应用在哪些Java元素上面的
@Retention
注解,翻译为持久力、保持力。即用来修饰自定义注解的生命周期
@Documented
注解,是被用来指定自定义注解是否能随着被定义的java文件生成到JavaDoc文档当中
@Inherited
注解,是指定某个自定义注解如果写在了父类的声明部分,那么子类的声明部分也能自动拥有该注解
@Inherited
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessLimit {}
(2)自定义切面类,扫描加入了该注解的方法
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import com.qf.springbootrediscrud.pojo.BaseResp;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
/**
* 1.aop的业务使用:
* 1.1 日志记录
* 1.2 事务
* 1.3 扫描自定义注解,进行业务增强
*/
@Aspect
@Component
public class AccessLimitAop {
@Autowired
private HttpServletResponse httpServletResponse;
//声明令牌桶,每秒放行20个请求
private RateLimiter rateLimiter=RateLimiter.create(1.0);
private static final Logger logger = LoggerFactory.getLogger(AccessLimitAop.class);
//1.定义对于哪些方法进行增强。声明切点。我们现在需要的是扫描自定义注解
//excution 定义切点对于哪些方法进行增强
@Pointcut(value = "@annotation(com.qf.springbootrediscrud.annoration.AccessLimit)")
public void pt1(){}
//我们对接口进行增强方法,拦截到加了自定义注解的接口,首先来执行增强方法
//判断当前的请求是否可以放行,是否可以从令牌桶中获取到令牌
@Around("pt1()")
public Object arround(ProceedingJoinPoint proceedingJoinPoint){
//判断是否该请求获取到了令牌
logger.debug("进入了令牌桶的判断是否放行");
//尝试从令牌桶中获取令牌
boolean b = rateLimiter.tryAcquire();
String name = Thread.currentThread().getName();
Object proceed = null;
try {
//判断是否获取到该令牌,则继续执行业务逻辑
if (b){
logger.debug("===获取到令牌继续执行业务逻辑==当前线程:{}"+name);
proceed = proceedingJoinPoint.proceed();
return proceed;
}else{
logger.debug("===获取令牌失败!=="+name);
//没有获取到令牌需要返回 ,不再继续请求
String result = JSONObject.toJSONString(new BaseResp().FAIL("请求量过大,请稍后再试!"));
//通过httpservletResponse进行返回
httpServletResponse.setContentType("application/json");
httpServletResponse.setCharacterEncoding("utf-8");
//使用response进行返回数据
ServletOutputStream outputStream = httpServletResponse.getOutputStream();
//将错误结果进行返回操作
outputStream.write(result.getBytes(StandardCharsets.UTF_8));
outputStream.close();
}
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return proceed;
}
}
(3)在接口中加入@AccessLimit注解,进行限流
/**
* 开始秒杀
*/
//在秒杀中加入 限流注解 对当前接口进行保护
@AccessLimit
@RequestMapping("/add")
public BaseResp add(@RequestParam("id")Integer goodsId, @RequestParam("date")String date, HttpServletRequest request){
return sekillService.add(goodsId,date,request);
}