Reids实战—黑马点评(三)秒杀篇

news2024/12/27 17:01:51

Reids实战—黑马点评(三)秒杀篇

来自黑马的redis课程的笔记

【黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目】

在这里插入图片描述

目录

  • Reids实战—黑马点评(三)秒杀篇
    • 一、全局唯一ID
      • 小结
    • 二、实现优惠券秒杀下单
    • 三、超卖问题
      • 3.1 问题描述
      • 3.2 乐观锁和悲观锁
      • 3.3 乐观锁实现
        • 3.3.1 版本号法
        • 3.3.2 CAS(Compare And Swap)
      • 3.4 小结
    • 四、一人一单
      • 4.1 添加功能
      • 4.2 并发问题
      • 4.3 字符串问题
      • 4.4 Spring事务问题
        • 4.4.1 锁在事务内
        • 4.4.2 事务不生效
      • 4.5 集群模式下的一人一单问题
    • 五、分布式锁
      • 5.1 分布式锁概述
      • 5.2 基于redis实现分布式锁
        • 5.2.1 锁误删
        • 5.2.2 Lua脚本保证命令原子性
      • 5.3 小结
    • 六、Redisson
      • 6.1 使用Redisson分布式锁
      • 6.2 Redisson分布式锁原理
        • 6.2.1 可重入原理
        • 6.2.2 可重试原理
        • 6.2.3 解决超时释放
        • 6.2.4 保证主从一致
        • 6.2.5 小结
    • 七、redis优化秒杀
    • 八、Redis消息队列实现异步秒杀
      • 8.1 基于List结构模拟消息队列
      • 8.2 PubSub发布订阅模式
      • 8.3 基于Stream结构的消息队列
        • 8.3.1 基本使用
        • 8.3.2 消费者组
        • 8.3.3 改造秒杀业务
        • 8.3.4 小结

一、全局唯一ID

每一个订单都需要不同的ID,如何做到ID唯一?

如果似乎用自增:

  • id规律明显(安全问题)
  • 受单表数据量限制

为了解决这些问题,我们有了全局ID生成器,这是一种生成全局唯一ID(或者叫分布式唯一ID)的工具,所生成的ID满足以下特征:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性(复杂递增)

唯一性和递增性我们可以用redis自增来实现,恰好,redis本身就满足高可用、高性能,安全性如何解决?

方案:

在这里插入图片描述

用Long(数值效率比字符串高)类型,一个Long类型占64位(Java中),我们可以在这些bit位上做文章。

首先:从左数第一位,是符号位,我们的id永远为正数,所以让它永远为0。接下来的31位,用来存时间戳,保证安全性的同时,让id基本不可能重复。最后的32位,我们存普通的递增值。这个方案,理论上可以保证68年都不会有id重复。

为了避免最后的自增值超出redis自增限制,我们可以每天新建一个key用于自增(例如,yyyy:MM:dd),不仅解决了问题,还方便统计。

实现:

@Component
public class RedisIdWorker {
    private StringRedisTemplate stringRedisTemplate;
    // 起始的时间戳值
    private static final Long BEGIN_TIMESTAMP = 1640995200L;

    public RedisIdWorker (StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    /**
    * @param “业务key前缀“
    * */
    public long nextId(String keyPrefix) {
        LocalDateTime now = LocalDateTime.now();
        long  nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + data);
        // 位运算提高效率
        return timestamp << 32 | count;
    }
}

起始的时间戳的值可以这样计算:

@Test
void timeTest() {
    LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
    long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
    System.out.println(epochSecond);
}

测试:

ExecutorService executorService = Executors.newFixedThreadPool(500);
@Test
void idTest() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(300);
    Runnable task = () -> {
        for (int i = 0; i < 100; i++) {
            redisIdWorker.nextId("order");
        }
        latch.countDown();
    };
    long begin = System.currentTimeMillis();
    for (int i = 0; i < 300; i++) {
        executorService.submit(task);
    }
    // 必须阻塞 如果不阻塞,则程序结束时,我们的任务还没有结束
    latch.await();
    long end = System.currentTimeMillis();
    System.out.println(end - begin);
}

