整整3W字笔记,Redis最核心的秒杀业务、分布式锁、消息队列相关原理一篇文章就搞定(黑马点评项目)

news2024/9/20 4:21:05

目录

 四、 优惠卷秒杀系列功能实现

4.1 全局ID生成器

4.1.1 全局ID生成器的选型

4.1.2 全局ID生成器的实现

4.1.3 全局ID生成器的测试

4.1.4 其他ID生成器的拓展

4.2  利用PostMan模拟管理员后台添加秒杀优惠卷信息 

 【代码实现】

 【PostMan测试】

 4.3 优惠卷秒杀下单功能基础逻辑实现

【功能说明】

【代码实现】

【功能测试】

4.4 (优化)解决超卖问题

4.4.1 超卖现象重现及其原因

4.4.2 解决超卖问题的方案比较

悲观锁方案

乐观锁方案

4.4.3 乐观锁两大方案比较

 版本号法(涉及ABA问题时使用)

CAS法(不涉及ABA问题时使用)

4.4.4 基于CAS法解决超卖问题

4.5  (优化)实现单机情况下 限购一人一单

4.5.1 限购功能整体概述

4.5.2 限购功能实现及说明

【基础实现】

【基础锁实现】

【降低锁粒度实现】

【关于如何锁用户】

【扩大锁范围实现】

【解决代理对象事务】

4.5.3 限购功能测试检验

4.6 (拓展)集群模式下限购一人一单 

4.6.1 单体模式下限购功能实现的局限性

4.6.2 配置集群环境,测试限购代码的并发问题

4.6.3 解决策略——分布式锁

4.7 (知识点)分布式锁 

4.7.1 分布式锁介绍

4.7.2 基于Redis的分布式锁方案

4.7.3 基于Redis实现分布式锁的初级版本

4.7.3.1 代码实现

4.7.3.2 接口测试

4.7.4 (优化)解决分布式锁出现的误删问题

4.7.4.1 出现误删的原因判断

4.7.4.2 解决方案——检查锁是不是自己的:

4.7.4.3 代码实现及其测试

4.7.5 (优化)解决分布式锁的原子性问题

4.7.5.1 非原子性操作带来的并发安全问题

4.7.5.2 解决方案——引入Rua脚本实现命令原子化

4.7.5.3 Rua脚本在Redis中的基本使用

4.7.5.4 使用Rua编写释放锁脚本

4.7.5.5 在IDEA编写使用lua脚本的代码

4.7.5.6 功能测试

4.8 (拓展知识点) Redisson ——成熟的锁工具包 

4.8.1 基于setnx实现的分布式锁还存在的问题

4.8.2 Redisson介绍

4.8.3 Redisson入门

4.8.4 Redisson的可重入原理

4.8.5 (TODO)Redisson的可重试原理

4.8.6(TODO) Redisson的超时释放原理

4.8.7 Redisson的主从一致性原理

4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题

4.9 Redis优化秒杀系列功能实现 

4.9.1 业务性能瓶颈分析

4.9.2 优化流程分析

4.9.3 改进秒杀业务代码改造及详细解释

1. 将优惠卷信息存入Redis中

2. 基于Lua脚本,判断秒杀资格

3. 抢购成功,封装信息到阻塞队列中,实现异步下单

4.9.4 Redis优化秒杀功能测试

1. 简单测试功能是否实现,使用postman发送请求

2. 利用jmeter测试高并发情况

4.9.5 总结

4.10(知识点) 学习Redis消息队列实现异步秒杀

4.10.1 基于List类型模拟的消息队列原理及其使用

4.10.2  基于PubSub的消息队列原理及其使用

 4.10.3(三者之最) 基于Stream的消息队列原理及其使用

4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能

4.10.4.1 创建Stream消息队列

4.10.4.2 修改秒杀资格判断的Lua脚本

4.10.4.3 完善秒杀下单代码及其代码说明

1. 首先是修改预加载Lua脚本信息

2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本

3. 重头戏,开启线程任务!!

4.10.4.4 秒杀下单功能测试

1. 简单测试功能是否实现,使用postman发送请求

2. 利用jmeter测试高并发情况

4.11 总结情况


 四、 优惠卷秒杀系列功能实现

4.1 全局ID生成器

4.1.1 全局ID生成器的选型

为什么需要全局ID生成器呢?

        由于秒杀业务需要,有时候我们会生成数量极为庞大的业务数据。但是对于单张表,往往会存在存储数据的上限(随着存储数据的增加,查询的效率会越来越低),我们往往会进行分表操作。

        这么一来,分表带来的 ID不唯一、ID自增的规律性太强导致安全性低等问题就产生了。为了解决这些问题,我们可以利用全局ID生成器进行开发。

全局ID生成器需要具备的特征:

使用Redis符合上述五点需求

高可用、高性能: Redis的查询能力本身就比数据库要快得多

唯一性: 由于Redis数据库可以全局通用一张表,因此可以更好地维护ID的唯一性

递增性:Redis拥有自增操作,INCREMENT

安全性:通过设置ID的自增规律,可以提高ID的安全性

全局ID生成器的格式规范:

4.1.2 全局ID生成器的实现

步骤:

1. 获取一个基准时间戳,标记为BEGIN_TIMESTAMP

2. 获取当前时间戳标记为now

3. 计算全局ID的时间戳部分 

4. 以天为单位,分割key【可以更好的统计每天的订单数量】

5. 获取全局ID的序列号【调用Redis的自增方法】

6. 利用运算符将时间戳 与 序列号 进行拼接并返回

  在utils包下新建RedisWorkers用于实现全局ID生成:

@Component
public class RedisIdWorker {
    // 开始时间戳 2024-09-12-0-0-0
    private static final long BEGIN_TIMESTAMP = 1726099200L;
    // 序列号位数
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public Long nextId(String keyPrefix){
        // 1. 时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond-BEGIN_TIMESTAMP;

        // 2. 序列号
        //2.1 获取当前日期,精确到天
        String date =  now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
        // 3. 拼接返回
        return timestamp << COUNT_BITS | count;
    }

    /**
     * 获取当前时间戳
     * @param args
     */
    public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2024,9,12,0,0,0);
        Long second = time.toEpochSecond(ZoneOffset.UTC); // 1726099200
        System.out.println("second=" + second);
    }
}

4.1.3 全局ID生成器的测试

开启3000个线程任务,每个线程插入100条订单数据,运行查看效果

@SpringBootTest
class HmDianPingApplicationTests {
    @Resource
    private RedisIdWorker redisIdWorker;

    private ExecutorService es = Executors.newFixedThreadPool(500);

    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3000);
        Runnable task = () ->{
            for(int i=0;i<100;i++){
                long id = redisIdWorker.nextId("order");
                System.out.println("id = " + id);
            }
            latch.countDown();
        };
        long begin = System.currentTimeMillis();
        for(int i=0;i<3000;i++){
            es.submit(task);
        }
        latch.await();
        long end = System.currentTimeMillis();
        System.out.println("time = " + (end-begin));
    }
}

​​​​​​以日期作为key分割,可以统计当天订单数
7秒生成了30万条不重复的全局ID数据

4.1.4 其他ID生成器的拓展

除了采用Redis自增生成全局ID,还有以下方法可行:

1. UUID 【无序】

2. 数据库自增 【数据量不能太多】

3. 雪花算法 (本次项目使用的Redis自增实际上就是运用了雪花算法的思想)

Redis自增策略

1. 每天一个key,方便统计订单量

2. ID构造是 时间戳 + 计数器

4.2  利用PostMan模拟管理员后台添加秒杀优惠卷信息 

常规优惠券表结构
秒杀优惠券表结构

 【代码实现】

