Redis实战笔记

news2024/11/28 10:36:36

黑马点评项目笔记

一:数据交互:

1.把String解析成Java对象集合并且存入Redis及Java对象集合转换成JSON。

 @Override
    public Result queryTypeList() {
        String s = stringRedisTemplate.opsForValue().get("cache:list:");
        System.out.println("s = " + s);
        if(!StrUtil.isBlank(s)){
            List<ShopType> cachedList = JSONUtil.toList(s, ShopType.class);
            return Result.ok(cachedList);
        }
        LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.orderByAsc(ShopType::getSort);
        List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);
        System.out.println("list = " + list);
        stringRedisTemplate.opsForValue().set("cache:list:", JSONUtils.toJSONString(list));
        return Result.ok(list);
    }

2.List存储:

List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);
        System.out.println("stringlist = " + stringlist);
        if (stringlist != null && !stringlist.isEmpty()) {
            List<ShopType> shopTypeList = new ArrayList<>();
            for (String jsonString : stringlist) {
                ShopType shopType = JSONUtil.toBean(jsonString, ShopType.class);
                shopTypeList.add(shopType);
            }
            return Result.ok(shopTypeList);
        }
        LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.orderByAsc(ShopType::getSort);
        List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);
        System.out.println("list = " + list);
        for (int i = 0; i < list.size(); i++) {
            stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));
            System.out.println(list.get(i));
        }
        return Result.ok(list);
    }
当然,亦可以用Lambda表达式进一步简化:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);
        System.out.println("stringlist = " + stringlist);
        if (stringlist != null && !stringlist.isEmpty()) {
           List<ShopType> shopTypeList = stringlist.stream()  
            .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))  
            .collect(Collectors.toList());  
            return Result.ok(shopTypeList);
        }
        LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.orderByAsc(ShopType::getSort);
        List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);
        System.out.println("list = " + list);
        for (int i = 0; i < list.size(); i++) {
            stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));
            System.out.println(list.get(i));
        }
        return Result.ok(list);
    }
注意,前端一般要的是Json数组,在这里,我们只需要一个对象集合就行,Springboot会默认JSON格式返回。
对于下面的代码:
List<ShopType> shopTypeList = stringlist.stream()  
            .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))  
            .collect(Collectors.toList());  
            return Result.ok(shopTypeList)
  1. 声明并初始化 shopTypeList
List<ShopType> shopTypeList = ...;

这行代码声明了一个ShopType对象的列表shopTypeList。它会在后续代码中通过Stream API操作进行初始化。

  1. 使用Stream API处理 stringlist
stringlist.stream()

这行代码将stringlist(一个包含JSON字符串的列表)转换为一个Stream,以便进行流式处理。

3.映射每个JSON字符串到 ShopType 对象

.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
使用map操作,将Stream中的每个JSON字符串元素映射为对应的ShopType对象。这里JSONUtil.toBean是一个可以将JSON字符串转换为指定类型对象的方法。

​ 4. 收集结果到新的列表

.collect(Collectors.toList());

使用collect操作,将Stream中的元素收集到一个新的列表中。这里使用了Collectors.toList()作为收集器,它会创建一个新的列表来存储转换后的ShopType对象。

二:缓存策略:

2.1.更新:
image-20240422202913302
一般我们选用第一种。

image-20240422203251310

对于三,两种都存在问题:

image-20240422203736068

但是,第二种发生问题的概率极低。所以我们用第二种:
@Override
@Transactional
public Result updateshop(Shop shop) {
    Long id = shop.getId();
    if (id == null){
        return Result.fail("店铺id不能为空");
    }
   updateById(shop);
   stringRedisTemplate.delete("cache:shop:" + id);
   return Result.ok();
}

2.2.穿透:

image-20240422205639525
我们这里用第一种方法:
if(StrUtil.isNotBlank(s)){
    Shop shop = JSONUtil.toBean(s,Shop.class);
    return Result.ok(shop);
}
//注意,null不等于""
if(s != null){
    return Result.fail("店铺不存在。");
}

//......

if(shop == null){
//写空值
stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
            return Result.fail("店铺不存在。");
     }

2.3.雪崩:

image-20240422211712419

2.4.击穿:

image-20240422212047122

对比:

image-20240422212634600

image-20240422212722940
注意,逻辑过期是假的过期,但是互斥锁是真的没有该数据的缓存了。

互斥锁:

image-20240422213224412
我们用setnx来实现该锁:
private Boolean tryLock(String key){
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(result);
    }
    private void unLock(String key){
        stringRedisTemplate.delete(key);
    }
防止大量缓存穿透及锁的使用代码:
public Shop queryWithLock(Long id){
        String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);
        log.info(s);
        if(StrUtil.isNotBlank(s)){
            Shop shop = JSONUtil.toBean(s,Shop.class);
            return shop;
        }
        if(s != null){
            return null;
        }
        //缓存重建
        String lockKey = "lock:shop:"+id;
        Shop shop = null;
        try {
            if(!tryLock(lockKey)){
                Thread.sleep(50);
                return queryWithLock(id);
            }
            shop = getById(id);
            log.info("shop = " +shop);
            if(shop == null){
                //写空值
                stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
                return null;
            }
            String key = "cache:shop:" + id;
            stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            unLock(lockKey);
        }
        return shop;
    }
注意,try-catch-finally是保障锁必须被释放的关键。方法调用如下:
@Override
    public Result queryById(Long id) {
       //缓存穿透
       //Shop shop =queryWithPassTrough(id);
       //互斥锁
            Shop shop = queryWithLock(id);
            if(shop == null){
                return Result.fail("店铺不存在。");
            }
            return Result.ok(shop);
    }

逻辑过期:

image-20240423125308260

首先是怎么实现设置逻辑过期:
@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}
然后我们接下来模拟一下将热点事件提前写入redis:
 public void saveShopRedis(Long id,Long expires){
        Shop shop = getById(id);
        RedisData redisData = new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expires));
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
    }
先给出独立线程代码:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
  1. ExecutorService: 这是Java的并发包java.util.concurrent中的一个接口,它代表了一个用于执行任务的线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的ThreadFactory创建一个新线程。
  2. CACHE_REBUILD_EXECUTOR: 这是我们创建的ExecutorService对象的变量名。根据命名,它可能用于重建或刷新某种缓存。
  3. Executors.newFixedThreadPool(10): 这是创建线程池的方法。Executors是Java中的一个工具类,它提供了创建线程池的静态方法。newFixedThreadPool(10)创建了一个固定大小的线程池,其中包含10个线程。这意味着无论提交多少任务,线程池中的线程数量都不会超过10个。当提交的任务数超过线程数时,任务会在队列中等待,直到线程空闲并可以处理它们。

简而言之,这行代码定义了一个私有的、静态的、不可变的ExecutorService对象,该对象是一个包含10个线程的固定线程池,用于处理与缓存重建相关的任务。