收获的小技巧:

  1. LocalDateTime的使用

    // 使用of可以快速获取指定时间的LocalDateTime对象
    LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
    // 使用toEpochSecond可以快速获取相应时间的时间戳
    long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
    // 使用format方法快速格式化时间
    LocalDateTime now = LocalDateTime.now();
    String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    
  2. 并发包的小技巧

    // 快速创建线程池,但是阿里的Java开发规范不建议这样做
    ExecutorService executorService = Executors.newFixedThreadPool(500);
    // 快速创建一个任务,使用lambda表达式
    Runnable task = () -> {
        ……
    };
    // 提交任务
    executorService.submit(task);
    
  3. 位运算

    更好的利用bit位来处理信息

小结

在这里插入图片描述

uuid效率较低,且不满足自增性。

Redis自增,就是刚刚的方案,很常用。

雪花算法,类似于刚刚的方案,区别在于雪花算法的最后32位是以机器时间为基准,Redis是自增。

数据库自增,也就是数据库版本的Redis方案,效率不如Redis。

二、实现优惠券秒杀下单

按照流程图,实现非常简单,但会出现非常多的问题(详见后文)。

小收获:

使用lambdaUpdate

// 减库存操作
lambdaUpdate().setSql("stock = stock -1")
              .eq(SeckillVoucher::getVoucherId, voucherId)
              .update();

三、超卖问题

3.1 问题描述

在刚刚的流程中,是有很多问题隐患的,比如,经典的超卖问题

在并发量较高的情况下,假如剩余最后一张票,在购买最后一张票的线程还未扣减库存时,其他线程进入,查询库存,都会查到还有余票的结果,此时再进行库存判断,并扣减库存,就会发生超卖。

在这里插入图片描述

如何解决超卖问题?

最简单的方法就是上锁,同一时间只让一个线程去执行查库存->扣减库存的操作。

3.2 乐观锁和悲观锁

加锁的话有两种锁可以加:

悲观锁:比较悲观,认为线程安全问题一定会发生,因此操作数据前先获取锁,确保线程串行执行。(如synchronized、Lock)

乐观锁:比较乐观,认为线程安全问题不一定发生,所以不加锁,而是在更新数据时去判断数据有没有被其他线程修改。若没有则更新,反之重试或异常。

3.3 乐观锁实现

悲观锁的解决方案较为简单,即在可能出问题的代码上加synchronized或Lock锁住。我们介绍一下乐观锁的解决方案:

常见两种方法:

3.3.1 版本号法

每次更新数据时,更新版本号,并判断版本号是否被修改。

在这里插入图片描述

3.3.2 CAS(Compare And Swap)

以数据本身为版本号

3.4 小结

秒杀场景下,悲观锁性能较差,乐观锁成功率较低。

小优化:更新库存时,不用判断库存是否和查询到的库存相同,只需要库存大于零即可,这样成功率大大提高。

在这里插入图片描述

四、一人一单

4.1 添加功能

某些业务我们需要用户只能购买一单,于是我们多加一个用户是否已经购买的判断:

在这里插入图片描述

从ThreadLocal中取出当前用户的id,根据用户和商品id查询用户是否已经有订单存在,若存在则失败。

在这里插入图片描述

4.2 并发问题

按照刚刚的业务流程,正常用户单线程操作的情况下是没有问题的,但是若是恶意用户,同时开多个线程访问我们的接口,则可能出现这样一种情况:

若多个线程同时进入该方法,在任一线程未保存订单前,都进行查询用户是否下单,得到的count都是0,拿到这个0后,再去判断是否一人一单则都会成功,都会下单,造成了一人多单的问题。

解决:给该方法加锁,因为涉及两个表的写操作,我们添加事务。

在这里插入图片描述

写完过后以看,这不成了悲观锁嘛!里面的乐观锁还拿来干嘛呢?

经过思考:我们只需要限制单个用户的并发操作,只需要锁住userId即可,于是我们不再锁整个方法。

该方法改进为:

@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
    Long userid = UserHolder.getUser().getId();
    synchronized(userid.toString()) {
       	......
        return Result.ok(voucherOrder.getId());   
    }
}

