5. Redis优化秒杀、Redis消息队列实现异步秒杀

news2025/1/8 3:55:33

文章目录

  • Redis优化秒杀、Redis消息队列实现异步秒杀
  • 一、秒杀优化
    • 1.1 回顾“一人一单”秒杀业务代码
    • 1.2 异步秒杀思路
    • 1.3 基于Redis完成秒杀资格判断
      • 1.3.1 修改VoucherServiceImpl
      • 1.3.2 Lua脚本编写
      • 1.3.3 Redis+lua判断用户是否抢购成功
      • 1.3.4 基于阻塞队列实现异步秒杀下单
      • 1.3.4 秒杀初步优化总结
  • 二、Redis消息队列
    • 2.1 基于List实现消息队列
    • 2.2 基于PubSub实现消息队列
    • 2.3 基于Stream的消息队列
      • 2.3.1 Stream 单消费模式
        • 2.3.1.1 发送消息
        • 2.3.1.2 读取消息
        • 2.3.1.3 STREAN类型消息队列的XREAN命令特点
      • 2.3.2 Stream消费者组模式
        • 2.3.2.1 创建消费者组
        • 2.3.2.2 读取消息
        • 2.3.2.3 确认消息
        • 2.3.2.4 查看pending-list
        • 2.3.2.5 消费者组 - 消费者监听消息基本思路
        • 2.3.2.6 STREAN类型消息队列的XREANGROUP命令特点
    • 2.4 总结
  • 三、基于Stream消息队列实现异步秒杀

Redis优化秒杀、Redis消息队列实现异步秒杀

承接Redis - 优惠券秒杀、库存超卖、分布式锁、Redisson文章

一、秒杀优化

1.1 回顾“一人一单”秒杀业务代码

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    @Override
    public Result seckillVoucher(Long voucherId) {
//      1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//      2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            return Result.fail("秒杀活动尚未开始");
        }
//      3.判断秒杀是否结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            return Result.fail("秒杀活动已经结束");
        }
//      4.判断库存是否充足
        if (voucher.getStock() < 1) {
            return Result.fail("库存不足");
        }
        UserDTO user = UserHolder.getUser();
//      创建锁对象
//      锁定的范围与之前一样。切记不能把order锁住,范围太大了,以后有关order的都被锁住了
//      之前的方式:SimpleRedisLock lock = new SimpleRedisLock("order:" + user.getId(), stringRedisTemplate);
//      TODO 使用Redisson客户端获取锁对象
        RLock lock = redissonClient.getLock("lock:order:" + user.getId());
//      获取锁
//      订单大概是500ms,我们这里可以设定为秒
        boolean isLock = lock.tryLock();
//      判断是否获取锁成功
        if (!isLock) {
//      不成功
//      我们要避免一个用户重复下单,既然获取锁失败,说明在并发执行,我们要避免并发执行
            return Result.fail("不允许重复下单");
        }
//      成功
//      createVoucherOrder方法执行过程中可能会有异常,我们放到try...catch中
        try {
//          获取当前对象的代理对象 强转
            VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }finally {
//          出现异常做锁的释放
            lock.unlock();
        }

    }

    //  如果在方法上添加synchronized,说明同步锁是this,当前对象
//  不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
//  所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
    @Transactional
    public Result createVoucherOrder(Long voucherId) {
//      新增一人一单的判断
        UserDTO user = UserHolder.getUser();
//      user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
//      我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回

//      查询订单
        int count = query().eq("user_id", user.getId())
                .eq("voucher_id", voucherId).count();
//      判断是否存在
        if (count > 0) {
//      用户至少下过一单,不能再下了
            return Result.fail("一人一单,不可重复下单");
        }
//      说明没买过,继续执行代码
//      5.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1") //set stock = stock-1
                .eq("voucher_id", voucherId) //where  voucher_id= voucherId
                .gt("stock", 0)//where  stock>0
                .update();
        if (!success) {
            return Result.fail("扣减失败,可能库存不足");
        }

//      6.创建订单
//      我们只管订单id,代金券id,下单用户id
        VoucherOrder voucherOrder = new VoucherOrder();
//      6.1 订单id
//      使用自定义id生成器生成id
        long orderID = redisIdWorker.nextId("order");
        voucherOrder.setId(orderID);
//      6.2 用户id
//      我们之前编写的登录拦截器
        voucherOrder.setUserId(user.getId());
//      6.3 代金券id
        voucherOrder.setVoucherId(voucherId);

        save(voucherOrder);

//      7.返回订单id
        return Result.ok(orderID);

    }
}

代码中有大量数据库的操作,整个业务性能并不是很好

平均耗时达到了497毫秒

image-20230702151227074

1.2 异步秒杀思路

首先回顾一下之前秒杀业务的流程

  • 前端发起请求到达我们的Nginx,然后Nginx会把我们的请求负载均衡到我们的tomcat

  • 而在tomcat中执行各种逻辑,查询优惠券、判断秒杀库存、查询订单、校验一人一单、减库存、创建订单,整个业务的耗时就是各种逻辑耗时之和

    其中查询优惠券、查询订单、减库存、创建订单四步访问数据库,数据库的并发能力本身比较差,更何况减库存和创建订单是对数据库的写操作,为了避免安全问题还加了分布式锁,也使得业务性能大打折扣

image-20230702151959540

怎么改进一下呢?