接下来就是业务代码:
 public Shop queryWithExpire(Long id){
        String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);
        log.info(s);
        if(StrUtil.isBlank(s)){
            return null;
        }
        RedisData bean = JSONUtil.toBean(s, RedisData.class);
        JSONObject object = (JSONObject) bean.getData();
        Shop shop = JSONUtil.toBean(object, Shop.class);
        LocalDateTime expirationTime = bean.getExpireTime();
        if(expirationTime.isAfter(LocalDateTime.now())){
                return shop;
        }
        //缓存重建
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        if(isLock){
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try {
                    saveShopRedis(id,20L);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    unLock(lockKey);
                }
            });
        }
        return shop;
    }
记得在finally中释放锁。

2.5.自定义工具类:

@Slf4j
@Component
public class CacheClient {
    private final StringRedisTemplate stringRedisTemplate;
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    public void set(String key, Object value, Long time , TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }
    private Boolean tryLock(String key){
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(result);
    }
    private void unLock(String key){
        stringRedisTemplate.delete(key);
    }
    public void setWithLogicalExpire (String key, Object value, Long time , TimeUnit unit) {
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }
    //穿透代码
    public <R,ID> R queryWithPassTrough (String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit) {
        String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);
        log.info(s);
        if(StrUtil.isNotBlank(s)){
            return JSONUtil.toBean(s,type);
        }

        if(s != null){
            return null;
        }
        R r = dbFallback.apply(id);
        if(r == null){
            //写空值
            stringRedisTemplate.opsForValue().set(keyPrefix + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
            return null;
        }
        String key = keyPrefix + id;
        this.set(key,r,time,unit);
        return r;
    }
    public <R,ID>R queryWithExpire (String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
        String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);
        log.info(s);
        if(StrUtil.isBlank(s)){
            return null;
        }
        RedisData bean = JSONUtil.toBean(s, RedisData.class);
        JSONObject object = (JSONObject) bean.getData();
        R r = JSONUtil.toBean(object, type);
        LocalDateTime expirationTime = bean.getExpireTime();
        if(expirationTime.isAfter(LocalDateTime.now())){
            return r;
        }
        //缓存重建
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        if(isLock){
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try {
                    R apply = dbFallback.apply(id);
                    this.setWithLogicalExpire(keyPrefix + id,apply,time,unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    unLock(lockKey);
                }
            });
        }
        return r;
    }
}

补充:
这里的StringRedisTemplate没有调用set方法,也没有注解,但是依然实现了依赖注入,这是因为在spring中,如果注入对象,被注入对象都加入IOC容器,并且有一个单一的构造函数,那么就可以实现不需要注解与配置的依赖注入。
使用举例:
 @Override
    public Result queryById(Long id) {
        Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);
            if(shop == null){
                return Result.fail("店铺不存在。");
            }
            return Result.ok(shop);
    }
注意,如果是缓存击穿,一定要数据预热。

三:秒杀业务:

在这种业务中,我们用全局ID生成器来生成ID:

image-20240426175726793

我们用redis来处理,可以满足要求。无论在哪里操作数据库,都是redis。

image-20240426175938648

常见全局唯一ID策略:

image-20240426212805722

注意,这里的数据库自增并不是数据库单纯的自增,因为我们知道,在后面的学习中,是分布式的,到时候,不同的线程可能会导致乱序,重序,这里是单独的那另外一张表做自增,其他的线程都从这里取id。
下面我们来看看基于redis自增的策略:
public class RedisIDWorker {
    public static final long BEGIN_TIMESTAMP = 1640995200L;
    private static final int COUNT_BITS = 32;
    private StringRedisTemplate stringRedisTemplate;
    public RedisIDWorker (StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    public long nextId (String keyPrefix) {
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);

        return timestamp << COUNT_BITS | count;
    }
}

increment(String key): 这个方法是对指定的 key 进行自增操作。如果该 key 不存在,那么它的初始值会被设为 0,然后进行自增,结果为 1。如果 key 存在且其值可以转换为整数,则该值会进行自增。

3.2.超卖问题:

image-20240427204959568

image-20240427205407204

image-20240427205711299

但是,这里的票的变化是与版本同步的,所以我们没有必要用版本号,直接用票作为乐观锁。

image-20240427210009616

CAS是指"比较和交换"(Compare and Swap)的缩写,是一种原子操作,通常用于多线程编程中实现同步。CAS操作包括三个参数:需要进行操作的内存位置、旧的预期值和新的值。CAS操作会比较内存位置的当前值与旧的预期值,如果相同则将内存位置的值更新为新的值,否则不做任何操作。CAS操作是一种乐观锁的实现方式,可以避免使用传统的锁机制带来的性能开销和死锁问题。CAS操作在Java中的实现包括AtomicInteger、AtomicLong等类。
@Transactional
    @Override
    public Result seckillVoucher(Long voucherId) {
       SeckillVoucher byId = seckillVoucherService.getById(voucherId);
        if(byId.getBeginTime().isAfter(LocalDateTime.now())){
            return Result.fail("秒杀还没开始");
        }
        if(byId.getEndTime().isBefore(LocalDateTime.now())){
            return Result.fail("秒杀已经结束");
        }
        if(byId.getStock()<=0){
            return Result.fail("库存不足");
        }
        boolean success = seckillVoucherService.update().setSql("stock = stock -1")
                .eq("voucher_id", voucherId)
                .eq("stock",byId.getStock())
                .update();
        if(!success){
            return Result.fail("库存不足");
        }
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setVoucherId(voucherId);
        long orderId = redisIDWorker.nextId("order");
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        save(voucherOrder);
        return Result.ok(orderId);
    }
}
注意,乐观锁也有弊端,他在多个线程同时买票时只能允许一个线程成功,其他线程都会失败,所以我们要解决这个问题:
boolean success = seckillVoucherService.update().setSql("stock = stock -1")
                .eq("voucher_id", voucherId)
                .gt("stock",0)
                .update();
  1. .eq("voucher_id", voucherId)
    • 这是一个条件语句,表示只更新那些voucher_id字段值等于voucherId的记录。voucherId应该是一个变量,存储了要更新的优惠券的ID。
  2. .gt("stock",0)
    • 这是另一个条件语句,表示只更新那些stock字段值大于0的记录。gt是“greater than”的缩写,所以这个条件确保不会更新那些库存已经为0的记录。
  3. .update();
    • 最后,调用update方法执行实际的更新操作。
注意,这里事实上是利用了数据库自带的行锁,当某个事务尝试更新某一行记录时,数据库会为该行加上行锁,阻止其他事务同时修改该行,直到当前事务完成并释放锁。

3.3.一人一单:

Long userid = UserHolder.getUser().getId();
long count = query().eq("user_id",userid).eq("voucher_id",voucherId).count();
        if(count > 0){
            return Result.fail("您已经购买过一次!");
        }