/**
     * 新增秒杀优惠券
     * 多表添加 加事务
     * @param voucher
     */
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        // 关联优惠券id
        seckillVoucher.setVoucherId(voucher.getId());
        // 秒杀库存
        seckillVoucher.setStock(voucher.getStock());
        // 开始时间
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        // 结束时间
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
    }

 【PostMan测试】

 4.3 优惠卷秒杀下单功能基础逻辑实现

【功能说明】

实现基本的下单逻辑

接口说明

请求方式

POST
请求路径        /voucher-order/seckill/{id}
请求参数优惠卷id
返回值        订单Id

【代码实现】

利用全局Id生成订单id

    @Resource
    private ISeckillVoucherService seckillVoucherService;
    /**
     * 全局ID生成器
     */
    @Resource
    private RedisIdWorker redisIdWorker;

    /**
     * 秒杀优惠券下单
     * @param voucherId
     * @return
     */
    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        //1. 提交优惠卷id 查询优惠卷id信息
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        if(voucher == null){
            return Result.fail("优惠券不存在!");
        }

        //2. 判断优惠卷是否在抢购时间内
        LocalDateTime beginTime = voucher.getBeginTime();
        LocalDateTime endTime = voucher.getEndTime();
        long stockCount =voucher.getStock();
        if(beginTime.isAfter(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀尚未开始!");
        }
        if(endTime.isBefore(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀已经结束!");
        }

        //3. 判断优惠卷库存是否充足
        if(stockCount <= 0){
            //   否----> 返回异常信息---->结束
            return Result.fail("库存不足!");
        }

        //4. 扣减库存
        boolean success =  seckillVoucherService.update()
                            .setSql("stock = stock-1")
                            .eq("voucher_id",voucherId).update();
        if(!success){
            return Result.fail("库存不足!");
        }
        //5. 创建优惠卷订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //5.1 订单id 【全局ID生成器】4
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //5.2 用户id 【当前登录用户】
        long userId =  UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //5.3 优惠卷id 【传递过来的优惠卷id】
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        //6. 返回订单id ----> 结束
        return Result.ok(orderId);
    }

【功能测试】

点击抢购,成功生成订单信息,库存扣减1

4.4 (优化)解决超卖问题

4.4.1 超卖现象重现及其原因

步骤:

        利用jmeter进行多线程模拟高并发抢购优惠卷时的情况

现象:

        发现优惠卷有超卖的现象

原因:

        在多线程高并发访问的环境下,当库存为1时,正常情况下最后一个线程在执行完最后一个扣减操作的过程中,有其余的线程涌入,此时由于扣减操作未完成,库存量还是为1,从而误判库存剩余,最后导致超卖。

 现象回顾

配置请求路径

 配置JSON断言,根据响应的JSON内容更精确的反映请求是否异常

 

配置请求头,token从自己的Redis里面拿

响应报告:

回看数据库报告

        正常来说异常为50%才是正确的,但是这里的异常没有到50%。在高并发访问的过程中出现了超卖现象,即同一时刻两个线程同时完成导致优惠卷最后的数量异常。

异常不是50%
数据库优惠卷变成了-9

4.4.2 解决超卖问题的方案比较

悲观锁方案
  • 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronizedlock
  • 悲观锁,适合写入操作较多、冲突频繁的场景
乐观锁方案
  • 乐观锁,认为线程安全问题不一定发生,因此不加锁,而是判断有没有其他线程在自己进行操作时修改了数据,有则重试。常见的实现方式有:版本号法、CAS操作.
  • 乐观锁,适合读取操作较多、冲突较少的场景。

4.4.3 乐观锁两大方案比较

 版本号法(涉及ABA问题时使用)

        首先,我们要为数据库表新增一个版本号字段 version

        然后,线程开始查询库存及其库存版本号,记录版本号信息。

        接着,在通过判断后,执行扣减时,检查当前版本号与先前记录的版本号信息是否一致,如若一致(例如线程1),则可以顺利执行扣减更新语句。如若不一致(例如线程2),则无法执行扣减更新语句。

CAS法(不涉及ABA问题时使用)

        在版本号的基础上,可以发现其实库存量直接可以充当版本号的作用,于是就出现了更加简洁,不用额外添加字段的方法——CAS法。

        尽管CAS操作具有诸多优点,但它也存在一个潜在的问题,即ABA问题。ABA问题是指在CAS操作过程中,一个线程a将数值改成了b,接着又改成了a,此时CAS认为是没有变化,其实是已经变化过了。这个问题在多线程环境下可能会导致程序行为不符合预期,从而引发并发错误。

4.4.4 基于CAS法解决超卖问题

只需要在扣减库存这里加多一个库存值的判断,同时记得如果判断出问题了要重试!

//4. 扣减库存
        boolean success =  seckillVoucherService.update()
                            .setSql("stock = stock-1")
                            .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
                            .update();

上述代码的异常率特别高 

解决失败率高的问题 

 //4. 扣减库存
        boolean success =  seckillVoucherService.update()
                            .setSql("stock = stock-1")
                            .eq("voucher_id",voucherId).gt("stock",0) // 库存大于0就行了
                            .update();

4.5  (优化)实现单机情况下 限购一人一单

高能瞬间:实战篇-07.优惠券秒杀-实现一人一单功能_哔哩哔哩_bilibili

4.5.1 限购功能整体概述

为什么要一人一单?

        对于秒杀优惠卷,我们当然不希望一个人承包大部分甚至所有的优惠卷,这种行径无疑是电商黄牛。我们希望每个顾客至多只能抢购一张券。

限购一人一单实现逻辑?

        在扣减库存之前,我们对优惠卷数据进行查询,查看当前用户(用户id)是否在数据库中已经存在抢购数据,如果有了,则不再给予抢购机会。否则才会进行扣减库存的操作

实现限购功能需要克服的潜在风险?

存在线程并发问题: 假设有100个相同用户标识的线程同时查询数据库,发现数据库没有数据时,100个线程同时通过了判断,结果导致1个用户抢了多单

4.5.2 限购功能实现及说明

【基础实现】

根据前面的概述,我们大概明白了整个功能的逻辑,实际上就是简单的增加一个判断。

//4. 限制一人一单【悲观锁方案】
        Long userId = UserHolder.getUser().getId();

        //4.1 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        //4.2 判断订单是否存在
        // 是 -----> 返回异常信息---->结束
        if (count > 0) {
            return Result.fail("该用户已经购买过一次了!");
        }
【基础锁实现】

但是单单只添加判断逻辑,还会造成线程并发问题,因此我们需要给判断过程加锁,最简单的方式就是将一人一单功能抽取成一个方法,在方法上添加synchronized关键字

@Transactional
    public synchronized Result createVoucherOrder(Long voucherId) {
        //4. 限制一人一单【悲观锁方案】
        Long userId = UserHolder.getUser().getId();

        //4.1 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        //4.2 判断订单是否存在
        // 是 -----> 返回异常信息---->结束
        if (count > 0) {
            return Result.fail("该用户已经购买过一次了!");
        }

        //5. 扣减库存——解决超卖问题【乐观锁方案】
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1")
                .eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
                .update();
        if (!success) {
            return Result.fail("库存不足!");
        }


        //6. 创建优惠卷订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1 订单id 【全局ID生成器】4
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2 用户id 【当前登录用户】
//        long userId =  UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //6.3 优惠卷id 【传递过来的优惠卷id】
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        //7. 返回订单id ----> 结束
        return Result.ok(orderId);
    }
【降低锁粒度实现】

以上代码还无法实现全部功能,由于我们将整个方法都锁住了,相当于每个用户进行抢购时,都需要判断该方法是否已经被执行,这样会导致整个抢购秒杀过程退化成串行执行。 而我们想实现的是同一个用户标识线程串行执行,不同用户之间并行执行。因此我们需要降低锁的粒度,将锁作用在用户上。

【关于如何锁用户】

userId.toString().intern()

由于同一个用户标识的不同线程创建的User对象实际上并不是同一个,为了能达成用户级锁,我们必须是按值锁,因此需要调用toString().intern()方法

@Transactional
    public Result createVoucherOrder(Long voucherId) {
        //4. 限制一人一单【悲观锁方案】
        Long userId = UserHolder.getUser().getId();
        
        synchronized (userId.toString().intern()) {
            //4.1 查询订单
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            //4.2 判断订单是否存在
            // 是 -----> 返回异常信息---->结束
            if (count > 0) {
                return Result.fail("该用户已经购买过一次了!");
            }

            //5. 扣减库存——解决超卖问题【乐观锁方案】
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock-1")
                    .eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
                    .update();
            if (!success) {
                return Result.fail("库存不足!");
            }


            //6. 创建优惠卷订单
            VoucherOrder voucherOrder = new VoucherOrder();
            //6.1 订单id 【全局ID生成器】4
            long orderId = redisIdWorker.nextId("order");
            voucherOrder.setId(orderId);
            //6.2 用户id 【当前登录用户】
//        long userId =  UserHolder.getUser().getId();
            voucherOrder.setUserId(userId);
            //6.3 优惠卷id 【传递过来的优惠卷id】
            voucherOrder.setVoucherId(voucherId);
            save(voucherOrder);

            //7. 返回订单id ----> 结束
            return Result.ok(orderId);
        }
    }
【扩大锁范围实现】

上述代码实现了将锁的粒度成功的缩小到了用户级,但是还不是最优的代码,还存在一定的风险。具体的,由于@Transactional事务是作用在方法上的,而现在我们的锁仅仅是作用在方法内部,这样一来就会存在一种异常情况:当方法内部都执行完毕了,锁会先释放,然后才是Spring自动提交事务。在提交事务完成之前,锁已经释放掉了,这也就意味着在这期间其他的线程完全有机会趁虚而入。为了解决这个问题,我们必须把锁的范围重新扩大到将方法包裹其中。

/**
     * 秒杀优惠券下单
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        //1. 提交优惠卷id 查询优惠卷id信息
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        if(voucher == null){
            return Result.fail("优惠券不存在!");
        }

        //2. 判断优惠卷是否在抢购时间内
        LocalDateTime beginTime = voucher.getBeginTime();
        LocalDateTime endTime = voucher.getEndTime();
        long stockCount =voucher.getStock();
        if(beginTime.isAfter(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀尚未开始!");
        }
        if(endTime.isBefore(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀已经结束!");
        }

        //3. 判断优惠卷库存是否充足
        if(stockCount <= 0){
            //   否----> 返回异常信息---->结束
            return Result.fail("库存不足!");
        }


        /* 为什么要加在createVoucherOrder方法中?
         * 因为createVoucherOrder方法中已经加了事务,如果加了锁,事务回滚,锁也会释放,这样就会导致锁失效

         * 为什么要指定锁用户id?
         * 将锁的粒度降低,将锁的粒度降低到用户粒度,这样就可以保证一个用户只会有一个订单
         * 添加.intern() 确保是按值加锁  而不是按对象加锁

         * 为什么要使用代理对象?
         * @Transactional注解是Spring的事务注解,它只能在Spring管理的Bean中生效,
         * 如果直接在ServiceImpl中调用createVoucherOrder方法,
         * 那么@Transactional注解就不会生效,
         * 因为createVoucherOrder方法不是在Spring管理的Bean中调用的,
         * 所以需要使用代理对象来调用createVoucherOrder方法,这样@Transactional注解就会生效
         */
        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            return createVoucherOrder(voucherId);
        }
    }