将业务逻辑分成两个不同的线程,主线程的作用就是判断用户的购买资格,如果有资格,就开启一个独立的线程来处理耗时较久的减库存、创建订单操作,这样一来,效率大大提高

Redis的性能要比MYSQL的性能好,我们完全可以把判断秒杀库存操作和检验一人一单操作放到Redis中去做

主线程进来以后首先找Redis完成对于秒杀资格的判断,如果有资格就执行后续的减库存、下单操作

image-20230702152951306


我们怎么在Redis中判断秒杀库存和检验一人一单呢?

我们需要把优惠券的库存信息和相关的订单信息缓存在redis中,我们应该选择一个什么样的数据结构?

  • 库存比较简单,只需要一个普通String结构即可,key是优惠券的id,值是库存的值,将来在做库存判断的时候,看一下值是否大于0

    当判断有库存的时候,value值要减1,先提前先减这

image-20230702153821088

  • 一人一单,我们需要在Redis中记录当前的优惠券被哪些用户购买过,以后再有用户来的时候,我们只需要判断此用户是否购买过

    对此我们采用set集合,此可以确保元素的唯一性,可以在一个key中保存多个值,以后有用户下单成功后,我们就记录用户的id,再用更多用户来,我们依次记录就行。若发现set集合中已经存在了,那肯定不允许购买了

image-20230702153828603

最终在Redis的流程图

image-20230702154003313

对Redis的判断有很多的判断,业务流程比较长,我们必须确保执行时的一个原子性,这时就需要采用Lua脚本了。

这一部分不需要写java代码,需要Lua脚本

我们进入到Tomcat中需要做的是什么?

耗时较久的下单、减库存等核心写操作,并没有在当前流程中出现

image-20230702154259458

什么时候做下单减库存的操作呢?

我们开启一个独立的线程来读取提前保存好的用户信息、优惠券信息,就可以完成异步数据库的写操作,当我们返回订单id给用户的那一刻,用户就可以付款了,秒杀业务已经结束了

所以我们什么时候将优惠券信息、用户信息写入到数据库里,完成下单减库存的操作,其实就没有那么重要了(实效性要求不高),根据MySQL数据库能够承受的频率去将数据写入数据库

1.3 基于Redis完成秒杀资格判断

案例:改进秒杀业务,提高并发性能

需求

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

1.3.1 修改VoucherServiceImpl

新增秒杀优惠券的同时,将优惠券信息保存到Redis中

优惠券库存信息永久保存,没有过期时间

    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
//      TODO 优惠券库存信息保存到Redis中
        stringRedisTemplate.opsForValue().set("seckill:stock:"+voucher.getId(),voucher.getStock().toString());
    }

测试一下

{
    "shopId": 1,
    "title": "100元代金券",
    "subTitle": "周一至周五均可使用",
    "rules": "全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食",
    "payValue": 8000,
    "actualValue": 10000,
    "type": 1,
    "stock": 10,
    "beginTime": "2022-01-25T10:09:17",
    "endTime": "2025-01-26T12:09:04"
}

image-20230702164847461

image-20230702164933373

第一个需求完成

1.3.2 Lua脚本编写

基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

编写Lua脚本

要使用乱脚本完成下面的内容

image-20230702154003313

-- 1. 参数列表
-- 1.1 优惠券id
--   需要去redis中读取库存数量,判断是否充足,其中key的前缀seckill:stock是固定的,后面的id是需要传入的
local voucherId = ARGV[1]

-- 1.2 用户id
--   需要知道用户id,才能判断用户之前是否下过单
local userId = ARGV[2]

-- 2.数据相关key
-- 2.1 库存key, lua中是用 .. 拼接字符串
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key,值是一个set集合,集合名称就是下面。内容是购买订单的用户的id,这样可以记录谁购买了谁没有购买
local orderKey = 'seckill:order:' .. voucherId

-- 3.lua脚本业务
-- 3.1 判断库存是否充足
--  redis.call('get',stockKey)得到的结果是字符串,是无法和数字比较的
if (tonumber(redis.call('get',stockKey))<=0) then
-- 3.2库存不足
    return 1
end
-- 3.2判断用户是否下单
--   借助命令SISMEMBER命令,判断一个给定的值是不是当前set集合中的一个成员,如果存在返回1,不存在返回0
if (redis.call('SISMEMBER',orderKey,userId) ==1) then
-- 3.3redis中存在,说明是重复下单
    return 2
end

-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单,保存用户 sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 成功返回0
return 0

1.3.3 Redis+lua判断用户是否抢购成功

如果抢购成功,将优惠券id和用户id封装后存入阻塞队列,完成下面的操作

image-20230702171426452

改造VoucherOrderServiceImpl类

现在还没有编写阻塞队列

    //  TODO 获取脚本
//  RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    //  TODO 静态代码块做初始化
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
//      TODO 借助Spring提供的方法区resource下找
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
//      TODO 配置一下返回值类型
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
//      TODO 1.执行lua脚本
//      我们没有传入key,所以传入一个空集合即可
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
//      TODO 2.判断结果是否为0(0有购买资格,不为0没有购买资格)
        int r = result.intValue();
        if (r!=0){
//      TODO 2.1 不为0没有购买资格
            return Result.fail(r==1 ? "库存不足":"不能重复下单");
        }
//      TODO 2.2 为0有购买资格,把下单的信息保存到堵塞队列,后续可以异步完成下单业务
//      生成订单id
        long orderId = redisIdWorker.nextId("order");