但是依然有问题,用户多线程操作时,依然会存在都判断通过的问题,所以我们先把代码分成几个方法,然后加锁:
 @Transactional
    public  Result createVoucher(Long voucherId){
        Long userid = UserHolder.getUser().getId();
        synchronized(userid.toString().intern()) {
            long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();
            if (count > 0) {
                return Result.fail("您已经购买过一次!");
            }
            VoucherOrder voucherOrder = new VoucherOrder();
            voucherOrder.setVoucherId(voucherId);
            long orderId = redisIDWorker.nextId("order");
            voucherOrder.setId(orderId);
            voucherOrder.setUserId(UserHolder.getUser().getId());
            save(voucherOrder);
            return Result.ok(orderId);
        }
    }
这里有两个细节:
1.userid.toString().intern(),当处理一个用户的多个请求时,如果只用toString(),该方法在Java中是new一个新的对象,那么即使是同一个用户,可能他的多次请求的锁都不一样,这是有问题的,但是intern()是在字符串常量池中判断有没有这个这个,如果有,就用这个对象。
2.不能在方法上加锁:因为所有的线程都走一个方法,可能会导致其他问题。

但是这样还是有问题:

因为这里又有事务又有锁,锁在事务的内部,如果锁先释放,事务还没有提交,然后其他线程去查询,有可能查询不到,就会存在问题:
所以我们把锁加在外面:
@Transactional
public  Result createVoucher(Long voucherId){
    Long userid = UserHolder.getUser().getId();
    long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();
    if (count > 0) {
        return Result.fail("您已经购买过一次!");
    }
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setVoucherId(voucherId);
    long orderId = redisIDWorker.nextId("order");
    voucherOrder.setId(orderId);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    save(voucherOrder);
    return Result.ok(orderId);
}
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {
        return createVoucher(voucherId);
  }
但是这样还是有问题:
在外部同步代码块中直接调用带有@Transactional注解的方法可能会导致事务管理不生效的原因,主要是因为Spring的AOP(面向切面编程)代理机制。@Transactional注解的事务管理是通过Spring的AOP代理来实现的,具体来说:

代理模式:**Spring使用代理(通常是JDK动态代理或者CGLIB代理)来拦截那些标有@Transactional注解的方法调用。**当一个被代理的对象调用这些方法时,Spring会在调用前后插入事务管理的逻辑,比如开始事务、提交或回滚事务。
自我调用问题:在同一个类内部,如果一个方法直接调用另一个带有@Transactional注解的方法,这种调用是“直接调用”,而非通过代理对象进行。因此,Spring的AOP代理无法拦截到这种内部调用,从而不会触发事务管理的行为。
在代码示例中,虽然createVoucher方法上有@Transactional注解,但如果它是直接在类内部通过同步代码块被调用,那么这个直接调用就绕过了Spring的AOP代理,事务管理因此可能不会生效。

为了解决这个问题,确保事务管理能够正常工作,可以考虑以下几种方案:
1.通过代理对象调用:如果必须在类内部调用事务方法,可以通过注入当前类的bean实例来间接调用,这样Spring的AOP代理就能生效。重构方法结构:将事务逻辑和同步逻辑整合到一个方法中,确保事务和同步都在同一个由Spring管理的方法体内执行。
2.使用AspectJ编织:Spring也支持使用AspectJ来实现更细粒度的切面编织,包括对类内部方法调用的拦截,但这需要额外的配置和可能的编译期织入。
综上所述,直接在同步代码块中调用事务方法的问题在于它规避了Spring AOP代理,从而可能忽略了事务管理的预期效果。
修改如下:
	 Long userid = UserHolder.getUser().getId();
        synchronized(userid.toString().intern()) {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucher(voucherId);
        }
AopContext.currentProxy(): 这是Spring提供的一个方法,用于在AOP代理方法内部获取当前代理对象。这一步很关键,因为它允许在类内部调用时仍然通过代理来执行,从而确保了@Transactional注解的事务管理能够生效。通过代理调用createVoucher: 通过强制类型转换得到的代理对象proxy来调用createVoucher方法,而不是直接调用。这样做使得事务管理逻辑能够被Spring的AOP代理捕获并正确执行,包括事务的开始、提交或回滚。
导入依赖:
 		<dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
暴露代理对象,在启动类上面加上注解:
@EnableAspectJAutoProxy(exposeProxy = true)

集群的并发问题:

image-20240428144744194
每个jvm内部共享一个synchronized锁监视器。如果在同一个tomcat,那肯定是没有问题的,但是如果是多个服务器,就有多个tomcat,JVM,就会出现问题。

3.4.分布式锁:

在这里插入图片描述

image-20240428145811718

image-20240428150237310

所以我们使用Redis。

为了避免释放错误,所以我们加上过期时间。注意,设置过期时间与setnx是两个语句,我们必须确保它们具有原子性。所以我们可以用set语句:
set lock k1 EX 10 NX
我们用非阻塞式实现这个操作。
public class SimpleRedisLock implements ILock{

    private StringRedisTemplate stringRedisTemplate;
    private String name;
    private static final String KEY_PREFIX = "lock:";

    public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void unLock() {
       stringRedisTemplate.delete(KEY_PREFIX+name);
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        long id = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);
        return Boolean.TRUE.equals(success);
    }
}
使用方法:
Long userid = UserHolder.getUser().getId();
        SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);
        boolean isLock = lock.tryLock(1200);
        if(!isLock){
            return Result.fail("不允许重复下单");
        }
        try {
            synchronized(userid.toString().intern()) {
                IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
                return proxy.createVoucher(voucherId);
            }
            } finally {
            lock.unLock();
        }
可能存在的问题:

image-20240428160736009

所以,要判断这个锁是不是自己的,不要把别人的锁释放了。
image-20240428161935640
注意,不要用本身的线程Id作为判断标准,因为不同的JVM有不同的线程Id,代码改进如下:
 private static  final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
 //......其他代码
 @Override
    public void unLock() {
        String id = ID_PREFIX + Thread.currentThread().getId();
        String lock = stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);
        if(id.equals(lock)) {
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        String id = ID_PREFIX + Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);
        return Boolean.TRUE.equals(success);
    }
但是事实上,还是有问题的(md,怎么这么多问题):
下面是一个极端情况:

image-20240428164447229

如果线程一判断相同通过了,但是删除锁的时候被阻塞了,还是会删除其他线程的锁。但是为什么这样的情况我们可以删掉呢:

注意这里存入redis的key,是前缀加用户ID,而前缀,后缀都是static修饰的,在同一个服务器内前缀后缀是一样的,而我们这里解决的是一人一单,也就是说,用户的ID也是一样的,因此在同一台服务器下的同一个用户的key就是固定的。其次,我们上面将后缀加上线程设置为key值,这是为了应对多台服务器下的问题,而我们用的redis,也是为了解决多台服务器下的并发问题,因此在这里,反而在同一台服务器下,在判断通过后删除之前的间隙中,删除动作迟迟不执行,然后老线程的key过期了,然后另外一个线程创建了一个相同的key,此时,老线程执行删除动作,就会删除掉新线程的key。注意,删除是按照key的!!!