4.3 字符串问题

接着我们会发现锁不住,因为我们toString方法会创建一个新的字符串,不同的线程的userid.toString()出来的字符串不是同一个对象。

解决:使用intern方法

@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
    Long userid = UserHolder.getUser().getId();
    synchronized(userid.toString().intern()) {
       	......
        return Result.ok(voucherOrder.getId());   
    }
}

使用intern方法,该方法会从JVM常量池中找equals为true的字符串,意味着不同的线程都是同一个字符串。

4.4 Spring事务问题

spring的声明式事务是基于AOP实现的,意味着方法结束后才会提交事务。

4.4.1 锁在事务内

当synchronized在该方法内时,会出现这么一种情况:锁释放了,但事务未提交,事务未提交时,下一个线程进来,读到的是数据库未修改的快照,仍然会发生一人多单的问题,所以我们要在调用该方法处添加锁,或是使用编程式事务。

synchronized (userid.toString().intern()) {
	createVoucherOrder(voucherId);
}

4.4.2 事务不生效

正是因为spring事务是通过aop实现,而aop是通过动态代理实现。当我们直接调用该方法时,默认是this.createVoucherOrder(voucherId),使用的是this调用,而不是使用代理对象来调用,只有通过代理对象来调用,事务才会生效。

  1. 引入依赖

    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
    </dependency>
    
  2. 暴露代理对象

    启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解

    @MapperScan("com.hmdp.mapper")
    @EnableAspectJAutoProxy(exposeProxy = true)
    @SpringBootApplication
    public class HmDianPingApplication {
        public static void main(String[] args) {
            SpringApplication.run(HmDianPingApplication.class, args);
        }
    }
    
  3. 获取代理对象,并使用代理对象调用方法

    synchronized (userid.toString().intern()) {
    	// 确保事务生效
    	IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    	proxy.createVoucherOrder(voucherId);
    }
    

4.5 集群模式下的一人一单问题

通过刚刚的几波操作,可以说是基本解决了单机部署情况下的一人一单问题,若是集群部署,仍然会有问题。

可以开两个服务debug一下:

(小坑:Intellij Idea 2022.3.1版本,断点调试打断点后,需要等待断点上的“√”出现才能启动服务,不然断点无效,多个服务建议多选并同时启动,不然要等上一个服务启动到“√”出现才能启动下一个服务。)

在这里插入图片描述

我们的 锁 锁的是userid,锁的是jvm常量池中的userid字符串。若是多个jvm,不是同一个常量池,自然就锁不住了。

五、分布式锁

解决集群情况下的并发问题,则需要一个多个进程能都使用的锁。

5.1 分布式锁概述

分布式锁:满足分布式系统或集群模式下多进程可见并互斥的锁。

在这里插入图片描述

分布式锁可以借助第三方中间件简单的实现:

在这里插入图片描述

5.2 基于redis实现分布式锁

通过redis的setnx命令可以轻松实现简单的互斥锁,因为redis是单线程的,不会存在并发问题。

在这里插入图片描述

通过set key value nx ex 10 来保证原子性,防止在设置ttl值前宕机,发生死锁。这是一种非阻塞的锁,拿不到锁立即返回false,没有重试机制。

5.2.1 锁误删

若只是简单的使用setnx和del来获取和释放锁,则会出现一些问题,例如,拿到锁的线程业务阻塞了,它的锁超时释放了,此时另一个线程拿到锁,在另一个线程还未释放锁时,它的业务完成,并删掉了另一个线程的锁。这样一来,第三个线程又能拿到锁。

在这里插入图片描述

所以我们需要有一种机制,防止锁的误删:

在这里插入图片描述

在释放锁时,再判断一下锁是否是自己的。

具体方案:在设置该锁的value时,我们使用threadid,因为在多个JVM中,threadid有可能相同,于是我们在threadid前拼接一个UUID。这样就保证了误删锁的情况不会再发生。

5.2.2 Lua脚本保证命令原子性

避免锁的误删后,我们的分布式锁就比较健壮了。但在某些极端场景下,仍会出现误删问题:

在释放锁时,会先判断是否是当前的锁(在redis中获取该锁的value),此时,因为一些原因(如GC)阻塞了,下一个线程拿到锁,此时阻塞结束,由于已经做过判断,上一个线程会把下一个线程拿到的锁误删,此时第三个线程又能拿到锁了。

在这里插入图片描述

解决方案:将判断和释放锁封装成一个原子操作。

使用Lua脚本可以完美解决这个问题。

在lua脚本中,可以直接使用redis.call()方法来编写redis命令,在redis中使用EVAL调用。有如下好处:

  1. 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延。
  2. 原子操作。Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务。
  3. 复用。客户端发送的脚本会永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。

在这里插入图片描述

注意:lua语言中,数组下标是从1开始。

了解lua脚本后,我们就可以开始编写lua脚本了。

在这里插入图片描述

根据需求:

在这里插入图片描述

简写:

在这里插入图片描述

接下来就是使用Java的redis客户端嗲用Lua脚本:

使用execute方法:

在这里插入图片描述

于是,我们写出了简单且健壮的分布式锁:

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
	// Lua脚本
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    // 锁的名字 key一般为业务名
    private String key;
	// 锁前缀
    private static final String KEY_PREFIX = "lock:";
    // 该区分不同的JVM
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "";
    // 初始化Lua脚本
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
	
    // 初始化key
    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.key = KEY_PREFIX + name;
    }

    /**
     * @param timeoutSec 锁持有的时长 过期自动释放 
     * @return
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(success);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        stringRedisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(key),
                ID_PREFIX + Thread.currentThread().getId()
        );
    }
}

5.3 小结

在这里插入图片描述

编码收获:

  1. 读取文件,若该文件是不变的,则在static代码块中初始化一次就好了。
  2. ClassPathResource()类快速获取Resource目录下的资源。

六、Redisson

现在我们基于String类型的setnx实现的分布式锁已经足够健壮,但仍有不足:

在这里插入图片描述

要解决这些问题,我们的基础版redis分布式锁就不好发力了。

但Redisson轻松解决:

在这里插入图片描述

Redisson中不仅有分布式锁,分布式锁只是它的一个子集 。

6.1 使用Redisson分布式锁

  1. 引依赖

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.19.3</version>
    </dependency>
    
  2. 配置

    如果使用yaml来配置,则会将redis的配置覆盖。我们使用redisson只是作为分布式锁使用,所以单独配置:

    @Configuration
    public class RedissonConfig {
        @Bean
        public RedissonClient redissonClient() {
            Config config = new Config();
            // 单点配置,也可以用useClusterServers添加集群地址
            config.useSingleServer().setAddress("redis://192.168.0.122:6379").setPassword("123456");
            return Redisson.create(config);
        }
    }
    
  3. 使用

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userid = voucherOrder.getUserId();
        // 创建锁对象
        RLock lock = redissonClient.getLock("order:" + userid);
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        // 是否拿到锁
        if (!isLock) {
            log.error("不能重复下单!");
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    

    非常简单。

    也可以携带参数

在这里插入图片描述

6.2 Redisson分布式锁原理

redisson的分布式锁是如何解决这四个问题?

6.2.1 可重入原理

我们的基础版锁是不可重入的

redisson使用Hash结构轻松解决了这个问题

在这里插入图片描述

每重入一次,value+1,每次释放锁,value-1,value为0,则释放锁。非常巧妙

其底层都是lua脚本,保证命令原子性:

获取锁:

在这里插入图片描述

释放锁:

在这里插入图片描述

源码中lua脚本是写死了放在代码中的,可以去查看,和这些大差不差,释放锁的时候会发布一个释放信号(后文)。

6.2.2 可重试原理

我们基础版的redis是一个非阻塞式的锁,没有任何重试机制。redisson用PubSub模式解决了这个问题。

我们从redis源码中探究:

/**
 * 参数分别是等待时间,锁ttl,时间单位。我们不指定ttl时,默认为-1。  
 **/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 执行获取锁的方法,会返回锁的ttl,获取锁失败会返回null
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // 成功获取锁
    if (ttl == null) {
        return true;
    }
	// 获取失败,准备重试,先检查重试等待时间是否还有剩余
    time -= System.currentTimeMillis() - current;
    // 过期,获取锁失败,返回false
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
	// 还可以重试,准备重试
    current = System.currentTimeMillis();
    // 订阅锁的释放信号
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        // 等待锁释放,获取信号
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
        // 超时 失败
    } catch (TimeoutException e) {
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
            "Unable to acquire subscription lock after " + time + "ms. " +
            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
        // 异常 失败
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
	// 获取锁释放信号成功
    try {
        // 再次判断等待时间是否过期
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
		// 没有过期 尝试获取锁
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // 获取锁成功
            if (ttl == null) {
                return true;
            }
			// 失败 检查等待时间是否过期
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // 未过期 准备重试 
            currentTime = System.currentTimeMillis();
            // 等待释放锁的信号
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
			// 获得锁释放信号 再次检查等待时间
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 无论获取锁成功或失败,都要解除订阅
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
}

相比我们基础版redis分布式锁,有很多优点,例如:

  • 重试并不是无休止的重试,而是使用发布订阅模式,等待一个锁的释放信号再去重试,节约内存,性能。
  • 非常严谨,每次获取锁前,都要先判断重试等待时间是否过期。

6.2.3 解决超时释放

虽然锁超时释放可以解决业务异常带来的死锁问题,但是业务超时引发的自动释放也会产生线程安全问题。redisson使用看门狗机制解决了这个问题。

从redis源码中探究:

首先看获取锁的源码

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 参数和之前一致
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 默认情况 leaseTime为 -1 ,这个时候,redisson使用了一个变量internalLockLeaseTime,实际上是30 * 1000毫秒,也就是30秒
        // 该方法会执行lua脚本,尝试获取锁,若获取成功,返回null,获取失败,返回ttl
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // 判断是否获取成功
        if (ttlRemaining == null) {
            // 获取成功 刚才提到 默认情况下,leaseTime为 -1 所以这里走else分支
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 续约 该方法就是解决超时释放的关键
                scheduleExpirationRenewal(threadId);
            }
        }
        // 返回lua脚本返回的ttl或是nil
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