//      组合队列有点麻烦,先写到这里
//      TODO 3. 返回订单id
        return Result.ok(orderId);
    }

image-20230702173652145

image-20230702173724017

image-20230702173749252

image-20230702173902600

1.3.4 基于阻塞队列实现异步秒杀下单

我们刚刚编写的代码没有编写阻塞队列,下面来编写一下

完成下面功能

  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

这个地方就是将抢单成功的用户id,还有优惠券的id,以及相关的订单id封装一下放到一个队列里。

队列里放的就是所有等待下单的订单的信息

而后我们会开启一个线程的任务,异步去执行相关订单的操作,这样后下单就不会影响到我们整个业务的耗时

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    //TODO 获取脚本
//  RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    //静态代码块做初始化
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
//      借助Spring提供的方法区resource下找
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
//      配置一下返回值类型
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //  TODO 创建堵塞队列
//  有一个特点:当一个线程尝试从这个队列里获取元素时,如果没有元素,这个线程就会堵塞,知道队列中有元素他才会被唤醒
//  1024*1024表示队列的长度
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
    //  TODO 创建一个线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct//当前类初始化完毕后执行这个方法
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //  TODO 创建一个线程任务
//  什么时候执行这个任务呢?用户秒杀抢购之前,因为一旦用户开始秒杀,阻塞队列中就会有新的订单,这个任务就应该去取出订单相关信息
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
//          不断的从堵塞队列orderTasks取出信息,然后执行
            while (true) {
//              take就是一个阻塞方法,获取队列中的头部,如果没有就等到有
                try {
//                  TODO 1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
//                  TODO 2.创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
//                    e.printStackTrace();
                    log.error("处理订单异常", e);
                }
            }
        }
    }

    /**
     * 订单处理方法
     * 我们在Redisson脚本中加了一个次锁了,在这里为什么还要再加一次锁呢?
     * 做一个兜底方案,万一Redis出现问题,有个保证
     *
     * @param voucherOrder
     */
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
//      TODO 1.获取用户
        Long userId = voucherOrder.getUserId();
//      TODO 2.创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
//      TODO 3.获取锁
        boolean isLock = lock.tryLock();
//      TODO 4.判断是否获取锁成功
        if (!isLock) {
//          获取锁失败
            log.error("不允许重复下单");
            return;
        }
//      TODO
        try {
//            之前这种获取代理对象的方式是获取不到的,因为不在同一个线程下面了
//            VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
//            return proxy.createVoucherOrder(voucherId);
//            TODO 在主线程获取,这个地方使用
            proxy.createVoucherOrder(voucherOrder);
        } finally {
//          出现异常做锁的释放
            lock.unlock();
        }

    }

    private VoucherOrderServiceImpl proxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
//      1.执行lua脚本
//      我们没有传入key,所以传入一个空集合即可
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
//      2.判断结果是否为0(0有购买资格,不为0没有购买资格)
        int r = result.intValue();
        if (r != 0) {
//      2.1 不为0没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
//      TODO 2.2 为0有购买资格,把下单的信息保存到堵塞队列,后续可以异步完成下单业务
//      TODO 阻塞队列,将用户id,订单id,优惠券id,保存到阻塞队列中
        VoucherOrder voucherOrder = new VoucherOrder();
//      TODO 2.3 生成订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
//      TODO 2.4 用户id
        voucherOrder.setUserId(userId);
//      TODO 2.5 代金券id
        voucherOrder.setVoucherId(voucherId);
//      TODO 2.6 放入阻塞队列
        orderTasks.add(voucherOrder);
//      TODO 3.获取代理对象
        proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();


//      3. 返回订单id
        return Result.ok(orderId);
    }


    //  如果在方法上添加synchronized,说明同步锁是this,当前对象
//  不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
//  所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
//      新增一人一单的判断
        Long userId = voucherOrder.getUserId();
//      user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
//      我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回

//      查询订单
        int count = query().eq("user_id", userId)
                .eq("voucher_id", voucherOrder.getVoucherId()).count();
//      判断是否存在
        if (count > 0) {
//      用户至少下过一单,不能再下了
            log.error("用户至少下过一单,不能再下了");
            return;
        }
//      说明没买过,继续执行代码
//      5.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1") //set stock = stock-1
                .eq("voucher_id", voucherOrder.getVoucherId()) //where  voucher_id= voucherId
                .gt("stock", 0)//where  stock>0
                .update();
        if (!success) {
            log.error("扣减失败,可能库存不足");
            return;
        }
//      TODO 创建订单
        save(voucherOrder);
    }
 }
}

1.3.4 秒杀初步优化总结

  • 秒杀业务的优化思路是什么?

    变同步下单为异步下单

    同步下单:请求来了之后判断有没有资格,有资格就自己下单,由于各种原因,耗时非常久

    异步下单:业务分成两部分,一部分对于抢购资格的判断,如果有资格就立即结束,返回订单号;另一部分执行耗时较久的业务

    • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
    • 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于堵塞队列的异步秒杀存在哪些问题?

    • 内存限制问题

    我们使用的是JDK中堵塞队列,使用的JVM内存,如果不加以限制,在高并发的情况下,可能会有无数的订单对象需要去创建并放到堵塞队列里面,会导致内存溢出(所以设置了一个队列的长度)

    如果队列中存满了,就存不进去了

    • 数据安全问题

    我们现在基于内存保存这些订单信息,如果说服务突然宕机,内存中所有的订单信息就都丢失了,用户付款了但是后台没有相关的订单数据