所以我们引入Lua脚本:

image-20240428171308131

image-20240428172216314

image-20240428172818687

新建一个unlock.lua文件:
if(redis.call('get',KEYS[1]) == ARGV[1]) then
    return redis.call('del',KEYS[1])
end
return 0
然后使用静态代码块加载脚本文件:
 	private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
使用脚本:
 @Override
    public void unLock() {
       stringRedisTemplate.execute(
               UNLOCK_SCRIPT,
               Collections.singletonList(KEY_PREFIX+name),
               ID_PREFIX+Thread.currentThread().getId());
           }

image-20240428174619419

3.5.分布式锁优化——Redisson:

image-20240428175026803

因为自己写实在是太烦琐了,所以我们引入了其他第三方服务:

image-20240428175113837

导入依赖:
		<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.6</version>
        </dependency>
配置客户端:
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.6.128").setPassword("kz32330981");
        return Redisson.create(config);
    }
}
使用分布式锁:

image-20240502194926872

举例:
 @Override
    public Result seckillVoucher(Long voucherId) {
       SeckillVoucher byId = seckillVoucherService.getById(voucherId);
        if(byId.getBeginTime().isAfter(LocalDateTime.now())){
            return Result.fail("秒杀还没开始");
        }
        if(byId.getEndTime().isBefore(LocalDateTime.now())){
            return Result.fail("秒杀已经结束");
        }
        if(byId.getStock()<=0){
            return Result.fail("库存不足");
        }
        boolean success = seckillVoucherService.update().setSql("stock = stock -1")
                .eq("voucher_id", voucherId)
                .gt("stock",byId.getStock())
                .update();
        if(!success){
            return Result.fail("库存不足");
        }
        Long userid = UserHolder.getUser().getId();
        //SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);
        RLock lock = redissonClient.getLock("lock:order:" + userid);
        boolean isLock = lock.tryLock();
        if(!isLock){
            return Result.fail("不允许重复下单");
        }
        try {
            synchronized(userid.toString().intern()) {
                IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
                return proxy.createVoucher(voucherId);
            }
            } finally {
            lock.unlock();
        }
    }
这里boolean isLock = lock.tryLock();没有设置参数,默认失败一次就马上返回。

3.6.Redissson可重入锁原理:

为什么一般情况下不能实现可重入锁:

image-20240502213631856

string只能有一个标识,当我们实现可重入锁时,如果是当前的线程,我们把它的value++,释放锁的时候–,所以我们需要两个标识,所以我们使用hash:

image-20240503115517381

image-20240503115612799

Redisson内部也是这么实现的。

3.6.Redissson可重试与避免超时释放原理:

image-20240503122435943

image-20240503123318771

注意,这里为null说明获取锁成功了。为null说明没有存在这个锁,所以当然会成功了。如果失败,返回的是当前存在的锁的有效期。
重试机制:

在Redisson中,可重试机制通常与分布式锁的实现紧密相关。当客户端尝试获取分布式锁但失败时(如锁已被其他客户端持有),Redisson会利用Redis的发布/订阅机制来实现锁的可重试。下面我将简要解释Redisson是如何结合消息订阅来实现可重试机制的:

  1. 锁获取失败与等待
    • 当客户端尝试获取分布式锁但失败时(比如调用lock.tryLock(waitTime, leaseTime, TimeUnit.unit)方法),Redisson会记录当前线程并启动一个定时任务或监听器来等待锁的释放。
  2. 消息订阅
    • 为了实现等待锁的释放,Redisson会利用Redis的发布/订阅机制。具体来说,Redisson会为每个锁维护一个特定的频道(channel)或主题(topic)。
    • 当客户端尝试获取锁但失败时,它会订阅该锁的频道。这样,一旦锁的持有者释放了锁,Redisson就会在该频道上发布一个消息。
  3. 监听器与重试
    • 客户端在订阅了锁的频道后,会设置一个监听器来监听该频道上的消息。
    • 当锁的持有者释放锁并在频道上发布消息时,监听器会立即收到通知。
    • 监听器在收到通知后,会触发一个重试机制,即再次尝试获取锁。
避免超时释放:
  • Redis中的看门狗策略通常与Redisson库一起使用,用于自动检测并处理过期键的机制。

  • 应用程序使用Redisson库监视Redis服务器上的指定键。当这些键被修改时,看门狗策略会自动触发相应的操作,如更新本地缓存或重新计算依赖项。

  • 这有助于应用程序实时响应Redis中的数据变化,并防止在数据发生变化时出现问题,如死锁和其他并发问题。

  • 具体来说,leaseTime是锁的有效期,也就是锁的持有时间。当调用lock(leaseTime, unit)方法并传入一个非-1的leaseTime时,这个特定的锁将会在一个固定的时间后自动释放,即使锁的持有者仍然在运行。在这种情况下,Redisson不会启动看门狗机制,因为锁的过期时间是由应用程序明确指定的。然而,如果调用lock()方法(没有传入leaseTime参数),或者传入-1作为leaseTime的值,那么锁将没有固定的过期时间。此时,Redisson会启动看门狗机制。看门狗会定期检查锁的持有者是否仍然活跃,并在锁即将过期时自动延长锁的有效期,从而确保锁不会被意外释放,直到锁的持有者显式地释放锁或者持有者不再活跃。
注意,只用我们没有设置时间(或者-1)才会触发这个策略。
3.7.主从一致性:
为什么以前会出现问题:

image-20240506172825214

当主节点出现问题时,从从节点获取锁就会成功。
而Redisson是怎么解决这个问题的呢:

image-20240506173309320

以下是如何使用Redisson的MultiLock的基本步骤:

  1. 配置Redisson客户端
    首先,你需要配置Redisson客户端以连接到Redis集群或哨兵模式,如之前所述。

  2. 创建RLock对象
    对于每个Redis节点,你需要创建一个RLock对象。这些RLock对象将用于组成MultiLock

    RLock lock1 = redisson1.getLock("lock1"); // 假设redisson1连接到第一个Redis节点  
    RLock lock2 = redisson2.getLock("lock2"); // 假设redisson2连接到第二个Redis节点  
    // ... 为其他节点创建RLock对象 ...
    

    注意:虽然锁的名称(如"lock1""lock2")可以不同,但出于一致性和可维护性的考虑,最好使用相同的锁名称。

  3. 组合RLock对象为MultiLock
    使用这些RLock对象创建一个MultiLock

    List<RLock> locks = Arrays.asList(lock1, lock2, /* ... 其他locks ... */);  
    RLock multiLock = redisson.getMultiLock(locks);
    
  4. 使用MultiLock
    现在你可以像使用普通的RLock一样使用MultiLock了。

    multiLock.lock();  
    try {  
        // 临界区代码,只有获得锁的线程才能执行  
    } finally {  
        multiLock.unlock();  
    }
    

    注意:在finally块中解锁是非常重要的,以确保锁总是被释放,即使发生异常。

  5. 关闭Redisson客户端
    完成所有操作后,确保关闭所有Redisson客户端实例以释放资源。

    redisson1.shutdown();  
    redisson2.shutdown();  
    // ... 关闭其他Redisson客户端 ...
    
  6. 测试
    为了测试MultiLock在多节点环境中的行为,你可以模拟节点故障、网络分区等情况,并观察MultiLock是否仍然能够正确工作。