再来看最关键的续约的方法

protected void scheduleExpirationRenewal(long threadId) {
    // 用来存线程对应的锁
    ExpirationEntry entry = new ExpirationEntry();
    // 如果是重入的锁,则无法放入这个map,放入这个map的锁才有资格续约,会获取一个旧的entry,意思就是每一把锁在这个map中有且仅有一条记录
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    // 判断是否是重入的锁
    if (oldEntry != null) {
        // 是重入的锁,只做记录
        oldEntry.addThreadId(threadId);
    } else {
        // 不是重入的锁,记录并开始续约任务
        entry.addThreadId(threadId);
        try {
            // 续约 核心方法
            renewExpiration();
        } finally {
            // 线程终止,取消续约任务
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

看看redisson是如何巧妙地续约的:

private void renewExpiration() {
    // 获取当前锁的信息
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    // 没有记录 不用续约
    if (ee == null) {
        return;
    }
    
   	// 定时任务续约
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 检查是否需要续约
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 执行lua脚本续约,每次执行,都会重置有效期为30秒,续约成功返回ture 续约失败返回false
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                // 异常 移除续约map
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                // 续约成功 继续续约
                if (res) {
                    renewExpiration();
                } else {
                    // 失败 取消定时任务 可以通过EXPIRATION_RENEWAL_MAP.get(getEntryName())快速获取定时任务并取消掉
                    cancelExpirationRenewal(null);
                }
            });
        }
        // 每隔10秒重置一次
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

redisson相比我们的基础版redis,利用看门狗机制,解决了因为业务阻塞而导致锁超时释放的安全问题。有以下优点:

  • 维护一个可以存锁信息和定时任务的EXPIRATION_RENEWAL_MAP,可以快速的找到对应的锁,进行续约或是取消续约操作。
  • 每10秒钟重置一次,若业务没有完成就无限重置,不会因为业务阻塞而超时释放,若是业务异常或宕机,则会马上取消掉定时任务,让锁超时释放,避免导致死锁。

6.2.4 保证主从一致

普通redis分布式锁:

在这里插入图片描述

假如在未同步时,主节点故障,从节点成为主节点后,数据不一致,锁丢失了,有线程安全隐患。

相比普通的redis分布式锁,redisson是如何保证主从一致性的呢?

redisson让每一个redis都是主节点 ,并在这些节点后面跟上从节点

在这里插入图片描述

在这里插入图片描述

即使某个节点宕机,造成了数据不一致,锁也不会失效,因为redisson的连锁,需要在每一个节点上都能获取锁,才算成功。

6.2.5 小结

单体redisson原理:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

七、redis优化秒杀

之前的业务流程,我们是按顺序同步执行的,其中包含了很多数据库的读写操作,并且还花了很多时间来保证线程安全,现在我们使用lua脚本保证一部分的线程安全(例如超卖问题就不用考虑了),再用java的阻塞队列来让耗时操作异步执行。

在这里插入图片描述

redis执行lua脚本是单线程的,所以线程安全。

我们需要做几件事:

  • 新增秒杀优惠券的同时,将优惠券库存存入redis。
  • 利用lua脚本判断库存,一人一单。
  • 如果有购买资格,则生成订单id返回,并将用户id等订单信息传入阻塞队列。
  • 开启线程任务,不断从阻塞队列中获取信息,操作数据库,保存订单。

做完后,会有一些问题:

在这里插入图片描述

由于阻塞队列是javautil包下的,占用的是jvm的内存,会受jvm内存的限制。

当阻塞队列中还有未处理订单,或是处理订单异常等订单丢失的情况时,会导致数据不一致。

八、Redis消息队列实现异步秒杀

为了解决以上问题,我们引入第三方的消息队列。好处就是不会受JVM内存限制。当然redis消息队列不好玩,一般都是用专门的mq中间件(rabbitmq,rocketmq,kafka等)。

redis提供了三种不同的实现消息队列的方式:

  1. list:基于list结构模拟消息队列。
  2. PubSub:发布订阅模式,基本的点对点消息模型。
  3. stream:比较完善的消息队列模型。(功能强大但复杂的一批)

在这里插入图片描述

8.1 基于List结构模拟消息队列

就是利用LPUSH、RPOP这些命令,只要出入口不一致即可。但要模拟阻塞队列,就需要LPUSH、BRPOP这些阻塞的命令了。

8.2 PubSub发布订阅模式

在这里插入图片描述

在这里插入图片描述

8.3 基于Stream结构的消息队列

8.3.1 基本使用

发消息:

在这里插入图片描述

读消息:

阻塞地读消息:

在这里插入图片描述

这就是stream结构地基本使用。

问题:

我们是使用ID来指定读取地消息,当ID为0时,代表从第一个消息开始读,当ID为“$”时,代表读取最新消息,若在处理某条消息时,有n(n>1)条消息进入队列,当那条消息处理完,下次读取只会读取最新地一条,会漏掉中间地消息,造成消息漏读。

8.3.2 消费者组

在这里插入图片描述

消费者组不仅可以加快消息地处理速度,还有一定的数据保护措施。但不得不吐槽,真的麻烦,想要保证数据安全,有时候不得不手动ACK。

创建消费者组:

在这里插入图片描述

读消息:

在这里插入图片描述

这里比较特别的是ID可以取值为“>”。

8.3.3 改造秒杀业务

// 在类初始化完毕后,就开始监听队列
@PostConstruct
public void init() {
    Runnable task = () -> {
        String queueName = "stream.orders";
        while (true) {
            try {
                // 1. 获取消息队列中的订单消息
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                // 2. 判断消息是否获取成功
                if (list == null || list.isEmpty()) {
                    // 2.1 获取失败 下一轮循环
                    continue;
                }
                // 获取消息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);
                // 3. 获取成功 下单
                handleVoucherOrder(voucherOrder);
                // 4. ACK确认
                stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
            } catch (Exception e) {
                log.error("处理消息异常", e);
                // 异常,从pending-list中取出消息重试
                handlePendingList();
            }

        }
    };
    SECKILL_ORDER_EXECUTOR.submit(task);
}

异常消息重试:

private void handlePendingList() {
    String queueName = "stream.orders";
    while (true) {
        try {
            // 1. 获取pending-list中的订单消息
            List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create(queueName, ReadOffset.from("0"))
            );
            // 2. 判断消息是否获取成功
            if (list == null || list.isEmpty()) {
                // 2.1 获取失败 下一轮循环
                break;
            }
            // 获取消息
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);
            // 3. 获取成功 下单
            handleVoucherOrder(voucherOrder);
            // 4. ACK确认
            stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
        } catch (Exception e) {
            log.error("处理pending-list消息异常", e);
        }
    }
}

