全局唯一ID
创建一个工具类
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIME_STAMP=1672531200L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS=32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIME_STAMP;
//2.生成序列号
//2.1获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接并返回
return timestamp<<COUNT_BITS|count;
}
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println("second= "+second);
}
}
对工具类进行测试
这里开启了300个线程,每个线程执行100次,最后自增长会达到30000.
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es= Executors.newFixedThreadPool(500);
@Test
void test() throws InterruptedException {
CountDownLatch latch=new CountDownLatch(300);
Runnable task=()->{
for(int i=0;i<100;i++){
long id = redisIdWorker.nextId("order");
System.out.println("id="+id);
}
latch.countDown();
};
long begin=System.currentTimeMillis();
for(int i=0;i<300;i++) {
es.submit(task);
}
latch.await();
long end=System.currentTimeMillis();
System.out.println("time = "+(end-begin));
}
}
结果如下,可以看见每个id都是不一样的。
然后看见redis里面的数据,的确是30000没错.
实现优惠券秒杀下单
代码实现(未考虑线程安全)
Controller层中
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private IVoucherOrderService voucherOrderService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
}
Service层中
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private ISeckillVoucherService seckillVoucherService;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否已经结束
if (voucher.getBeginTime().isBefore(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1) {
//库存不足
return Result.fail("库存不足");
}
//5.扣减库存
boolean success=seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id",voucherId)
.update();
if(!success){
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单Id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户Id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3代金券Id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单ID
return Result.ok(orderId);
}
}
超卖问题
真实场景下肯定是1秒会有成百上千的请求同时发送到后端的,使用Jmeter进行测试得到如下.
使用50个线程抢20个资源出现超卖情况
问题出现原因如下,多个线程读取到了同一个数据,然后进行修改。
解决方案分析
这里的版本号实际可以用库存数量作为版本号进行使用。
乐观锁解决超卖问题
//5.扣减库存
boolean success=seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock =stock - 1
.eq("voucher_id",voucherId).eq("stock",voucher.getStock()) //where id=? and stock =?
.update();
if(!success){
return Result.fail("库存不足!");
}
对这部分代码使用乐观锁优化之后再次50抢20结果如下,反而只抢了13张,很多都是报库存不足.
因为多个线程抢同一张票时只有一个线程可以成功.
缺点:
成功率太低。
优化修改
将stock=?改为stock>0即可
提示:这块还是基于了数据库的update语句自带行锁,自带互斥的,库存只是在原来的基础上进行--操作,所以可以保证不会超卖
//5.扣减库存
boolean success=seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock =stock - 1
.eq("voucher_id",voucherId)
.gt("stock",0)//where id=? and stock > 0
.update();
if(!success){
return Result.fail("库存不足!");
}
库存恰好为0
去看mysql进阶和java并发之后再看多表分段锁.
一人一单
新的业务流程如下所示
初版代码
这个代码在多线程时有并发安全问题,有可能多个线程都查到了0,然后都抢到了票
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1) {
//库存不足
return Result.fail("库存不足");
}
//5.一人一单
Long userId = UserHolder.getUser().getId();
//5.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//5.2判断是否存在
if(count>1){
//用户已经购买过了
return Result.fail("用户已经购买过一次");
}
//6.扣减库存
boolean success=seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock =stock - 1
.eq("voucher_id",voucherId)
.gt("stock",0)//where id=? and stock > 0
.update();
if(!success){
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1订单Id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2用户Id
voucherOrder.setUserId(userId);
//7.3代金券Id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8.返回订单ID
return Result.ok(orderId);
}
优化代码
这里要对一整段代码加锁,将其抽取出来作为独立的方法,但是使用synchronized锁一个方法的话每次都只能有一个用户执行抢票,但是这里的加锁应该是针对同一个用户的多个请求.
锁id.toString()的话每次都是锁一个全新的对象. 不能起到作用。锁userId.toString().intern()的话就可以,这个是去字符串常量池找对象,常量池里每个字符串都有唯一的对象.
但是这个代码也有问题,事务注解是加在方法上的,锁的内容执行完之后到事务提交之前的这一段时间可能会有别的线程进来继续查询,查询到的也是旧数据,因为上一个事务没有提交.
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1) {
//库存不足
return Result.fail("库存不足");
}
return createVoucherOrder(voucherId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()) {
//5.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//5.2判断是否存在
if (count > 1) {
//用户已经购买过了
return Result.fail("用户已经购买过一次");
}
//6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock =stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0)//where id=? and stock > 0
.update();
if (!success) {
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1订单Id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2用户Id
voucherOrder.setUserId(userId);
//7.3代金券Id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8.返回订单ID
return Result.ok(orderId);
}
}
最终代码
如果是像下面这样,在锁里面调用加了事务的方法的话,会有事务失效的问题,
下面的调用createVoucherOrder实际是this.createVoucherOrder这样的话,事务注解拿到的是当前的VoucherOrderServiceImpl对象,而不是其代理对象.
事务能生效是因为拿到了VoucherOrderServiceImpl的代理对象,做了一个动态代理,对代理对象做了事务处理.this指的是目标对象,是没有事务功能的,这是spring事务失效的几种可能性之一.
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
这里为了拿到代理对象,需要导入一个新依赖,并在启动类上开启代理对象暴露
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
@MapperScan("com.hmdp.mapper")
@EnableAspectJAutoProxy(exposeProxy = true)
@SpringBootApplication
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
接口层里新增一个方法
public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
Result createVoucherOrder(Long voucherId);
}
实现层里代码
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private ISeckillVoucherService seckillVoucherService;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if (voucher.getStock()<1) {
//库存不足
return Result.fail("库存不足");
}
//5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()) {
//取到了当前代理对象
IVoucherService proxy =(IVoucherService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//5.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//5.2判断是否存在
if (count > 0) {
//用户已经购买过了
return Result.fail("用户已经购买过一次");
}
//6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock =stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0)//where id=? and stock > 0
.update();
if (!success) {
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1订单Id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2用户Id
voucherOrder.setUserId(userId);
//7.3代金券Id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8.返回订单ID
return Result.ok(orderId);
}
}
50抢20测试结果:
成功实现只能抢一张票
在代码里面,如果是大于1就是允许抢两张票,想控制用户可抢票数量可以根据这里进行修改
//5.2判断是否存在
if (count > 0) {
//用户已经购买过了
return Result.fail("用户已经购买过一次");
}
集群下的线程并发安全问题
两个系统的代码互不干涉,所以肯定会有并发问题.
两台tomcat,两个jvm,两个字符串常量池,两把锁。
分布式下需要使用同一把锁,由此引出分布式锁.
分布式锁
基本原理和不同实现方式对比
Redis实现分布式锁的基本思路
代码实现
public class SimpleRedisLock implements ILock{
private String lockname;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
public SimpleRedisLock(String lockname, StringRedisTemplate stringRedisTemplate) {
this.lockname = lockname;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean trylock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().threadId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+lockname, threadId+"", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); //自动拆箱有风险
}
@Override
public void unlock() {
//释放锁完成
stringRedisTemplate.delete(KEY_PREFIX+lockname);
}
}
使用分布式锁对代码进行优化
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
...
//2.判断秒杀是否开始
...
//3.判断秒杀是否已经结束
...
//4.判断库存是否充足
...
//5.一人一单
Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁
boolean isLock = lock.trylock(1200);
//判断是否获取锁成功
if(!isLock){
//获取锁失败,返回报错
return Result.fail("不允许重复下单");
}
try {
//取到了当前代理对象
IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
//释放锁
lock.unlock();
}
}
@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
...
}
}
分布式锁误删问题
如下图所示,线程1释放了线程2的锁,然后这时候线程3过来获取了锁,然后就有两个线程在同时跑。这里可以取出锁之后判断是不是自己的标识.是才可以释放锁.
之前直接使用线程ID作为标识是不够的,可能在分布式时有多个线程有相同id
代码优化
public class SimpleRedisLock implements ILock{
private String lockname;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
public SimpleRedisLock(String lockname, StringRedisTemplate stringRedisTemplate) {
this.lockname = lockname;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean trylock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+lockname, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); //自动拆箱有风险
}
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + lockname);
//判断标识是否一致
if(threadId.equals(id)) {
//释放锁完成
stringRedisTemplate.delete(KEY_PREFIX + lockname);
}
}
}
分布式锁的原子性问题
在执行完业务到释放锁期间可能会因为jvm的垃圾回收fullGC发生阻塞。时间一长就触发超时释放。此时别的项目里的线程2过来加锁,又因为线程1里面已经判断过锁标识一致,所以会直接释放线程2的锁。
Lua脚本解决多条命令原子性问题
使用Lua脚本改造分布式锁
要准备一个脚本文件unlock.lua,这个文件放在和application.yaml同级的目录下
-- 比较线程标识与锁中的标识是否一致
if(redis.call('get',KEYS[1])==ARGV[1]) then
-- 释放锁 del key
return redis.call('del',KEYS[1])
end
return 0
改造后代码
public class SimpleRedisLock implements ILock{
private String lockname;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String lockname, StringRedisTemplate stringRedisTemplate) {
this.lockname = lockname;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean trylock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+lockname, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); //自动拆箱有风险
}
@Override
public void unlock() {
// //获取线程标识
// String threadId = ID_PREFIX+Thread.currentThread().getId();
// //获取锁中的标识
// String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + lockname);
// //判断标识是否一致
// if(threadId.equals(id)) {
// //模拟此处发生阻塞导致锁超时释放
// //Thread.sleep(1000);
// //释放锁完成
// stringRedisTemplate.delete(KEY_PREFIX + lockname);
// }
//调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+lockname),
ID_PREFIX+Thread.currentThread().getId());
}
}
总结
Redisson
快速入门
改造下单业务代码
经过测试也是可以正常使用。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
//创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁
// boolean isLock = lock.trylock(1200);
boolean isLock = lock.tryLock();
//判断是否获取锁成功
if(!isLock){
//获取锁失败,返回报错
return Result.fail("不允许重复下单");
}
try {
//取到了当前代理对象
IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
//释放锁
lock.unlock();
}
}
}
Redisson可重入锁原理
在下图左边的demo里面,是一个线程先后尝试获取锁共两次,在之前自己写的业务流程是没办法获取两次的。
参考jdk实现可重入锁的思路,同一个线程每次获取的时候记录次数加1,释放的时候次数减1.要记录锁的标识的和锁的重入次数可以使用hash结构。
每次释放锁的时候根据业务判断是减1还是删除锁。
使用Lua脚本完成可重入锁
获取锁的脚本
释放锁的脚本
Redisson的锁重试和WatchDog机制
整个流程如下所示
获取锁成功,发现锁的不过期的就利用看门狗机制一直刷新....逻辑很复杂。
Redisson的multiLock原理(解决主从一致性问题)
主节点宕机之后锁就失效了,别的线程就可以来获取锁了。
这个是必须所有主节点都获取到锁才算成功获取。