总结:

在这里插入图片描述

3.7.异步秒杀:

原来的串行执行:

image-20240506175926269

优化如下:

image-20240506180246721

完整的流程如下:

image-20240506184456708

Part 1:保存到redis:增加redis中的优惠券数量:
 	@Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        stringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), voucher.getStock().toString());
    }
Part 2:编写Lua脚本:
image-20240506190359407
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) then
    return 1
end
if(redis.call('sismember',orderKey,userId) == 1) then
    return 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
  1. 参数定义:
    • ARGV[1]voucherId,表示优惠券ID。
    • ARGV[2]userId,表示用户的ID。
  2. 定义键名:
    • stockKey:这是优惠券库存的键名,格式为 'seckill:stock:' 加上 voucherId
    • orderKey:这是已购买(或已下单)该优惠券的用户的集合的键名,格式为 'seckill:order:' 加上 voucherId
  3. 检查库存:
    • 使用 redis.call('get', stockKey) 获取库存数量。
    • 使用 tonumber 函数将返回的字符串转换为数字。
    • 如果库存数量小于或等于0,则返回 1,表示库存不足。
  4. 检查用户是否已经购买:
    • 使用 redis.call('sismember', orderKey, userId) 检查用户是否已经在 orderKey 集合中。
    • 如果用户已经购买(即用户ID在集合中),则返回 2,表示用户已购买。
  5. 扣减库存并记录订单:
    • 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
      • 使用 redis.call('incrby', stockKey, -1) 扣减库存。
      • 使用 redis.call('sadd', orderKey, userId) 将用户ID添加到已购买集合中。
    • 这两个操作是原子性的,因为它们都在同一个Lua脚本中执行。
  6. 返回值:
    • 如果成功扣减库存并记录订单,则返回 0,表示操作成功。
    • 如果库存不足,返回 1
    • 如果用户已购买,返回 2
Part 3:基于阻塞队列的秒杀业务:
image-20240506204827037
private BlockingQueue<VoucherOrder >orderTasks = new ArrayBlockingQueue<>(1024*1024);
@Resource
private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    @Override
    public Result seckillVoucher(Long voucherId) {
    Long resultT =stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(),
            String.valueOf(UserHolder.getUser().getId())
    );
    int res = resultT.intValue();
    if(res!=0){
        return Result.fail(res==1?"库存不足":"不能重复下单");
    }
    long id = redisIDWorker.nextId("order");
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setId(id);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //在父线程就先获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        orderTasks.add(voucherOrder);
   	 return Result.ok(id);
    }
Part 4:异步下单业务:
private static final ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
      private IVoucherOrderService proxy;

    private class VoucherOrderHandler implements Runnable{
    @PostConstruct
    public void init(){
        seckill_order_executor.submit(new VoucherOrderHandler());
    }
    @Override
    public void run() {
        while (true){
            try {
                VoucherOrder voucherOrder = orderTasks.take();
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.error("处理订单异常",e);
            }
        }
    }
}
  1. @PostConstruct注解:

    这个注解标记在init()方法上,表示该方法会在类的实例创建并注入到Spring容器后立即调用。这里是用来启动任务处理的逻辑。

  2. init()方法:

    seckill_order_executor是一个线程池,可能是ExecutorService类型的实例。
    方法中提交了一个新的VoucherOrderHandler实例到线程池执行。这意味着每次初始化VoucherOrderHandler,都会启动一个新的线程来处理订单任务,形成一个循环执行的任务队列。

  3. run()方法:

    作为Runnable接口的实现,run()方法包含实际的业务逻辑。使用一个无限循环while (true),确保线程将持续运行,直到程序停止或线程被显式中断。

  4. try-catch块:

    在循环中,从orderTasks队列(可能是BlockingQueue类型)中获取一个VoucherOrder对象并调用handleVoucherOrder(voucherOrder)进行处理。如果在处理过程中出现任何异常,捕获并记录错误日志,但不会中断循环。这样可以确保即使处理过程中出现问题,线程仍能继续尝试处理下一个订单。

private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("不允许重复下单");
            return;
        }
        try {
            proxy.createVoucher(voucherOrder);
        }finally {
            lock.unlock();
        }
    }
注意这里的两处细节,userId与createVoucher都与父线程有关系,而在这里是异步的子线程,所以代理对象要在父线程就创建好,而userid也不能通过线程本地直接获取。这里用Redisson重复验证是为了让线程更加安全。
 @Transactional
    public void createVoucher (VoucherOrder voucherOrder){
         Long userid = voucherOrder.getId();
            long count = query().eq("user_id", userid).eq("voucher_id", voucherOrder.getVoucherId()).count();
            if (count > 0) {
                log.error("用户已经购买过一次!");
                return ;
            }
        boolean success = seckillVoucherService.update().setSql("stock = stock -1")
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock",0)
                .update();
        if(!success){
            log.error("库存不足");
            return;
        }
        save(voucherOrder);
    }
重复验证道理同上。

3.8.阻塞队列优化——消息队列:

基于上面的阻塞队列存在两个问题:1.内存问题 2.服务宕机的安全问题,所以我们必须进行优化。

在这里插入图片描述

1.独立于JVM,解决内存问题。
2.内部做了数据的持久化,避免安全问题。
image-20240507125920501
(1):基于List

image-20240507130119033

image-20240507130403241

(2):PubSub:

image-20240507134533877

image-20240507134641523

image-20240507135041817

image-20240507135131273

如果没有被任何频道订阅,会直接丢失。

(3):Stream:

发送消息:

image-20240507135521200

读取消息:

image-20240507135729110

image-20240507140014875

image-20240507140118244

3.9.基于Stream的消费者组:

image-20240507184608215

image-20240507184732468

image-20240507184942090

基于Java代码:

image-20240507192911659

image-20240507193426970

总结:

image-20240507193455216

3.10.代码改造:

image-20240507193619350

(1):创建消息队列:
XGROUP CREATE stream.orders g1 0 MKSTREAM
这是创建消费者组,自动创建了队列。
(2):修改Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) then
    return 1
end
if(redis.call('sismember',orderKey,userId) == 1) then
    return 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