二、Redis消息队列

消息队列(Message Queue),存放消息的队列

最简单的消息队列模型包括下列角色

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

消息队列的好处

解除耦合,提高效率

image-20230704145732533

想象成快递员、外卖柜、消费者

假设一个快递员来投递快递(没有外卖柜),结果没有快递柜就送货上门,但是消费者正在公司上班工作,此时用户就和快递员说等等我,如果快递员等的话就浪费了快递员的时间,如果不等的话用户可能找不到外卖

有了快递柜就不一样了,外卖员取到物品放到外卖柜即可,然后用户什么时候有时间,什么时候就去取,此时快递员和消费者之间就解除耦合了

对于秒杀操作相同,有人抢购商品时,我们不着急真正下单,先判断有没有资格,如果有购买的资格,这个时候不要去写入数据库,而是把信息存入到消息队列中

这个时候我们开启一个独立的线程作为消费者,他不断的从队列里获取消息,真正的完成下单功能

image-20230704151415551

功能和读者队列差不多,但是有两点不同

  • 消息队列时独立在JVM服务之外的独立服务,不受JVM内存限制
  • 消息队列不仅仅做队列存储,还要确保数据的安全,要做持久化,不管服务宕机还是重启,数据不会丢失,消息投递给消费者以后,要求消费者做消息的确认,如果消费者没有确认,消息就会在队列中依然存在,下一次会再投递给消费者,让它继续处理,这个过程直到成功为止(确保消息至少被消费一次)

Redis提供了三种不同的方式来实现消息队列

  • List结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

之前也学习过两个消息队列

RabbitMQ基础介绍及同步通讯及异步通讯_mq怎么同步_我爱布朗熊的博客

SpringAMQP-Basic Queue、Work Queue、Fanout、Direct、Topic

2.1 基于List实现消息队列

Redis的List数据结构是一个双向链表,很容易模拟出队列效果

队列时入口和出口不在一遍,因此我们可以利用LPUSH结合RPOP或者RPUSH结合LPOP实现

List有关命令:Redis命令——通用命令、String类型、Key层级结构、Hash类型、List类型、Set类型、SortedSet类型_redis指令查看层级结构

image-20230704153220263

但是!!当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并代码消息

因此这里应该使用BROPO或者BLPOP来实现阻塞效果

相比与JDK中阻塞队列此种方式有什么好处

  • 独立于JVM之外的独立存储,不依赖于JVM内存

  • 数据安全,Redis支持数据持久化,存储到队列中后,数据就不会丢失

  • 可以满足消息有序性(队列的特性:先进先出)

缺点是什么

  • 无法避免消息丢失

    如果我们从Redis将消息取出,还没有执行完,挂掉了,那这个任务在Redis中就没有了,无法再次执行了

  • 只支持单消费者

    一旦任务被一个消费者拿走后,就从队列移除了,其他消费者不能够拿到。无法实现一条消息被很多人消费这样的需求

2.2 基于PubSub实现消息队列

PubSub(发布订阅)是Redis 2.0版引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关信息

相关命令

  • SUBSCRIBE channel [channel] : 订阅一个或多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道。pattern是通配符

?:代表一个字符

* :代表0个或多个字符

[ae]:表示可以是a字符,也可以是e字符

生产者:发送消息一方基本上没有什么变化,只不过发送消息的时候要带上通道名称

消费者:可以允许有多个消费者订阅,订阅的时候可以指定自己的频道名称

image-20230704170141136

下面可以演示一下

消费者1进行订阅,发现天生就是堵塞式的

image-20230704170315760

消费者2进行订阅

image-20230704170431082

生产者进行发布消息,两个消费者可以收到消息

image-20230704170518802

基于PubSub消息队列有哪些特点?

优点

  • 采用发布订阅模型,支持多生产、多消费

缺点

  • 不支持数据持久化

    List结构的本质不是一个消息队列,是一个链表,只不过我们当成消息队列来用了

    而PubSub本身设计出来就是来做消息发送的,如果我们发布出来的消息没有任何人订阅,那这个消息就丢失了

  • 无法避免消息丢失

    发布完没人收,就丢失了

  • 消息堆积有上限,超出时数据丢失

2.3 基于Stream的消息队列

Stream是Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

官网:Commands | Redis

2.3.1 Stream 单消费模式

2.3.1.1 发送消息

发送消息命令xadd

  • key:队列名称
  • NOMKSTREAM: 如果队列不存在,是否自动创建队列,默认自动创建,如果给了这个值,就不创建
  • MAXLEN | MINID [= | ~] threshold [LIMIT count]:设置消息队列的最大消息数量。比如说我们将消息队列最大消息数量设置为1000,如果超过1000后,会将一些旧的信息剔除。如果不给值的话就不设置上限
  • *|ID :指定消息唯一id,,*代表由Redis自动生成,格式是“时间戳-递增数字”
  • field value:代表字段和值,存到消息里面的消息体,被称为Entry,格式就是多个key-value键值对,一个键值对就是一个Entry

image-20230704195555522

例如:

image-20230704195258983

2.3.1.2 读取消息