【解决代理对象事务】

上述代码看起来很完美了,但是事实上还是存在问题@Transactional事务机制是失效的。这是由于我们创建的createVoucherOrder()方法并不受Spring管理,所以没有办法使用Spring的事务机制。要解决这个问题,我们需要引入Spring中AspectJ注解支持来暴露代理对象,通过代理对象机制实现事务功能。具体操作如下:

1. 导入AspectJ注解支持依赖

2. 获取createVoucherOrder()方法的代理对象

3. 通过代理对象调用方法

4. 在启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解,暴露代理对象

        <!--动态代理模式-->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
/**
 * 启动类添加@EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
 *
 */        
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
// 开启AspectJ注解支持 暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {

    public static void main(String[] args) {
        SpringApplication.run(HmDianPingApplication.class, args);
    }

}



Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
        // 获取代理对象
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
}

4.5.3 限购功能测试检验

        两百个线程共用一个token向后端发送请求,来模拟同一用户高并发抢购的情况

        200个线程只有1个通过了请求,数据库中优惠卷剩余99张,优惠卷订单只有1个。证明我们的功能实现了 

4.6 (拓展)集群模式下限购一人一单 

4.6.1 单体模式下限购功能实现的局限性

        在4.5时,我们成功的完成了限购一人一单的业务功能。事实上还是有相对较大的局限性。由于秒杀业务的高并发性,为了防止单台服务器压力过大的原因,我们会考虑将多台服务器部署成一个集群环境。这时候,我们在单体环境中写的加用户锁方案就有了并发安全问题。

如下图,我们知道。Java提供的锁方案事实上是由JVM下的锁监视器去进行监听的。因此在单体环境中,由于锁监视器唯一,可以正常的执行线程监听任务。但是放在集群环境中,每一个服务器都有一个JVM平台,都有一个锁监视器。在nginx服务器发送轮询请求时,可以有多个线程同时获得锁,同时执行。于是乎限购功能出现了并发安全问题。 

4.6.2 配置集群环境,测试限购代码的并发问题

配置两台Tomcat服务器 

配置nginx服务器轮询访问

并重新启动nginx服务器

测试集群环境

访问:localhost:8080/api/voucher/list/1  两次

8081、8082 各被轮询了1次

4.6.3 解决策略——分布式锁

        解决策略无非是将监视器统一,且必须要在集成环境中唯一共享。分布式锁正是这样一种策略。

        我们将会在下一小结详细介绍分布式锁的相关知识,并使用分布式锁在集群环境下解决并发安全问题

4.7 (知识点)分布式锁 

4.7.1 分布式锁介绍

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的特点:

1、高可用的获取锁与释放锁;

2、高性能的获取锁与释放锁;

3、多进程可见

4、具备可重入特性;

5、具备锁失效机制,防止死锁;

6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

分布式锁的实现方式:

1. 基于数据库的实现:

        使用数据库的事务和锁机制来实现分布式锁。可靠性高,支持事务机制和锁机制,但是性能较低,对数据库性能有一定影响。适用于需要强一致性和高可靠性的场景

2. 基于缓存的实现:

        使用缓存系统(如Redis)的原子操作来实现分布式锁。性能较高,支持原子操作,但是可靠性相对较低,需要处理缓存故障和失效的情况。适用于高并发场景。

3. 基于分布式协调服务的实现:

        使用分布式协调服务(如ZooKeeper、etcd)来实现分布式锁。通过创建临时顺序节点和监听节点变化来控制锁的获取和释放。可靠性高,支持分布式环境,但是性能较低,对分布式协调服务的性能有一定影响。适用于需要高可靠性和一致性的场景。

4. 基于消息队列的实现:

        使用消息队列来实现分布式锁。通过发送和接收消息来控制锁的获取和释放。

优点简单易用,适用于简单的场景。缺点性能较低,不适用于高并发和高吞吐量的场景。