8.3.4 小结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/374471.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

改进的 A*算法的路径规划(路径规划+代码+毕业设计)

引言 近年来&#xff0c;随着智能时代的到来&#xff0c;路径规划技术飞快发展&#xff0c;已经形成了一套较为成熟的理论体系。其经典规划算法包括 Dijkstra 算法、A算法、D算法、Field D算法等&#xff0c;然而传统的路径规划算法在复杂的场景的表现并不如人意&#xff0c;例…

一些cmake error fixed

建完虚拟环境后 运行 pip install . 出现报错&#xff0c;显示svox2安装出错&#xff0c;然后开始进入到svox2中进行手动编译和安装。 1. cmake svox2/csrc pybind11找不到 conda install pybind11用 pip install 在虚拟环境中安装不行&#xff0c;据说会安装到全局下… 2. c…

Allegro如何标注PCB的尺寸参数操作指导

Allegro如何标注PCB的尺寸参数操作指导 在输出生产文件之前,需要对PCB的尺寸进行标注,如下图 用Allegro如何进行标注,具体操作如下 点击Manufacture选择Dimension Enviroment<

量化学习(一)数据列表获取

试验环境 windows10 AnacondaPyCharm&#xff08;小白参考文章&#xff1a;https://coderx.com.cn/?p14&#xff09; 数据库&#xff1a; VM中安装MySQL5.7&#xff08;设置utf8及相应配置优化&#xff09; 复权 小白参考文章&#xff1a;https://zhuanlan.zhihu.com/p/469820…