读取消息方式之一:xread

  • Count count:每次读取消息的最大数量,可以一次读多条,也可以读取一条

  • BLOCK milliseconds:当没有消息时,是否堵塞、堵塞时长。

    BLOCK 如果不给参数就是不堵塞,如果有消息就直接返回,没有消息就返回为空;给参数是堵塞,有消息就返回,没消息就堵塞;

    milliseconds的就是等待的时长,若为0,就是一直等待

  • STREAMS KEY [KEY …]:要从哪个队列读取消息,key就是队列名。可同时指定对个队列

  • ID:起始消息id,只返回大于该ID的消息。

    0:代表从第一个消息开始

    $:代表从最新的消息开始

image-20230704200458361

测试一下

我们发现了,同一个消息可以读取两次。

读完之后不会删除,永久存在

image-20230704200657735

开发中我们可以循环调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果

while(true){
    //尝试读取队列中的消息,最多堵塞2秒
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    if(mes == null){
        continue;
    }
   // 处理消息
    handleMessage(msg);
}

注意!当我们指定其实ID为$时,代表读取最新的消息,如果我们处理一条消息过程中,有超过一条以上的消息到大队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

2.3.1.3 STREAN类型消息队列的XREAN命令特点

STREAN类型消息队列的XREAN命令特点

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

2.3.2 Stream消费者组模式

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备一下特点:

  • 消息分流

    队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度。从一定程度上可以避免消息堆积问题

  • 消息标示

    消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费

  • 消息确认

    消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完后需要通过XACK来确认消息,标记消息已处理,才会从pending-list移除。这样完美解决消息丢失问题

2.3.2.1 创建消费者组

创建消费者组

XGROUP create key groupName ID [MKSTREAM]
  • key:

    队列名称

  • groupName

    消费者组的名称

  • ID

    这个组监听消息的时候,从哪开始监听

    0:代表从第一个消息开始

    $:代表从最新的消息开始

  • MKSTREAM:队列不存在时自动创建队列

其他常见命令

#删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

#删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupnameconsumername

一般情况下不需要自己添加消费者,当我们从组中指定一个消费者并监听消息的时候,如果发现消费者不存在,会自动帮我们创建出来

2.3.2.2 读取消息

怎么监听消息?

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group

    消费组名称

  • consumer

    消费者名称,如果消费者不存在,会自动创建一个消费者

  • count

    本次查询的最大数量

  • BLOCK milliseconds

    当没有消息时,是否堵塞、堵塞时长。

    BLOCK 如果不给参数就是不堵塞,如果有消息就直接返回,没有消息就返回为空;给参数是堵塞,有消息就返回,没消息就堵塞;

    milliseconds的就是等待的时长,若为0,就是一直等待

  • NOACK

    参数代表不用消费者确认,如果不给这个参数代表需要消费者确认,所以一般我们不给这个参数

  • STREAMS KEY [KEY …]:

    要从哪个队列读取消息,key就是队列名。可同时指定对个队列

  • ID

    起始消息id

    “>”:从下一个未消费的消息开始

    其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

正常情况下,我们都应该给大于号,读那些未消费的消息,如果出现了异常情况,再去pending-list中读已消费但未处理的消息

测试一下

image-20230704205915340

我们上面读了好几条消息,但是从来没有确认过

一定要一个消息,确认一条消息

2.3.2.3 确认消息

怎么确认消息?

XACK key group ID [ID ...]
  • key

    队列名称

  • group

    组名称

  • ID

    要确认哪一条消息,将待处理的消息变成正确处理(已经处理)的消息,这样的话就会从pending-List中移除

测试

比如我们下面确认s1队列、g1组中后面五个id对应的消息

image-20230704210650274

2.3.2.4 查看pending-list

假如我们读取了一个消息,但是出现异常没有确认,此消息会放入pending-list中,那我们怎么查看一下pending-list

官网:XPENDING | Redis

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
  • key

    队列名称

  • group

    组名称

  • IDLE min-idle-time

    空闲时间。获取消息以后,确认之前的这段时间。

    比如说我们给了一个参数5000,那空闲时间超过5000ms以上的消息才进入pending-list队列

  • **start end **

    起始范围。pending-list中有很多消息,这两个参数就是告诉它我们像获取的最小id和最大id是什么

    -:减号代表最小的

    +:加号代表最大的

  • count

    我们要获取的数量

  • consumer

    要获取哪个消费者的pending-list

image-20230704212024902

读取pending-list中的第一条消息

就是正常读取

XREADGROUP GROUP g1 C1 COUNT 1 BLOCK 200 STREAMS S1 0

image-20230704212132332

2.3.2.5 消费者组 - 消费者监听消息基本思路

使用while循环一直去获取消费者组中的消息

image-20230704212908270

2.3.2.6 STREAN类型消息队列的XREANGROUP命令特点

首先具备XREAD单消费模式所有优点,并且弥补一些缺点

STREAN类型消息队列的XREAN命令特点

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取

STREAN类型消息队列的XREANGROUP命令特点

  • 消息可回溯

    消息被消费完了,并不会从队列中删除,其他人也可以来拿(这是针对不同的组)

  • 可以多消费者争抢消息,加快消费速度

  • 可以堵塞读取

  • 没有消息漏读的风险

    消费者组里面会去标记,上一次消费到哪里,下一次再来的时候就可以从上一次标记之后读取

  • 有消息确认机制,保证消息至少被消费一次

独立于JVM之外,不受JVM限制

可以在Redis中做持久化,安全性得到保证

2.4 总结

image-20230704214754792

三、基于Stream消息队列实现异步秒杀

需求

① 创建一个Stream类型的消息队列,名为stream.orders

MKSTREAM:队列不存在时自动创建队列