redis.call('xadd','strem.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
这行代码是使用 Redis 的 XADD 命令向名为 “stream.orders” 的 Stream 数据结构中添加一条新的消息。该消息包含了三个字段:“userId”、“voucherId” 和 “id”,分别对应传入的参数 userId、voucherId 和 orderId 的值。这样可以将订单相关的信息存储在 Redis 中,并且可以通过 Stream 数据结构的功能对这些信息进行处理和查询。这种方式可以方便地实现消息队列、事件日志等功能。
(3):修改发消息业务代码:
@Override
public Result seckillVoucher(Long voucherId) {
long id = redisIDWorker.nextId("order");
Long resultT =stringRedisTemplate.execute(
        SECKILL_SCRIPT,
       Collections.emptyList(),
        voucherId.toString(),
        String.valueOf(UserHolder.getUser().getId()),String.valueOf(id)
);
int res = resultT.intValue();
if(res!=0){
    return Result.fail(res==1?"库存不足":"不能重复下单");
}
    proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(id);
}
(4):收消息:
 private class VoucherOrderHandler implements Runnable{
        String queueName = "stream.orders";
    @PostConstruct
    public void init(){
        seckill_order_executor.submit(new VoucherOrderHandler());
    }
    @Override
    public void run() {
        while (true){
            try {
                List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                if(list == null || list.isEmpty()){
                    continue;
                }
                //解析消息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                //ack
                stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
            } catch (Exception e) {
                log.error("处理订单异常",e);
                handlePendinglist();
            }
        }
    }

        private void handlePendinglist() {
            while (true){
                try {
                    List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    if(list == null || list.isEmpty()){
                        break;
                    }
                    //解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    handleVoucherOrder(voucherOrder);
                    //ack
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                } catch (Exception e) {
                    log.error("处理pending异常",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException interruptedException) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("不允许重复下单");
            return;
        }
        try {
            proxy.createVoucher(voucherOrder);
        }finally {
            lock.unlock();
        }
    }
我们分成三个部分来看:
4.1.正常接受处理消息:
String queueName = "stream.orders";
    @PostConstruct
    public void init(){
        seckill_order_executor.submit(new VoucherOrderHandler());
    }
    @Override
    public void run() {
        while (true){
            try {
                List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                if(list == null || list.isEmpty()){
                    continue;
                }
                //解析消息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                //ack
                stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
            } catch (Exception e) {
                log.error("处理订单异常",e);
                handlePendinglist();
            }
        }
    }
  1. 参数详解:
    • Consumer.from(“g1”, “c1”)
      • 这表示从消费者组 “g1” 中的消费者 “c1” 读取数据。在 RedisStreams 中,消费者组用于允许多个消费者并发地读取同一个 Stream。注意,这里因为没有c1,所以先创建了c1。
    • StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
      • StreamReadOptions.empty() 创建一个空的读取选项对象。
      • .count(1) 设置读取的条目数量为 1。这意味着,无论 Stream 中有多少未读的数据,此方法仅返回一个条目。
      • .block(Duration.ofSeconds(2)) 设置读取操作的阻塞时间。如果 Stream 中没有新的数据可读,那么这个方法会阻塞最多 2 秒,然后返回一个空列表。
    • StreamOffset.create(queueName, ReadOffset.lastConsumed())
      • 这定义了从哪个位置开始读取 Stream。
      • queueName 是 Stream 的名称。
      • ReadOffset.lastConsumed() 表示从消费者 “c1” 最后消费的位置开始读取。如果消费者 “c1” 还没有消费过任何数据,那么这个位置可能是 Stream 的开始位置或任何其他预定义的位置(例如 Stream 的起始点)。
等价于:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS mystream >
4.2.处理因异常而位于pending-list的消息:
  private void handlePendinglist() {
            while (true){
                try {
                    List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    if(list == null || list.isEmpty()){
                        break;
                    }
                    //解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    handleVoucherOrder(voucherOrder);
                    //ack
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                } catch (Exception e) {
                    log.error("处理pending异常",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException interruptedException) {
                        e.printStackTrace();
                    }
                }
            }
        }
与上面同理,只是这里是获取未确认的消息,这里的异常不做递归处理,因为外面本身就是true循环。
4.3.新增:
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("不允许重复下单");
            return;
        }
        try {
            proxy.createVoucher(voucherOrder);
        }finally {
            lock.unlock();
        }
    }

四:其他业务:

4.1.不存在该表中字段处理方法如下,后续需要自己维护:

@TableField(exist = false)
private String icon;
/**
 * 用户姓名
 */
@TableField(exist = false)
private String name;

4.2.点赞排序:

image-20240509175116540

改造前:
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//......
stringRedisTemplate.opsForSet().add(key,userId.toString());
//......
stringRedisTemplate.opsForSet().remove(key,userId.toString());
改造后:
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//...
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
//......
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
部分代码如下:
 @Override
    public Result queryBlogLikes(Long id) {
        String key = "blog:liked:" + id;
        Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);
        if(range == null || range.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        List<Long> ids =  range.stream().map(Long::valueOf).collect(Collectors.toList());
        List <UserDTO> users = userService.listByIds(ids).stream().map(
                user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());
        return Result.ok(users);
    }
注意,这里有个问题,取出的结果是有序的,都是我们在进行userService.listByIds(ids)时,数据库内部用的是WHERE id IN(1,5),而它在数据库内部的索引中,就优化了,导致无论怎么样,都是先1再5。可以用下面的sql语句优化:

image-20240509194119419

在MP中,就可以这么写:
@Override
public Result queryBlogLikes(Long id) {
    String key = "blog:liked:" + id;
    Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if(range == null || range.isEmpty()){
        return Result.ok(Collections.emptyList());
    }
    List<Long> ids =  range.stream().map(Long::valueOf).collect(Collectors.toList());
    String idStr = StrUtil.join(",", ids);
    List <UserDTO> users = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list()
            .stream().map(
            user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());
    return Result.ok(users);
}

4.3.共同关注:

基于Set集合的交集:
 @Override
    public Result followCommons(Long id) {
        Long userId = UserHolder.getUser().getId();
        Set<String> intersect = stringRedisTemplate.opsForSet().intersect("follows:" + userId, "follows:" + id);
        if(intersect==null||intersect.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        List<Long> collect = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
        List<UserDTO> userDTOS = userService.listByIds(collect).stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
        return Result.ok(userDTOS);
    }

4.4.关注推送——Feed流

image-20240509211548013

image-20240509212052545

image-20240509212325997

image-20240509213611387

image-20240509213803443

image-20240509213920882

以下是基于推模式:
image-20240509214104598
分页问题:

image-20240509214321964

image-20240509214603581

滚动查询:

image-20240512185712520

代码实现如下:
@Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
        Long userId = UserHolder.getUser().getId();
        String key = "feed:" + userId;
        Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
        if(typedTuples == null || typedTuples.isEmpty()){
            return Result.ok();
        }
        List<Long>ids = new ArrayList<>(typedTuples.size());
        long minTime = 0;
        int os = 1;
        for(ZSetOperations.TypedTuple <String> tuple : typedTuples){
            ids.add(Long.valueOf(tuple.getValue()));
            long time = tuple.getScore().longValue();
            if(time == minTime){
                os++;
            }
            else{
                minTime = time;
                os = 1;
            }
        }
        List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();
        for (Blog blog : blogs) {
            queryBlogUser(blog);
            queryBlogLikes(blog);
        }
        ScoreResult r = new ScoreResult();
        r.setList(blogs);
        r.setMinTime(minTime);
        r.setOffset(os);
        return Result.ok(r);
    }
下面是关键代码解释:
 Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);