4.7.2 基于Redis的分布式锁方案

获取锁 + 添加过期时间  【且确保原子性操作】

EX 设置过期时间   NX 不存在才添加

SETNX lock thread1 EX 10 NX

释放锁

DEL key

4.7.3 基于Redis实现分布式锁的初级版本

利用Redis,完成基本的分布式锁构建

4.7.3.1 代码实现

创建Lock接口

//父类

public interface ILock {

    /**
     * 获取锁
     * @param timeoutSec
     * @return
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();
}

实现Redis分布式锁方法 

//实现 ILock接口
public class SimpleRedisLock implements ILock{

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    // 锁前缀
    private static final String KEY_PREFIX = "lock:";

    /**
     * 获取锁
     * @param timeoutSec
     * @return
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        long threadId = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name ,threadId+"",timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

使用分布式锁 

 /*
         *   分布式锁方案
         */
        // 1. 创建锁对象
        SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

        // 2. 尝试获取锁
         boolean isLock =  lock.tryLock(1200);

        // 3. 判断锁是否获取成功
        if(! isLock){
            return Result.fail("不允许重复下单");
        }
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        } finally {
            // 4. 释放锁
            lock.unlock();
        }
4.7.3.2 接口测试

利用postman进行测试

发送两个请求,获取锁成功,一个获取锁失败,线程Id为72

 

4.7.4 (优化)解决分布式锁出现的误删问题

4.7.4.1 出现误删的原因判断

        线程1获取分布式锁后运行,结果发生了业务阻塞,阻塞时间大于设定的超时释放锁时间。于是在线程1还未执行完业务时,分布式锁已经被释放了。

        这时候线程2获取锁开始执行业务,恰好线程1结束了阻塞,也顺利的结束了业务,于是将锁释放了。这样一来线程2的锁又没有了,线程3恰巧又来了......如此反复,导致线程1释放线程2拿到的的锁;线程2释放线程3拿到的锁,造成并发安全问题

4.7.4.2 解决方案——检查锁是不是自己的:

如何知道这个锁是不是自己的呢?   -----在获取锁时存入线程标识【UUID】

流程:

4.7.4.3 代码实现及其测试

// 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();

// 判断锁是不是自己的
        if(threadId.equals(id)){    // 如果是自己的锁,则释放锁
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
        // 如果不是自己的锁,则不管

 /**
     * 获取锁
     * @param timeoutSec
     * @return
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();

        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name ,threadId,timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标识
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断锁是不是自己的
        if(threadId.equals(id)){    // 如果是自己的锁,则释放锁
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
        // 如果不是自己的锁,则不管
    }

集群环境测试

获取锁失败
获取锁成功

4.7.5 (优化)解决分布式锁的原子性问题

4.7.5.1 非原子性操作带来的并发安全问题

        在我们完善了Redis分布式锁误删问题后,程序还是没有达到完美状态。以下这种情况依旧存在并发安全问题:

        线程1获取锁执行业务,在执行业务完成后判断当前锁的标识是否和自己的一致,发现一致后,执行释放锁操作。但恰好在执行释放锁操作时出现了阻塞,知道超过了锁的超时释放时间。锁被释放了,于是乎线程2提前拿到了锁,结果线程1阻塞结束,又把线程2刚刚拿到的锁给释放掉了

4.7.5.2 解决方案——引入Rua脚本实现命令原子化

        出现上述问题,究其原因就是 判断锁标识 和 释放锁 是两个动作,不符合原子性。因此若是在两者间发生了阻塞,就会造成并发安全问题。

       所以解决策略也很简单, 将判断锁的操作和释放锁的操作组合成一个原子性操作,一起执行,要阻塞都阻塞,要通过都通过

        在Redis中, 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。

4.7.5.3 Rua脚本在Redis中的基本使用

1. 执行Redis命令

redis.call('命名名称','key','其他参数')

redi.call('set','name','jack')

2. 执行脚本

EVAL "脚本内容" 脚本需要的key类型的参数个数

# 执行无参脚本
EVAL "return redis.call('set','name','jack') " 0

# EVAL执行带参脚本
redis 127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"
4.7.5.4 使用Rua编写释放锁脚本
-- 释放锁的业务
-- 1. 获取锁中的线程标识
-- 2. 判断是否与指定的标识相同
-- 3. 一致则释放锁
----------------------开始脚本-------------------------

-- 锁的key
--local key = "lock:order:5"
local key = KEYS[1]

-- 当前线程标识
--local threadId = "asdasdasdasd"
local threadId = ARGV[1]

-- 获取锁中的线程标识
local id = redis.call("GET", key)

-- 比较锁中的线程标识与当前线程标识是否一致
if(id == threadId) then
    -- 释放锁
    redis.call("DEL", key)
end
return 0
----------------------结束脚本-------------------------
4.7.5.5 在IDEA编写使用lua脚本的代码

在SimpleRedisLock方法内修改:

// 提前读取脚本
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        // 指定脚本位置
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
        // 指定返回类型
        UNLOCK_SCRIPT.setResultType(Long.class);
    }


/**
     * 释放锁 --- 基于lua脚本
     */
    @Override
    public void unlock() {
        /*
         *   调用Lua脚本
         *   参数1: 预加载的lua脚本
         *   参数2: KEYS[]
         *   参数3: ARGV[]
         */
        String key = KEY_PREFIX + name;
        String argv = ID_PREFIX + Thread.currentThread().getId();
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(key),
                argv
        );
    }
4.7.5.6 功能测试

1.启动集群环境,先让8081 获取锁成功

2. 模拟锁超时,将Redis中的锁删掉,让8082获得一把新的锁

3. 让失去锁的8081执行lua脚本,观察8082的锁会不会被清除 

结果:不会被清除

4. 让获得锁的8082执行lua脚本,观察8082的锁会不会被清除 

结果: 被清除了

4.8 (拓展知识点) Redisson ——成熟的锁工具包 

4.8.1 基于setnx实现的分布式锁还存在的问题

1. 不可重入问题:同一个线程无法多次获取相同的一把锁
2. 不可重试问题:获取锁只尝试一次就返回false,没有重试机制
3. 超时释放问题:业务耗时过长,还是有可能导致锁释放,设置超时时间很讲究
4. 主从一致性问题:主从同步存在延迟,如果在主节点设置锁,在还没有同步到从节点时,主节点宕机。
        当然,以上问题指针对部分有需要的业务才会有问题,我们前面完善的Redis + lua脚本的分布式锁方案已经很优秀了

4.8.2 Redisson介绍

官网:Home · redisson/redisson Wiki (github.com)

        Redisson 是一个在 Redis 的基础上实现的一个 Java 驻内存数据网格(In-Memory Data Grid, IMDG)。它提供了丰富的分布式和可扩展的 Java 数据结构,包括分布式锁和同步器。Redisson 的可重试锁(Retryable Lock)是基于 Redis 的特性来实现的一种锁机制,它允许在锁不可用时进行重试,直到成功获取锁或者达到重试次数上限。

4.8.3 Redisson入门

1. 引入依赖

<!--Redisson-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.6</version>
        </dependency>

2. 创建Config文件

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        // 单机模式
        // 集群模式:config.useClusterServers().addNodeAddress("redis://192.168.186.136:7000");
        config.useSingleServer().setAddress("redis://192.168.186.136:6379").setPassword("123321");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

3. 使用Redisson——以秒杀优惠卷为例

/*
         *   Redisson方案
         */
        //1. 创建锁对象
        RLock lock =  redissonClient.getLock("lock:order:" + userId);

        //2. 尝试获取锁
        boolean isLock = lock.tryLock();

        // 3. 判断锁是否获取成功
        if(! isLock){
            return Result.fail("不允许重复下单");
        }
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        } finally {
            // 4. 释放锁
            lock.unlock();
        }