XGROUP CREATE stream.orders g1 0 MKSTREAM

② 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherld、userld、orderld

对于XADD中的参数*|ID :指定消息唯一id,,*代表由Redis自动生成,格式是“时间戳-递增数字”

改了两行代码

-- 1. 参数列表
-- 1.1 优惠券id
--   需要去redis中读取库存数量,判断是否充足,其中key的前缀seckill:stock是固定的,后面的id是需要传入的
local voucherId = ARGV[1]

-- 1.2 用户id
--   需要知道用户id,才能判断用户之前是否下过单
local userId = ARGV[2]
-- TODO 1.3 订单id
local orderId = ARGV[3]

-- 2.数据相关key
-- 2.1 库存key, lua中是用 .. 拼接字符串
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key,值是一个set集合,集合名称就是下面。内容是购买订单的用户的id,这样可以记录谁购买了谁没有购买
local orderKey = 'seckill:order:' .. voucherId

-- 3.lua脚本业务
-- 3.1 判断库存是否充足
--  redis.call('get',stockKey)得到的结果是字符串,是无法和数字比较的
if (tonumber(redis.call('get',stockKey))<=0) then
-- 3.2库存不足
    return 1
end
-- 3.2判断用户是否下单
--   借助命令SISMEMBER命令,判断一个给定的值是不是当前set集合中的一个成员,如果存在返回1,不存在返回0
if (redis.call('SISMEMBER',orderKey,userId) == 1) then
-- 3.3redis中存在,说明是重复下单
    return 2
end

-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单,保存用户 sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- TODO 3.6 发送消息到队列当中,XADD stream.orders * k1 v1 k2 v2.....
redis.call("xadd","stream.orders","*","userId",userId,"voucherId",voucherId,"id",orderId)
return 0

改造seckill秒杀业务逻辑

java代码确实删减了很多

    //TODO 获取脚本
//  RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    //静态代码块做初始化
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
//      借助Spring提供的方法区resource下找
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
//      配置一下返回值类型
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private VoucherOrderServiceImpl proxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
//      获取用户id
        Long userId = UserHolder.getUser().getId();
//      TODO 获取订单id,这不操作移到上面
        long orderId = redisIdWorker.nextId("order");
//      1.执行lua脚本
//      我们没有传入key,所以传入一个空集合即可
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(),String.valueOf(orderId)
        );
//      2.判断结果是否为0(0有购买资格,不为0没有购买资格)
        int r = result.intValue();
        if (r != 0) {
//      2.1 不为0没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
//      3.获取代理对象
        proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();

//      4. 返回订单id
        return Result.ok(orderId);
    }

③ 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

这一步直接看完整代码

 //创建一个线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct//当前类初始化完毕后执行这个方法
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {
        String queueName = "stream.orders";
        @Override
        public void run() {

            while (true) {
                try {
//                  TODO 1.获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
//                  为什么返回一个List?因为count不是1的话,可能会有多条消息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
//                          组是属于消费者的一部分信息,在这里统一叫做Consumer(一定是Spring包下的)
                            Consumer.from("g1", "c1"),
//                          StreamReadOptions.empty()创建一个空的,count(1)每次读取一条消息
//                          block(Duration.ofMillis(2000)),只能传入一个Duration参数
                            StreamReadOptions.empty().count(1).block(Duration.ofMillis(2000)),
//                          queueName消息队列名字,ReadOffset.lastConsumed()最近一个未消费消息
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
//                  TODO 2.判断消息获取是否成功
                    if(list ==null || list.isEmpty() ){
//                      TODO 2.1 如果获取失败,说明没有消息,继续下一次循环
//                      如果获取失败,说明没有消息,继续下一次循环
                        continue;
                    }

//                  TODO 3.解析消息中的订单信息
//                  明确知道count是1,所以直接获取0就行
                    MapRecord<String, Object, Object> record = list.get(0);
//                  消息id
                    RecordId id = record.getId();
//                  将Map转成VoucherOrder
                    Map<Object, Object> values = record.getValue();
//                  true表示出现错误忽略
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//                  TODO 3.如果获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
//                  TODO 4.ACK确认,XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",id);
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
//                  TODO 处理时消息时报异常了,消息没有被确认,会进入pending-list,我们在这里需要去pendingList取出来再去做处理
                    handlePendingList();
                }
            }
        }
        private void handlePendingList() {
            while (true) {
                try {
//                  TODO 1.获取pending-list中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1  STREAMS stream.orders 0
//                  为什么返回一个List?因为count不是1的话,可能会有多条消息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
//                          组是属于消费者的一部分信息,在这里统一叫做Consumer(一定是Spring包下的)
                            Consumer.from("c1", "g1"),
//                          StreamReadOptions.empty()创建一个空的,count(1)每次读取一条消息
                            StreamReadOptions.empty().count(1),
//                          queueName消息队列名字,ReadOffset.from("0"),读取pending-list第一条消息
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
//                  TODO 2.判断消息获取是否成功
                    if(list ==null || list.isEmpty() ){
//                      TODO 2.1 如果获取失败,说明pending-list没有异常消息,结束循环
//                      如果获取失败,说明没有消息,继续下一次循环
                        continue;
                    }

//                  TODO 3.解析消息中的订单信息
//                  明确知道count是1,所以直接获取0就行
                    MapRecord<String, Object, Object> record = list.get(0);
//                  消息id
                    RecordId id = record.getId();
//                  将Map转成VoucherOrder
                    Map<Object, Object> values = record.getValue();
//                  true表示出现错误忽略
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//                  TODO 3.如果获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
//                  TODO 4.ACK确认,XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",id);
                } catch (Exception e) {
                    log.error("处理订单异常", e);
//                  这个地方不用递归,直接下一次循环处理就行
                }
            }

        }
    }

    /**
     * 订单处理方法
     * 我们在Redisson脚本中加了一个次锁了,在这里为什么还要再加一次锁呢?
     * 做一个兜底方案,万一Redis出现问题,有个保证
     *
     * @param voucherOrder
     */
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
//      1.获取用户
        Long userId = voucherOrder.getUserId();