参数

  1. key: 这是 Redis 中有序集合的键名。在这个例子中,它是通过 "feed:" + userId 构建的,其中 userId 是从 UserHolder 获取的当前用户的 ID。
  2. 0: 分数范围查询的起始分数(包含)。在这个例子中,查询从分数 0 开始。
  3. max: 分数范围查询的结束分数(不包含)。这是你要查询的最高分数(不包括这个分数)。
  4. offset: 分页查询的偏移量。这表示从有序集合中跳过多少个元素后再开始返回结果。
  5. 2: 这是一个限制参数,表示最多返回 2 个元素。然而,在实际应用中,这个限制可能不是非常有用,因为它会限制返回结果的数量。通常,你可能会根据分页参数(如每页大小)来动态设置这个值。
在这个例子中,通过 reverseRangeByScoreWithScores(key, 0, max, offset, 2),你正在查询从分数 0 到 max(不包含)之间的博客,按分数从高到低排序,并跳过 offset 个元素,最后最多返回 2 个结果。每个结果都是一个 TypedTuple,它包含博客的 ID 和其发布时间(作为分数)。
注意,分布时间越前的,id越小,所以从零开始,而且这里的os,及偏移量,是上次发布时间最早的时间的重复个数,不是在所有结果中的个数,比如时间戳为4 4 4 3,每次两个,第一次查出4与4,4虽然有三个,但是这里记录的是上一次查询中重复的最小时间,及两个,下一次,就从0到最大时间4,并且跳过2个,即第三个4开始查询。

4.5.附近商铺:

在这里插入图片描述

image-20240512202043735

我们可以先做数据预热,先在test中存入redis:
@Test
    public void test(){
    List<Shop> list = shopService.list();
    Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
    for(Map.Entry<Long, List<Shop>> entry : map.entrySet()){
      Long typeId = entry.getKey();
      String key = "shop:geo:" + typeId;
      List<Shop> value = entry.getValue();
      for(Shop shop : value){
          stringRedisTemplate.opsForGeo().add(key,new org.springframework.data.geo.Point(shop.getX(),shop.getY()),shop.getId().toString());
      }
    }
}
或者可以这么写:

image-20240512203734547

代码如下:
 @Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
       if (x == null || y == null){
           // 根据类型分页查询
           Page<Shop> page = query()
                   .eq("type_id", typeId)
                   .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
           // 返回数据
           return Result.ok(page.getRecords());
       }
        int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
        int end = (current + 1) * SystemConstants.DEFAULT_PAGE_SIZE;
        String key = SHOP_GEO_KEY + typeId;
        GeoResults<RedisGeoCommands.GeoLocation<String> >search = stringRedisTemplate.opsForGeo()
                .search(key,
                        GeoReference.fromCoordinate(x, y),
                        new Distance(5000),
                        RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance()
                                .limit(end));
        if(search == null){
            return Result.ok();
        }
        List<GeoResult<RedisGeoCommands.GeoLocation<String>>>contents = search.getContent();
        if(contents.size()<=from){
            return Result.ok();
        }
        //截取
        List <Long> ids = new ArrayList<>(contents.size());
        Map<String,Distance>distanceMap = new HashMap<>(contents.size());
        contents.stream().skip(from).forEach(result->{
            String shopIdStr = result.getContent().getName();
            ids.add(Long.valueOf(shopIdStr));
            Distance distance = result.getDistance();
            distanceMap.put(shopIdStr,distance);
        });
        List<Shop> shops = query().in("id",ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();
        for (Shop shop : shops) {
            shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
        }
        return Result.ok(shops);
    }

4.6.用户签到:

image-20240512215935791

image-20240512220014515

image-20240513163758603

  @Override
    public Result sign() {
        Long userId = UserHolder.getUser().getId();
        LocalDateTime now = LocalDateTime.now();
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key = USER_SIGN_KEY + userId + keySuffix;
        int day = now.getDayOfMonth() - 1;
        stringRedisTemplate.opsForValue().setBit(key, day, true);
        return Result.ok();
    }
String keySuffix = now.format(DateTimeFormatter.ofPattern(“:yyyyMM”));: 根据当前日期的年月(格式为"yyyyMM",例如"202302")创建一个字符串后缀,用于构建Redis键。
int day = now.getDayOfMonth() - 1;: 获取当前日期减1(因为数组或位数组通常从0开始计数,所以这里将日期减1来对应位数组的索引)。
stringRedisTemplate.opsForValue().setBit(key, day, true);: 使用Spring Data Redis的StringRedisTemplate操作位值。在Redis的键key对应的位数组中,设置第day位为true,表示用户在这一天签到了。
连续签到:

在这里插入图片描述

 @Override
    public Result signCount() {
        Long userId = UserHolder.getUser().getId();
        LocalDateTime now = LocalDateTime.now();
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key = USER_SIGN_KEY + userId + keySuffix;
        int day = now.getDayOfMonth();
        List <Long> result = stringRedisTemplate.opsForValue().bitField(
                key,
                BitFieldSubCommands.create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(day))
                        .valueAt(0)
        );
        if(result == null || result.isEmpty()){
            return Result.ok(0);
        }
        Long num = result.get(0);
        if(num == null || num == 0){
            return Result.ok(0);
        }
        int count = 0;
        while (true){
            if((num & 1) == 0){
                break;
            }else {
                count++;
            }
            num >>>= 1;
        }
        return Result.ok(count);
    }
关键代码如下:
List <Long> result = stringRedisTemplate.opsForValue().bitField(
                key,
                BitFieldSubCommands.create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(day))
                        .valueAt(0)
        );
这段代码使用 StringRedisTemplate 和 Redis 的 bitField 命令从一个 Redis 字符串键的指定偏移量开始,获取一个指定大小的位字段的无符号整数值,并将结果存储在一个 List<Long> 中。

4.7.UV统计:

image-20240513170528253

在这里插入图片描述

image-20240513171219913

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1670799.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Gitee添加仓库成员

1.进入你的项目 2.点击管理 3.左侧有个仓库管理 4.要加哪个加哪个&#xff0c;有三个方式~ 可以直接添加之前仓库合作过的开发者

【SpringBoot记录】从基本使用案例入手了解SpringBoot-数据访问-更改DataSource(2)