实例3:树莓派呼吸灯

实例3&#xff1a;树莓派呼吸灯 实验目的 通过背景知识学习&#xff0c;了解digital与analog的区别。通过GPIO对外部LED灯进行呼吸控制&#xff0c;熟悉PWM技术。 实验要求 通过python编程&#xff0c;用GPIO控制LED灯&#xff0c;使之亮度逐渐增大&#xff0c;随后减小&am…

10万字大数据平台数据治理体系和大数据架构技术方案word

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a; 总体技术架构设计 基于企业内部…

高频面试题|RabbitMQ如何防止消息的重复消费?

一. 前言最近有很多小伙伴开始找工作&#xff0c;在面试时&#xff0c;面试官经常会问我们这样一个题目&#xff1a;RabbitMQ如何防止重复消费?有很多小伙伴这个时候都在想&#xff0c;消息怎么还会重复消费呢???.......所以他们在面试后就跑来问壹哥&#xff0c;针对这个比…

【华为OD机试模拟题】用 C++ 实现 - 异常的打卡记录(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 去重求和(2023.Q1) 文章目录 最近更新的博客使用说明异常的打卡记录【华为OD机试模拟题】题目输入输出备注示例一输入输出说明示例二输入输出说明示例三输入输出说明

基于合作型Stackerlberg博弈的考虑差别定价和风险管理的微网运行策略研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

数据结构与算法(五):优先队列

这节总结一下优先队列的常用实现方法。 一、基本概念 普通的队列是一种先进先出的数据结构&#xff0c;元素在队列尾追加&#xff0c;而从队列头删除。在优先队列中&#xff0c;元素被赋予优先级。当访问元素时&#xff0c;具有最高优先级的元素最先删除。优先队列具有最高级…

100天精通Python(数据可视化篇)——第77天:数据可视化入门基础大全(万字总结+含常用图表动图展示)

文章目录1. 什么是数据可视化&#xff1f;2. 为什么会用数据可视化&#xff1f;3. 数据可视化的好处&#xff1f;4. 如何使用数据可视化&#xff1f;5. Python数据可视化常用工具1&#xff09;Matplotlib绘图2&#xff09;Seaborn绘图3&#xff09;Bokeh绘图6. 常用图表介绍及其…

81页5G 智慧工厂物联数字孪生可视化建设方案

数字企业建设思路3 XXXX智慧企业将以信息化为基础、以数据为纽带、以制造为核心、以管理为载体打造新型智慧园区&#xff0c;该智慧园区整合了企业的安全、环保、能源、安防、应急、服务等数据资源&#xff0c;支撑企业科学、准确、及时决策&#xff0c;提升企业综合监管能力、…

计算机网络笔记、面试八股(一)—— TCP/IP网络模型

本章目录1. TCP/IP网络模型1.1 应用层1.1.1 应用层作用1.1.2 应用层有哪些常用协议1.2 运输层1.2.1 TCP与UDP的区别1.2.2 分块传输1.2.3 端口1.3 网络层1.3.1 IP报文1.3.2 IP地址1.3.3 网络号和主机号的获得1.3.4 子网掩码的获得1.3.5 路由1.3.6 IP地址与MAC地址的区别1.3.7 AR…

【C++】List 基本接口的使用

LISTList 基本接口介绍前言list 构造方法list 析构方法容量相关元素获取迭代器元素的修改其他相关操作前边博客中已经介绍了c STL 中的 string 以及 vector 基本接口的使用方法并进行了接口的模拟实现&#xff0c;接下来让我们来学习 list 的基本接口使用方法吧~~ List 基本接…

Linux基础命令-stat显示文件的状态信息

文章目录 stat 命令介绍 语法格式 基本参数 测试三个时间的变化过程 1&#xff09;使用cat命令 2&#xff09;使用echo命令 3&#xff09;使用chmod命令 4&#xff09;使用vim命令 参考实例 1&#xff09;显示文件的状态信息 2&#xff09;以简洁的形式显示状态信…

Android:IdleHandler的简单理解和使用

IdleHandler的简单理解和使用1、IdleHandler 是什么2、IdleHandler 使用方式2.1、添加和删除2.2、执行3、常见问题和使用场景3.1、使用场景3.2、常见问题参考1、IdleHandler 是什么 IdleHandler 说白了&#xff0c;就是 Handler 机制提供的一种&#xff0c;可以在 Looper 事件…

Cesium 100K数据加载 支持弹窗 动态更改位置

前言&#xff1a;今天总结关于point、label、billboard海量数据加载。后续会研究下大量model加载以及大bim(几百G上T)模型记载 海量点加载 弹窗 加载点位时&#xff0c;不加载弹窗。点击点位时在加载弹窗&#xff0c;及有效的减少加载量&#xff0c;优化性能。 const handler …

FPGA学习之日常工作复位电路

最近一个多月没有写博客了&#xff0c;然后最近工作中也遇到一个复位信号的问题。问题是这样的&#xff0c;关于外部复位信号&#xff0c;之前我们的处理方式都是通过PLL产生的Lock信号作为内部的复位信号。但是由于换到A54上面没有IP核&#xff0c;所以只有不用PLL&#xff0c…

Mybatis持久层框架 | 动态SQL、缓存

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 动态SQL 动态SQL就是指根据不同条件生成不同的sql语句&#xff0c;本质还是SQL语句&#xff0c;知识可以在SQL层面&#xff0c;执行逻辑代码 搭建环境 创建数据库 cre…

Linux搭建SVN服务器,并内网穿透实现公网远程访问

文章目录1. Ubuntu安装SVN服务2. 修改配置文件2.1 修改svnserve.conf文件2.2 修改passwd文件2.3 修改authz文件3. 启动svn服务4. 内网穿透4.1 安装cpolar内网穿透4.2 创建隧道映射本地端口5. 测试公网访问6. 配置固定公网TCP端口地址6.1 保留一个固定的公网TCP端口地址6.2 配置…