//      2.创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
//      3.获取锁
        boolean isLock = lock.tryLock();
//      4.判断是否获取锁成功
        if (!isLock) {
//          获取锁失败
            log.error("不允许重复下单");
            return;
        }
        try {
//            之前这种获取代理对象的方式是获取不到的,因为不在同一个线程下面了
//            VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
//            return proxy.createVoucherOrder(voucherId);
//            在主线程获取,这个地方使用
            proxy.createVoucherOrder(voucherOrder);
        } finally {
//          出现异常做锁的释放
            lock.unlock();
        }

    }

    private VoucherOrderServiceImpl proxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
//      获取用户id
        Long userId = UserHolder.getUser().getId();
//      获取订单id,这不操作移到上面
        long orderId = redisIdWorker.nextId("order");
//      1.执行lua脚本
//      我们没有传入key,所以传入一个空集合即可
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
//      2.判断结果是否为0(0有购买资格,不为0没有购买资格)
        int r = result.intValue();
        if (r != 0) {
//      2.1 不为0没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
//      3.获取代理对象
        proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();

//      4. 返回订单id
        return Result.ok(orderId);
    }


    //  如果在方法上添加synchronized,说明同步锁是this,当前对象
//  不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
//  所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
//      新增一人一单的判断
        Long userId = voucherOrder.getUserId();
//      user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
//      我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回

//      查询订单
        int count = query().eq("user_id", userId)
                .eq("voucher_id", voucherOrder.getVoucherId()).count();
//      判断是否存在
        if (count > 0) {
//      用户至少下过一单,不能再下了
            log.error("用户至少下过一单,不能再下了");
            return;
        }
//      说明没买过,继续执行代码
//      5.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1") //set stock = stock-1
                .eq("voucher_id", voucherOrder.getVoucherId()) //where  voucher_id= voucherId
                .gt("stock", 0)//where  stock>0
                .update();
        if (!success) {
            log.error("扣减失败,可能库存不足");
            return;
        }
//      创建订单
        save(voucherOrder);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/734655.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java024——String类

一、String类简单说明 1、单个字符可以用char类型保存&#xff0c;多个字符组成的文本就需要保存在String 对象中。 2、String是Java中的一个类 3、String 通常被称为字符串&#xff0c;一个String对象最多可以保存(2^32-1)个字节(占用4GB空间大小)的文本内容。 二、String类…

Jmeter 压测工具的安装及使用

目录 一、简介二、下载三、安装四、启动五、使用1.调整界面显示大小2.改为中文&#xff08;选&#xff09;3.添加线程组4.添加 HTTP 请求5.添加 HTTP 请求头6.保存压测配置7.启动压测8.查看结果树9.查看聚合报告 一、简介 官网地址&#xff1a; https://jmeter.apache.org/ JM…

HTML案例:滚动条下拉后导航栏有网格状效果

案例&#xff1a;在滚动条下拉的时候&#xff0c;导航栏是固定的&#xff0c;当下拉卷出的内容在导航条的范围内时&#xff0c;导航条的背景虚化为卷出的内容 固定定位的效果 需求效果&#xff1a; 1、HTML代码 <style>* {margin: 0;padding: 0;}nav {/* 固定定位 */pos…

穷且益坚,不坠青云之志——忆我的大学四年

文章目录 一、前言二、大学之前1.从小的经历以及家庭的困境2.一切的根源——自卑3.不愉快的初高中经历 三、大一&#xff1a;所谓追梦——改变的起点1.敏感而自卑的内心2.百团大战&#xff0c;与协会埋下不解之缘3.改变的契机——追梦App4.持续的学习&#xff0c;热爱铸就改变5…

vscode安装+配置+使用+调试【保姆级教程】

1. VScode是什么 Visual Studio Code简称VS Code&#xff0c;是一款跨平台的、免费且开源的现代轻量级代码编辑器&#xff0c;支持几乎主流开发语言的语法高亮、智能代码补全、自定义快捷键、括号匹配和颜色区分、代码片段提示、代码对比等特性&#xff0c;也拥有对git的开箱即…

Django_发送邮件

目录 一、开启SMTP服务并获取授权码 二、在Django的配置文件中添加邮箱服务配置 三、发送邮箱代码 源码等资料获取方法 使用django邮箱功能需要搭建smtp服务器&#xff0c;如果没有&#xff0c;可以使用第三方smtp服务器。 本文以第三方QQ邮箱服务器演示如何使用python的s…

从钢铁行业数字化管控平台的智能进化,看超自动化能力边界

文/王吉伟 钢铁行业的数字化转型&#xff0c;历来都是值得探讨的热点话题。 2022年&#xff0c;我国粗钢产量10.13亿吨&#xff0c;占据了全球钢铁供给市场的半壁江山。 这组数据可谓非常抢眼&#xff0c;但仍旧难掩诸多企业的各种经营问题。 钢铁生产过程工序众多&#xf…