4.8.4 Redisson的可重入原理

不可重入锁的基本流程Demo:

        在method1中获取锁成功后,其调用的method2没办法获取锁。在这种嵌套的业务中,不可重入锁具有局限性

如何实现可重入?
添加一个当前获取锁的次数字段   —— hash结构存储

        
        同一个线程内的业务方法在获取锁的时候,发现如果锁存在了就将锁的次数+1,锁的次数就代表了当前获取锁的方法个数。在进行删除锁时,需要先检查锁的重入次数-1 后是否为 0 ,是才能释放锁,否则只做-1操作。

可重入锁的设计流程

可重入锁的脚本实现

---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by good boy.
--- DateTime: 2024/9/15 16:30
---

------------------------获取可重入锁的Lua脚本-------------------------------------
local key = KEYS[1] -- 获取锁的key
local threadId = ARGV[1] -- 获取线程唯一标识
local releaseTime = ARGV[2] -- 获取锁的自动释放时间

-- 判断锁是否存在
    -- 不存在的逻辑
if(redis.call('exists',key) == 0 ) then
    -- 不存在则,获取锁,标记获取锁的次数
    redis.call('hset', key, threadId, '1');
    -- 设置过期时间
    redis.call('expire', key, releaseTime);
    -- 返回获取锁成功
    return 1;
end;

-- 锁已经存在,判断线程标识是不是自己的
    -- 是自己的逻辑
if(redis.call('hexists', key, threadId) == 1) then
    -- 是自己的,则获取锁的次数+1
    redis.call('hincrby', key, threadId, 1);
    -- 重置有效期
    redis.call('expire', key, releaseTime);
    -- 返回获取锁成功
    return 1;
end;

-- 走到这里说明获取到的锁并不是自己的
return 0;




----------------释放可重入锁的Lua脚本-----------------------------
local key = KEYS[1] --获取锁的key
local threadId = ARGV[1] --获取线程唯一标识
local releaseTime = ARGV[2] --获取锁的自动释放时间

-- 判断锁是不是自己的
    -- 锁不是自己的逻辑
if(redis.call('hexists', key, threadId) == 0 ) then
    return nil; -- 锁不是自己的,直接返回nil,表示自己的锁已经被释放了
end;

-- 锁是自己的逻辑
-- 将锁的计数器-1
local count = redis.call('hincrby', key, threadId, -1);

-- 判断锁的计数器是否为0
if(count > 0) then
    -- 大于0,说明还有别的方法获取了锁,不用释放锁,只需要重置过期时间,让其他业务有足够的时间执行
    redis.call('expire', key, releaseTime);
    return nil;
else
    -- 等于0,说明最外层业务已经完成了,可以释放锁
    redis.call('del', key);
    return nil;
end;

4.8.5 (TODO)Redisson的可重试原理

基于看门狗 + 消息订阅模型实现

(看晕了,源码留后面啃)

实战篇-19.分布式锁-Redisson的可重入锁原理_哔哩哔哩_bilibili

4.8.6(TODO) Redisson的超时释放原理

基于看门狗的定时续约机制

        简单来说,在成功获取锁之后,系统会立即启动一个自动续期机制(通常通过scheduleExpirationRenewal(threadId)方法实现),该机制利用一个映射(map)来关联业务名称与定时任务。这个定时任务被设置为每隔一定时间(例如10秒)执行一次,其主要职责是重置锁的最大超时时间。通过递归或循环调用重置锁时间的逻辑,确保锁在业务执行期间不会因为超时而被自动释放,从而实现了锁的“永久持有”效果,直到业务逻辑执行完毕。

当业务逻辑执行完成并显式释放锁时,系统会同时取消之前设置的定时任务,以避免不必要的资源消耗。这种设计确保了锁的持有与业务执行周期紧密相关,既提高了系统的灵活性,又增强了资源使用的效率。

实战篇-20.分布式锁-Redisson的锁重试和WatchDog机制_哔哩哔哩_bilibili

4.8.7 Redisson的主从一致性原理

Redis主从集群模式造成一致性问题的原因

        Java应用发送命令到主节点,主节点崩溃,Redis哨兵机制选取更新最近主节点的从节点,将其变成新的主节点。但是还是存在先前存储的命令失效问题

Redisson解决主从一致性问题方案——连锁方案

        取消单主多从方案,采用高可用集群 + 主从分布

4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题

实战篇-21.分布式锁-Redisson的multiLock原理_哔哩哔哩_bilibili

太吃电脑性能了,得晚点实现,或者等上docker

4.9 Redis优化秒杀系列功能实现 

4.9.1 业务性能瓶颈分析

优化前的方案

        业务耗时为Tomcat六步总和,其中查询优惠卷、查询订单、减库存、创建订单都去数据库查询,效率慢。此外,目前我们还没实现MySQL集群,就更慢了

优化后的方案

        将校验秒杀资格的两个环节提前到Redis异步执行,从而减少处理的环节,提高整体效率。

4.9.2 优化流程分析

Redis部分——lua脚本实现     

        1. 判断秒杀库存------使用String结构存储
        2. 校验一人一单------使用Set结构存储userId

Java部分

        1. 判断lua脚本执行结果
        2. 返回“小票”,
优惠卷id、用户id、订单id

        

4.9.3 改进秒杀业务代码改造及详细解释

1. 将优惠卷信息存入Redis中
/**
     * 新增秒杀优惠券
     * 多表添加 加事务
     * @param voucher
     */
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);

        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        // 关联优惠券id
        seckillVoucher.setVoucherId(voucher.getId());
        // 秒杀库存
        seckillVoucher.setStock(voucher.getStock());
        // 开始时间
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        // 结束时间
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);

        // 保存库存到Redis
        stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
    }
添加优惠卷
将优惠卷保存到Redis
2. 基于Lua脚本,判断秒杀资格
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by good boy.
--- DateTime: 2024/9/15 22:30
---

-----------判断秒杀资格的lua脚本---------------------------------
local voucherId = ARGV[1]   -- 优惠卷id
local userId = ARGV[2]      -- 用户id

local stockKey = 'seckill:stock:' .. voucherId  -- 库存key
local orderKey = 'seckill:order:' .. voucherId  -- 订单key

-- 脚本开始
-- 1. 判断库存是否充足
if( tonumber(redis.call('get', stockKey))  <=0) then
    -- 库存不足返回1
    return 1
end

-- 2. 判断用户是否下单
if( redis.call('sismember', orderKey, userId) == 1 ) then
    -- 用户已经下单返回2
    return 2
end
-- 3. 扣减库存 incrby -1
redis.call('incrby', stockKey, -1)

-- 4. 下单(保存用户)
redis.call('sadd', orderKey, userId)

-- 5. 返回0表示下单成功
return 0