前言 通过上一个数据访问基本案例成功可以发现&#xff0c;SpringBoot在数据访问案例中也做了许多自动配置&#xff0c;上节只分析了其中的Properties。 而在自动配置包的jdbc下 还有其他配置文件。 根据名称可以大致了解他们的作用&#xff1a; DataSourceAutoConfiguration…

前端报错 SyntaxError: Unexpected number in JSON at position xxxx at JSON.parse

问题描述​ 控制台提示 SyntaxError: Unexpected number in JSON at position xxxx at JSON.parse 问题原因​ 原因&#xff1a;JSON 数据格式错误&#xff0c;是否符合 JSON 格式。 解决方法​ 应为json格式数据 什么是json格式数据 JSON&#xff08;JavaScript Object …

前端XHR请求数据

axios封装了XHR(XMLHttpRequest) 效果 项目结构 Jakarta EE9&#xff0c;Web项目。 无额外的maven依赖 1、Web页面 index.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title&…

MySQL基础入门【mysql初识 | 数据库操作 | 表操作 | sql数据类型】

博客主页&#xff1a;花果山~程序猿-CSDN博客 文章分栏&#xff1a;Linux_花果山~程序猿的博客-CSDN博客 关注我一起学习&#xff0c;一起进步&#xff0c;一起探索编程的无限可能吧&#xff01;让我们一起努力&#xff0c;一起成长&#xff01; 目录 一&#xff0c;为什么会有…

Vue3 动态引入图片: require is not defined报错

问题&#xff1a;在 Vue3 项目中&#xff0c;使用 require 引入图片&#xff0c;报错 require is not defined 原因&#xff1a; Vue3 使用的是 vite&#xff0c;而 require 是 Webpack 的方法。 官网说明&#xff1a; 解决代码&#xff1a; <template><div v-fo…

k8s coredns配置

1.coredns可根据集群具体数量修改pod数&#xff0c;官方推荐比例为5/1&#xff0c;即有15台服务器最好是3个pod。 2.coredns会继承pod所在主机的dns解析,修改了主机的dns解析之后&#xff0c;coredns有一段时间的缓存&#xff0c;重启coredns才会在集群内部立刻生效该解析。 …

Linux本地部署Nightingale夜莺监控并实现远程访问提高运维效率

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

C/C++实现汉诺塔游戏和详细解

C/C实现汉诺塔游戏和详细解析 需要详细代码可联系QQ&#xff1a;3324729792 引言 汉诺塔问题是一个经典的递归问题&#xff0c;起源于一个传说中的印度寺庙。在这个问题中&#xff0c;我们需要将所有的圆盘从一个柱子移动到另一个柱子上&#xff0c;且在移动过程中&#xff…

OpenAI放大招:不是GPT-5和搜索引擎,或推AI助理?

近日&#xff0c;关于 ChatGPT 的开发公司 OpenAI 将推出与谷歌搜索竞争的产品的传闻甚嚣尘上。有报道指出&#xff0c;OpenAI 计划增强 ChatGPT 的功能&#xff0c;并进军搜索引擎市场&#xff0c;新产品甚至可能在 5 月 13 日谷歌 I/O 大会的前一天发布。 然而&#xff0c;Op…

React 第三十五章 Fiber 双缓冲

我们可以从三个维度来理解 Fiber&#xff1a; 是一种架构&#xff0c;称之为 Fiber 架构是一种数据类型动态的工作单元 Fiber 架构 在 React v16之前&#xff0c;使用的是 Stack Reconciler&#xff0c;因此那个时候的 React 架构被称之为 Stack 架构。从 React v16 开始&am…

01-02-2

1、typedef的使用 a.语法 typedef 原名 别名&#xff1b;。 ​ typedef struct student {int num;char name[20];char sex; }stu,*pstu;//stu相当于struct student这个类型&#xff0c;*pstu相当于struct student * 别名的理解方法&#xff1a;若是字母前面有符号&#xff0…

uniapp编译H5解决ios的border-radius失效问题

1.解决方案 .card-itemA {width: 650rpx;height: 326rpx;box-shadow: 0rpx 0rpx 30rpx 14rpx rgba(236, 235, 236, 0.25);background: linear-gradient(180deg, #FFFFFF 0%, rgba(255, 255, 255, 0) 100%);border-radius: 60rpx;overflow: hidden;// 兼容ios的圆角问题transfor…

python实现星号打印出金字塔

#编程实现下列图形的打印 a input() for i in range(int(a)//21): num * * ((i1)*2-1) print(num.center(int(a), )) 编译后通过。输入20后得到下面的星号金字塔

Pikachu 靶场 RCE 通关解析

前言 Pikachu靶场是一种常见的网络安全训练平台&#xff0c;用于模拟真实世界中的网络攻击和防御场景。它提供了一系列的实验室环境&#xff0c;供安全专业人士、学生和爱好者练习和测试他们的技能。 Pikachu靶场的目的是帮助用户了解和掌握网络攻击的原理和技术&#xff0c;…

31万奖金池等你挑战!IJCAI 2024 第九届“信也科技杯”全球AI算法大赛正式开赛!聚焦AI尖端赛题!

赛事概况 随着语音合成技术的不断进步,合成语音与真实语音之间的界限变得模糊,这不仅对数据安全构成威胁,也对科技伦理提出了新的要求。 第九届“信也科技杯”全球AI算法大赛聚焦于语音深度鉴伪识别领域,旨在激发全球算法爱好者和专家的创新潜力,共同应对由人工智能技术发展带来…

Bean的生命周期与循环依赖

如有不对的地方&#xff0c;还请大佬指正 Bean生命周期 扫描类 得到 BeanDefinition(包含bean的class等属性值) 后在BeanFactoryPostProcessor对bean实例化之前对Bean的元数据进行操作&#xff0c;修改Bean的属性值、添加自定义的BeanDefinition 实例化非懒加载单例bean1. …

Android 系统省电软件分析

1、硬件耗电 主要有&#xff1a; 1、屏幕 2、CPU 3、WLAN 4、感应器 5、GPS(目前我们没有) 电量其实是目前手持设备最宝贵的资源之一&#xff0c;大多数设备都需要不断的充电来维持继续使用。不幸的是&#xff0c;对于开发者来说&#xff0c;电量优化是他们最后才会考虑的的事情…

【Linux】自动化构建工具make/Makefile和git介绍

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/qinjh_/category_12625432.html 目录 前言 Linux项目自动化构建工具-make/Makefile 举例 .PHONY 常见符号 依赖关系…

如何把root账号的文件修改为tank账号

linux系统 以当前用户命令创建的目录都是root的用户并且是只读的&#xff1a; 用优盘拷贝的文件及文件夹的权限是&#xff1a; 解决方案是&#xff1a;把root账号的文件修改为tank账号 在Linux系统中&#xff0c;如果您需要将属于root用户的文件或目录的所有权更改到另一个用…