CentOS环境下的JDK8安装

CentOS 安装 JDK8 安装JDK8 下载 官网&#xff1a;https://www.oracle.com/java/technologies/downloads/#java8-linux 将下载好的压缩包拷贝到根目录下 通过xshell如果出现 bash: rz: 未找到命令 &#xff0c;需要先运行下面的命令 yum -y install lrzsz解压 tar -zxvf …

ubuntu彻底卸载libreoffice和安装wps

一.卸载libreoffice 1. 打开ubuntu桌面&#xff0c;右键 打开终端 方法一 完全卸载 sudo apt-get remove libreoffice-common 根据提示输入管理员密码&#xff0c;然后根据提示 输入 Y 这样&#xff0c;等待卸载完成。 正常情况下上述命令执行完成后Libreffice的表格、绘图…

DevOps基础服务1——版本控制gitlab

文章目录 一、基本了解1.1 安装git客户端1.2 git命令1.2.1 本地仓库1.2.2 远程仓库 二、安装gitlab三、功能管理3.1 创建账号3.2 用户注册授权通知功能3.3 创建项目远程库3.4 ssh设置3.5 克隆远程库项目到本地3.6 上传本地项目代码到远程库3.7 授权用户查看项目权限 一、基本了…

华为OD机试真题 Java 实现【查找树中元素】【2023 B卷 100分】,附详细解题思路

目录 一、题目描述二、输入描述三、输出描述四、补充说明五、Java算法源码六、效果展示1、输入2、输出 一、题目描述 已知树形结构的所有节点信息&#xff0c;现要求根据输入坐标&#xff08;x,y&#xff09;找到该节点保存的内容值&#xff1b; 其中&#xff1a; x表示节点…

光场1.0——非聚焦型光场相机

本文概要 本文讲主要从光场硬件结构设计以及软件处理方式的层面来介绍一下光场的相关内容&#xff0c;关于光场的优势和具体应用点并不在本文的主要范围内。 光场1.0 1. 结构原理说明 首先来介绍一下光场相机&#xff0c;那么什么是光场相机呢&#xff0c;光场相机经历了两…

c语言实现单链表

#include<stdio.h> #include<malloc.h>typedef struct LNode{double data;struct LNode *next; }LNode,*LinkList;bool InitList(LinkList &L){L(LNode*) malloc(sizeof(LNode));//分配一个头结点 if(LNULL) //内存不足&#xff0c;分配失败 return false;L-&g…

机器学习与深度学习——自定义函数进行线性回归模型

机器学习与深度学习——自定义函数进行线性回归模型 目的与要求 1、通过自定义函数进行线性回归模型对boston数据集前两个维度的数据进行模型训练并画出SSE和Epoch曲线图&#xff0c;画出真实值和预测值的散点图&#xff0c;最后进行二维和三维度可视化展示数据区域。 2、通过…

day68_Vue基础

今日内容 零、 复习昨日 零、 复习昨日 一、Vue简介 1.1 简介 Vue (读音 /vjuː/&#xff0c;类似于 view) 是一套用于构建用户界面的渐进式的js框架&#xff0c;发布于 2014 年 2 月。与其它大型框架不同的是&#xff0c;Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注…

100、基于STM32单片机自动跟随小车 红外遥控控制小车避障模式 跟随模式设计(程序+原理图+PCB源文件+流程图+硬件设计资料+元器件清单等)

绪 论 智能小车通过各种感应器获得外部环境信息和内部运动状态&#xff0c;实现在复杂环境背景下的自主运动&#xff0c;从而完成具有特定功能的机器人系统。而随着智能化电器时代的到来&#xff0c;它们在为人们提供的舒适的生活环境的同时&#xff0c;也提高了制造智能化电器…

系统上线前,SQL脚本的9大坑

前言 系统上线时&#xff0c;非常容易出问题。 即使之前在测试环境&#xff0c;已经执行过SQL脚本了。但是有时候&#xff0c;在系统上线时&#xff0c;在生产环境执行相同的SQL脚本&#xff0c;还是有可能出现一些问题。 有些小公司&#xff0c;SQL脚本是开发自己执行的&am…

React Dva修改路由设置,不要井号

我们Dva项目的路由 他默认是设置了带井号的这种 其实我觉得到还可以 但是有些人会觉得不太美观 如果 你想去除他 那么 你先要在终端执行 npm install --save history将 history 引入进来 装好之后 我们来到src下的 index.js 加上如下代码 import {createBrowserHistory as …

nginx配置例子-反向代理实现

4.1 反向代理实现&#xff08;实例1&#xff09; 4.1.1需要实现的效果 (1)打开浏览器&#xff0c;在浏览器地址栏输入地址 www.123.com&#xff0c;跳转到liunx.系统tomat主页面中 4.1.2 准备工作 (1&#xff09;在liunx, 系统安装 tomcat, 使用默认端口8080. tomcat安装文…

【C++】-vector的具体使用(迭代器失效问题)

&#x1f496;作者&#xff1a;小树苗渴望变成参天大树&#x1f388; &#x1f389;作者宣言&#xff1a;认真写好每一篇博客&#x1f4a4; &#x1f38a;作者gitee:gitee✨ &#x1f49e;作者专栏&#xff1a;C语言,数据结构初阶,Linux,C 动态规划算法&#x1f384; 如 果 你 …