/**
     * 秒杀优惠券下单------秒杀优化代码----lua脚本
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                        SECKILL_SCRIPT,
                        Collections.emptyList(),
                        voucherId.toString(),userId.toString()
        );
        //2.判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //3.不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
        }

        long orderId =  redisIdWorker.nextId("order");
        //TODO 4.为0,代表有购买资格,将下单信息保存至阻塞队列

        //5.返回订单id
        return Result.ok(orderId);
    }
测试基本功能:第一次请求成功
第二次请求失败

3. 抢购成功,封装信息到阻塞队列中,实现异步下单

        调用创建订单逻辑,首先会进入到seckillVoucher秒杀优惠卷,进入seckillVouvher方法后,首先调用Lua脚本去判断该用户请求是否具备秒杀资格,如果具备秒杀资格,则创建订单信息,并将订单信息保存到阻塞队列中去.

    /**
     * 预加载lua脚本
     */
    private static DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/Seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);

    }
    /**
     * 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程
     * @param voucherId
     * @return
     */
    private IVoucherOrderService proxy;
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                        SECKILL_SCRIPT,
                        Collections.emptyList(),
                        voucherId.toString(),userId.toString()
        );
        //2.判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //3.不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
        }

        long orderId =  redisIdWorker.nextId("order");

        //4.为0,代表有购买资格,将下单信息保存至阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);

        // 放入阻塞队列
        orderTasks.add(voucherOrder);
        //提前 获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();

        //5.返回订单id
        return Result.ok(orderId);
    }

        注意标号顺序,我们提前定义了阻塞队列和线程池,并将线程任务放到初始化方法中,只要程序一启动,就会执行init()中的方法。
        当seckillVoucher方法执行成功将订单信息存储到阻塞队列中后。只要阻塞队列有值,我们就会通过线程任务方法VoucherOrderHandler()方法取出队首元素,并且调用handleVocherOrder()方法创建订单。
        进入到handleVocherOrder()方法,我们再次判断是否正常,判断通过后调用创建订单方法createVoucherOrder()
        由于该方法添加了事务,在子线程中事务及其代理对象都失效了,为了能继续使用代理对象进行调用,我们在主方法seckillVoucher()中提前声明并赋值了代理对象,这样在子线程任务中就可以获取到代理对象执行创建订单的方法了。
        进入到最后的创建订单方法createVoucherOrder() ,首先对一人一单、库存超卖问题再次进行判断,均通过以后创建订单。
通过上述步骤实现了异步创建订单,提高了并发效率

//1. 创建-- 阻塞队列
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    //2,创建-- 秒杀线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //5. 初始化方法  一初始化就执行
    @PostConstruct
    public void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //4. 创建-- 订单外部类
    private void handleVocherOrder(VoucherOrder voucherOrder){
        // 获取用户
        Long userId = voucherOrder.getUserId();

        // 1. 创建锁对象
        RLock lock =  redissonClient.getLock("lock:order:" + userId);

        //2. 尝试获取锁
        boolean isLock = lock.tryLock();

        // 3. 判断锁是否获取成功
        if(! isLock){
            log.error("不允许重复下单");
        }
        try {
             proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 4. 释放锁
            lock.unlock();
        }
    }

    //3. 创建-- 秒杀线程任务
    private class VoucherOrderHandler implements Runnable{

        @Override
        public void run() {
            while (true) {
                try {
                    //1. 获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    //2. 创建订单
                    handleVocherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("获取订单异常",e);
                }
            }
        }
    }
/**
     * 秒杀优惠券下单------秒杀优化代码----创建订单
     * @param voucherOrder
     */
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        //4. 限制一人一单【悲观锁方案】
        Long userId = voucherOrder.getUserId();

        //4.1 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        //4.2 判断订单是否存在
        // 是 -----> 返回异常信息---->结束
        if (count > 0) {
            log.error("用户已经购买了一次了");
        }

        //5. 扣减库存——解决超卖问题【乐观锁方案】
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // 库存大于0就行了
                .update();
        if (!success) {
            log.error("库存不足");
        }

        //6. 创建订单
        save(voucherOrder);
    }

4.9.4 Redis优化秒杀功能测试

1. 简单测试功能是否实现,使用postman发送请求

【错误提醒】千万不要把Redis中的  seckill:stock:11 删掉啊!这个优惠卷信息是我们创建优惠卷时顺便存进去的,你要是将它清除了,后面的测试就进行不了了,lua脚本会报nil错误的(俺排了大半个小时)

第一次请求成功
第二次请求失败
2. 利用jmeter测试高并发情况

这里我生成了1000个token的文件,实际上是10个token重复了100次,用于模拟1000个请求(10个用户),同时请求jmeter的情况。结果和视频中做出来的不太一样,但是因为没有1000不同用户,所有我也没办法求证我的程序性能到底得到优化没有

准备10个用户token

优惠卷改成了5张

1000个token

准备1000个token的文件
配置基本路径信息
配置请求头读取
配置读取文件

开启测试

测试1s内发送1000个请求 10个不同用户token,抢购5张券

QPS:379,平均耗时:1076
没有出现超卖现象

测试 1s内发送100个请求10个用户 抢5张券

吞吐:595,平均耗时:39

测试5秒内 发送10000个请求 10个不同用户token 抢5张券

吞吐:621,平均耗时:1934

测试1秒内 发送10000个请求 10个不同用户token 抢5张券【有点不行】

吞吐:934,平均耗时:4308

4.9.5 总结

简单来说,就是Java阻塞队列还不够好

4.10(知识点) 学习Redis消息队列实现异步秒杀

消息队列(Message Queue):存放消息的队列。最简单的消息队列模型包括3个角色:

        消息队列:存储和管理消息,也被称为消息代理(Message Broker)
        生产者:发送消息到消息队列
        消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:
        list结构:基于List结构模拟消息队列
        PubSub:基本的点对点消息模型
        Stream:比较完善的消息队列模型

基础的消息队列模型

4.10.1 基于List类型模拟的消息队列原理及其使用

        在Day2中我就总结过Redis中各个数据结构使用的业务场景,其中基于双向链表实现的list结构则可以用于模拟消息队列的功能(Redis学习Day2——Redis基础使用-CSDN博客)

实现原理

  • 使用 LPUSH + RPOP 或 RPUSH + LPOP 实现消息队列
  • 使用 BLPOP 或 BRPOP 实现带有阻塞效果的消息队列

实现效果

 基于List结构实现的消息队列比较简单,只支持单消费者的模式

单消费者

优点

  • 相比于java实现的阻塞队列,不会受限于JVM的存储上限,没有未知的内存溢出风险
  • 属于Redis的基础数据结构,可以实现持久化存储,数据安全性也有所保障
  • List队列,先进先出,消息处理有序

缺点

  • 消息丢失无法避免
  • 只支持单消费者模式

缺点说明

        假设从Redis中取出一条消息,但是还没来得及处理就挂掉了,结果等到恢复正常后,刚从队列中取出来的消息已经不可复现了,因此存在消息丢失的风险。

        由于消息一旦被某个消费者拿走了,就会从消息队列中移除,因此该模式仅仅支持单消费者

常用命令

  • 阻塞监听
  • 存放消息

4.10.2  基于PubSub的消息队列原理及其使用

        PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

实现原理

  • 使用 SUBSCRIBE channel [channel] :订阅一个或多个频道
  • 使用 PUBLISH channel msg :向一个频道发送消息
  • 使用 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

实现效果

发布者 + 订阅者模式,也就是说,可以实现多消费者的消息队列模型

多消费者

优点

  • 采用发布订阅模型,支持多生产者、多消费者的模型

缺点

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出是数据丢失

缺点说明

        PubSub本身设计出来是用于消息发送的,不具备存储功能,因而没有持久化策略。当某个频道没有任何人订阅,在该频道发送的数据直接丢失了。此外PubSub发送的信息过多未处理完容易造成堆积丢失。因此PubSub不适用于可靠性要求高的场景。

常见命令使用

  • 订阅单个频道【自带阻塞】
  • 订阅多个频道
  • 发布消息

 4.10.3(三者之最) 基于Stream的消息队列原理及其使用

Stream是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

主要特征

  • 数据结构:Redis Stream是一个由有序消息组成的日志数据结构,每个消息都有一个全局唯一的ID,确保消息的顺序性和可追踪性。

  • 消息ID:消息的ID由两部分组成,分别是毫秒级时间戳和序列号。这种设计确保了消息ID的单调递增有序性。

  • 消费者组:Redis Stream新增消费者组的概念,允许多个消费者以组的形式订阅Stream,并且每个消息只会被组内的一个消费者处理,避免了消息的重复消费

​​​

优点

  • 消息可回溯,可被多个消费者读取,可阻塞读取
  • 消息读完了不消失,会永久的保持在队列当中。

缺点

  • 任然有消息漏读的风险,在处理消息的过程中,如果同时来了多条消息,最后可能只能读到最后一条新消息,从而造成了消息漏读

 常见命令使用

  • 发送消息 —— XADD
  • 读取消息—— XREAD
    • 读取第一条消息,且可重复读
    • 读取最新消息
    • 读取并阻塞等待最新消息
    • BUG——漏读消息

基于Stream的消息队列——消费者组

消费者组(Consumer Group)

将多个消费者划分到一个组中,监听同一个队列。具备下列的特点:

        消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度

        消息标识消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费

        消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。【解决消息丢失问题】

消费者组常见命令

  • 创建消费者组——XGROUP CREATE 
  • 删除消费者组——XGROUP DESTORY
  • 添加消费者到消费组——XGROUP  CREATECONSUMER
  • 将消费者移除消费组——XGROUP  DELCONSUMER
  • 从消费者组读取信息——XREADGROUP 

4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能

步骤说明

①创建一个Stream类型的消息队列,名为stream.orders
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

4.10.4.1 创建Stream消息队列

 1. 登录redis

2. 创建队列和消费者组

4.10.4.2 修改秒杀资格判断的Lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by good boy.
--- DateTime: 2024/9/16 20:32
---

-----------判断秒杀资格的lua脚本---------------------------------
local voucherId = ARGV[1]   -- 优惠卷id
local userId = ARGV[2]      -- 用户id
local orderId = ARGV[3]    -- 订单id

local stockKey = 'seckill:stock:' .. voucherId  -- 库存key
local orderKey = 'seckill:order:' .. voucherId  -- 订单key

-- 脚本开始
-- 1. 判断库存是否充足
if( tonumber(redis.call('get', stockKey))  <=0 ) then
    -- 库存不足返回1
    return 1
end

-- 2. 判断用户是否下单
if( redis.call('sismember', orderKey, userId) == 1 ) then
    -- 用户已经下单返回2
    return 2
end
-- 3. 扣减库存 incrby -1
redis.call('incrby', stockKey, -1)

-- 4. 下单(保存用户)
redis.call('sadd', orderKey, userId)

-- 5. 发送消息到队列中 XADD [队列名]stream.orders * k1 v1 k2 v2 ...
redis.call('XADD','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)

-- 6. 返回0
return 0

4.10.4.3 完善秒杀下单代码及其代码说明
1. 首先是修改预加载Lua脚本信息
/** 方案二、三公共代码
     * 预加载lua脚本
     */
    private static DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        // 这是第二种方案需要执行的lua脚本
        // SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/seckill.lua"));
        // 这是第三种方案需要执行的lua脚本
        SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/streamSeckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);

    }
2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本
    /**
     *  秒杀优惠券下单------秒杀优化代码----lua脚本---主线程---使用Redis stream的消息队列完成的
     */
    private IVoucherOrderService proxy;
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();
        // 获取订单id
        long orderId =  redisIdWorker.nextId("order");

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString(),String.valueOf(orderId)
        );
        //2.判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //3.不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
        }

        //5.返回订单id
        return Result.ok(orderId);
    }
3. 重头戏,开启线程任务!!

【注意特意标明】【黑马点评】已解决java.lang.NullPointerException异常-CSDN博客

项目启动时,init()方法执行,开启线程任务VoucherOrderHandler()方法。

进入到VoucherOrderHandler()方法,我们不断循环尝试去去消息队列中的信息,

        获取成功执行handleVocherOrder()方法创建订单

        获取失败: 说明没有消息 ---->继续循环

        出现异常执行handlePendingList()方法处理PendingList的异常

handleVocherOrder()方法和之前java阻塞队列方案写法一致,不过多赘述。

进入到handlePendingList()方法后,一样循环获取PendingList中的消息

        获取成功: 那就反过来调用handleVocherOrder()方法,执行订单创建

        获取失败: 说明Pending List没有消息 ---->结束循环

        出现异常休眠一段时间后自动回到VoucherOrderHandler()方法中的下一次循环

// 1,创建-- 秒杀线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //2. 初始化方法  一初始化就执行
    @PostConstruct
    public void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //3. 创建线程任务用于接收消息队列的信息
    private class VoucherOrderHandler implements Runnable{

        // 消息队列名称
        private String queueName = "stream.orders";


        @Override
        public void run() {
            while (true) {
                try{
                    //1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.oredes >
                    // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
                    List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2. 判断消息获取是否成功
                    if( list == null || list.isEmpty()){
                        //2.1 获取失败 说明没有消息 ---->继续循环
                        continue;
                    }
                    // 解析消息中的订单信息
                    MapRecord<String,Object,Object> record = list.get(0);
                    //  获取键值对集合
                    Map<Object,Object> values = record.getValue();
                    // 获取订单信息
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);

                    //3. 获取成功,执行订单创建
                    handleVocherOrder(voucherOrder);

                    //4. ACK确认  SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());

                }catch (Exception e) {
                    // 消息没有被ACK确认 进入Pending List
                    log.error("订单处理出现异常",e);
                    handlePendingList();
                }
            }
        }

        // 5.取不到订单————— 处理Pending List中的订单信息
        private void handlePendingList(){
            while (true) {
                try {
                    //1. 获取Pending List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1  STREAMS stream.oredes 0
                    // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2. 判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        //2.1 获取失败 说明Pending List没有消息 ---->结束循环
                        break;
                    }

                    // 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    //  获取键值对集合
                    Map<Object, Object> values = record.getValue();
                    // 获取订单信息
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);

                    //3. 获取成功,执行订单创建
                    handleVocherOrder(voucherOrder);

                    //4. ACK确认  SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());

                } catch (Exception e) {
                    log.error("Pending List订单处理出现异常", e);
                    try {
                        Thread.sleep(20);
                    }catch (InterruptedException interruptedException){
                        interruptedException.printStackTrace();
                    }
                }
            }
        }
    }


        // 4. 取到了订单—————创建订单
        private void handleVocherOrder(VoucherOrder voucherOrder){
            // 获取用户
            Long userId = voucherOrder.getUserId();

            // 1. 创建锁对象
            RLock lock =  redissonClient.getLock("lock:order:" + userId);

            //2. 尝试获取锁
            boolean isLock = lock.tryLock();

            // 3. 判断锁是否获取成功
            if(! isLock){
                log.error("不允许重复下单");
            }
            try {
                proxy.createVoucherOrder(voucherOrder);
            } finally {
                // 4. 释放锁
                lock.unlock();
            }
        }

4.10.4.4 秒杀下单功能测试
1. 简单测试功能是否实现,使用postman发送请求

2. 利用jmeter测试高并发情况

老规矩10个token

这次我们先测试一下 10个线程请求 10个不同用户 抢5张卷的情况

测试一下 1000个线程请求 10个不同用户 抢5张卷的情况

测试一下 10000个线程请求 10个不同用户 抢5张卷的情况

吞吐669

4.11 总结情况

        思路好像跟着来的,但是耗时和吞吐量始终是上不去,我上网看了看大家做的情况,好像都没有黑马一千多甚至两千的吞吐量。我感觉其实stream实现的消息队列相比java实现的阻塞队列性能上其实也没有优化到哪去,只是更加灵活可靠了吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2147842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jenkins 构建后操作(Send build artifacts over SSH)

Jenkins 构建后操作(Send build artifacts over SSH) 针对Jenkins部署项目的注意事项 配置Send build artifacts over SSH SSH Server,这是一个系统配置 配置地址&#xff1a;系统管理 -> 系统配置 ->SSH Server 注意1&#xff1a;记得点一下高级里面有一个密码配置&…

10 vue3之全局组件,局部组件,递归组件,动态组件

全局组件 使用频率非常高的组件可以搞成全局组件&#xff0c;无需再组件中再次import引入 在main.ts 注册 import Card from ./components/Card/index.vuecreateApp(App).component(Card,Card).mount(#app) 使用方法 直接在其他vue页面 立即使用即可 无需引入 <templat…

240919-Pip先在线下载不安装+再离线安装

A. 最终效果 # 使用modelscope sdk下载模型 import os os.environ[MODELSCOPE_CACHE] 您希望的下载路径from modelscope import snapshot_download model_dir snapshot_download(opendatalab/PDF-Extract-Kit) print(f"模型文件下载路径为&#xff1a;{model_dir}/model…

【新手上路】衡石分析平台使用手册-认证方式

认证方式​ 用户登录衡石系统时&#xff0c;系统需要对输入的用户名和密码进行验证&#xff0c;保证系统的安全。衡石提供 CAS、SAML2、OAUTH2等多种单点登录认证方式。在 SSO 单点登录中&#xff0c;衡石是服务提供者 SP&#xff08;Service Provider&#xff09;为用户提供所…

synchronized是怎么实现的?

synchronized是JVM的语法糖&#xff0c;主要是通过JVM来控制的。其实现原理依赖于JVM的监视器和对象头。 synchronized修饰方法时&#xff0c;JVM会通过编译完的字节码的访问标记来区分该方法是否被synchronized修饰&#xff0c;在进入方法的时候就会进行获得锁的竞争&#xff…

鸿蒙媒体开发系列06——输出设备与音频流管理

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧&#xff01;扫描下方名片&#xff0c;关注公众号&#xff0c;公众号更新更快&#xff0c;同时也有更多学习资料和技术讨论群。 1、音频输出设备管理 有时设备同时连接多个音频输出设备&#xff0c;需要指定音频输…

python 爬虫 selenium 笔记

todo 阅读并熟悉 Xpath, 这个与 Selenium 密切相关、 selenium selenium 加入无图模式&#xff0c;速度快很多。 from selenium import webdriver from selenium.webdriver.chrome.options import Options# selenium 无图模式&#xff0c;速度快很多。 option Options() o…

栈、队列、链表

基于《啊哈&#xff01;算法》和《数据结构》&#xff08;人民邮电出版社&#xff09; 本博客篇幅较多&#xff0c;读者根据目录选择&#xff0c;不理解的可留言和私信。 栈、队列、链表都是线性结构。 三者都不是结构体、数组这种数据类型&#xff0c;我认为更像是一种算法…

面试必备!值得收藏!不容错过的100+ 大语言模型面试问题及答案

引言 大语言模型&#xff08;LLMs&#xff09;现在在数据科学、生成式人工智能&#xff08;GenAI&#xff0c;即一种借助机器自动产生新信息的技术&#xff09;和人工智能领域越来越重要。这些复杂的算法提升了人类的技能&#xff0c;并在诸多行业中推动了效率和创新性的提升。…

Windows如何查看已缓存的DNS信息

Windows server 2016如何查看已缓存的DNS信息 在Windows server 2016系统下&#xff0c;如何查看已缓存的DNS信息呢? 1.打开“运行”&#xff0c;输入cmd&#xff0c;点击“确定” 2.在命令行界面输入ipconfig /displaydns&#xff0c;按回车即可查看已缓存的dns信息

9月26日云技术研讨会 | SOA整车EE架构开发流程及工具实施方案

面向服务的架构&#xff08;Service Oriented Architecture, SOA&#xff09;实施需要复杂的基础技术作为支撑&#xff0c;伴随着整车硬件资源的集中化、车载以太网等高速通信技术在车内的部署&#xff0c;将在未来一段时间内成为行业技术研究和市场布局的热点。 近年来&#x…

分享几种方式获取免费精致的Live2d模型

文章目录 1、 Live2D官方示例数据集&#xff08;可免费下载&#xff09;2、模之屋3、unity商店4、直接b站搜索5、youtube6、BOOTH完结 1、 Live2D官方示例数据集&#xff08;可免费下载&#xff09; 官方提供了一些 Live2D实例模型给大家下载使用 地址&#xff1a;https://ww…

房屋租赁系统源码分享:SpringBoot + Vue 免费分享

这是一套使用 SpringBoot 与 Vue 开发的房屋租赁系统源码&#xff0c;站长分析过这套源码&#xff0c;推测其原始版本可能是一个员工管理系统&#xff0c;经过二次开发后&#xff0c;功能被拓展和调整&#xff0c;现已完全适用于房屋租赁业务。 源码说明&#xff1a; 该系统功…

一键生成高级感PPT封面,首推这个在线AI生成PPT软件!

PPT封面怎么做&#xff1f; ppt封面的重要性不言而喻&#xff0c;就像写文章讲究的“凤头”&#xff0c;一个漂亮的PPT封面&#xff0c;可以吸引观众的注意力&#xff0c;让人有意愿驻足下来听你演讲&#xff0c;才会有后面更多的故事发生。 漂亮的ppt封面怎么做&#xff1f;…

dll文件丢失怎么恢复?10种dll修复方法任你选,一次学会!

dll文件丢失怎么恢复&#xff1f;dll文件丢失在多个Windows 版本中都是常见的问题&#xff0c;包括win7/win8/win10和 win11。这类错误通常与一些特定的dll文件有关&#xff0c;比如MSVCR110.DLL、MSVCR71.DLL、d3compiler_43.DLL、LogiLDA.DLL、MSVCP140.DLL、api-ms-win-crt-…

组装电脑-电脑配置

键盘、鼠标&#xff1a;买一百多的机械盘。 主板 电脑台式机主板是计算机最基本的同时也是最重要的部件之一&#xff0c;它在整个计算机系统中扮演着举足轻重的角色。以下是对它的详细介绍&#xff1a; 基础功能&#xff1a; 主板作为计算机的核心部件&#xff0c;负责连接和…

【图像检索】基于颜色模型的图像内容检索,matlab实现

博主简介&#xff1a;matlab图像代码项目合作&#xff08;扣扣&#xff1a;3249726188&#xff09; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于颜色模型的图像内容检索&#xff0c;用matlab实现。 一、案例背景和算法介绍 这…

inBuilder低代码平台新特性推荐-第二十四期

今天给大家带来的是 inBuilder 低代码平台新特性推荐第二十四期 ——表单格式支持流程配置。 场景介绍&#xff1a; 如下图所示&#xff0c;目前支持在流程设计上的不同节点设置表单字段的必填、显隐等属性控制&#xff0c;不必在表单设计上进行配置&#xff0c;从而减少了开…

VS Code远程连接虚拟机

VS Code远程连接虚拟机 1.下载vscode2.打开VS Code下载Remote-SSH插件1.修改相关信息 3.虚拟机检查或安装ssh4.检查虚拟机服务是否安装成功5.开启ssh&#xff0c;并检查是否开启成功 1.下载vscode 2.打开VS Code下载Remote-SSH插件 1.修改相关信息 2. 3.虚拟机检查或安装ssh…

同一个单元格内包含标签和文本框

<!DOCTYPE html> <html> <head> <title>单元格内包含标签和文本框</title> <style> /* 可选的CSS样式&#xff0c;用于美化表格 */ table { width: 50%; /* 设置表格宽度为页面宽度的50% */ border-collapse: collapse; /* 合并…