黑马---Redis入门到实战【实战篇】

news2025/1/15 22:39:26

 一、短信登录

基于session实现短信登录的流程

实现发送短信验证码功能

 发送验证码功能:

    @Override
    public Result sendCode(String phone, HttpSession session) {
        //1.校验手机号
        if(RegexUtils.isPhoneInvalid(phone)){
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);
        //4.保存验证码到session
        session.setAttribute("code",code);
        //5.发送验证码
        log.debug("发送短信验证码成功,验证码:{}"+code);
        //返回ok
        return Result.ok();
    }

登录功能:

登录表单的实体类:

@Data
public class LoginFormDTO {
    private String phone;
    private String code;
    private String password;
}

登录逻辑代码实现:

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
            //如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //2.校验验证码
        Object cacheCode = session.getAttribute("code");
        String code = loginForm.getCode();
        if(cacheCode==null ||! cacheCode.toString().equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误!");
        }
        //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();
        //5.判断用户是否存在
        if(user==null) {
            //6.不存在,创建新用户并保存
            user=createUsrWithPhone(phone);
        }
        //7.保存用户信息到session中
        session.setAttribute("user",user);
        return null;   
    }

    private User createUsrWithPhone(String phone) {
        //1.创建用户
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
        //2.保存用户
        save(user);
        return user;
    }

实现登录校验拦截器

ThreadLocal 叫做本地线程变量,意思是说,ThreadLocal 中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal 为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。

注意,为了隐藏用户敏感信息,也为了节省ThreadLocal的空间,需要将User转为UserDTO返回给前端。

可以通过hutool工具类,在UserService里修改:

session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
public class UserHolder {
    private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

    public static void saveUser(UserDTO user){
        tl.set(user);
    }

    public static UserDTO getUser(){
        return tl.get();
    }

    public static void removeUser(){
        tl.remove();
    }
}

拦截器:

public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取session
        HttpSession session = request.getSession();
        //2.获取session中的用户
        Object user = session.getAttribute("user");
        //3.判断用户是否存在
        if(user==null) {
            //4.不存在,拦截,返回401状态码,代表未授权
            response.setStatus(401);
            return false;
        }
        //5.存在,保存用户信息到ThreadLocal
        UserHolder.saveUser((UserDTO) user);
        //6.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}

添加拦截器:

@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/voucher/**","/upload/**");
    }
}

登录并返回:

    @GetMapping("/me")
    public Result me(){
        // TODO 获取当前登录的用户并返回
        UserDTO user = UserHolder.getUser();
        return Result.ok(user);
    }

session共享的问题分析

session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题

session的替代方案应该满足:

        数据共享

        内存存储

        key、value结构

===>redis

Redis代替session的业务流程

基于Redis实现短信登录

拦截器修改:

public class LoginInterceptor implements HandlerInterceptor {

    private StringRedisTemplate stringRedisTemplate;

    public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        //判断token是否为空
        if(StrUtil.isBlank(token)){
            response.setStatus(401);
            return false;
        }
        String key=LOGIN_USER_KEY+token;
        //2.基于token获取redis中的用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        //3.判断用户是否存在
        if(userMap.isEmpty()){
            //不存在,拦截,返回401状态码
            response.setStatus(401);
            return false;
        }
        //5.存在,将查询到的Hash数据转为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        //6.保存用户到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.刷新token有效期
        stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
        //8.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}

MvcConfig修改:

@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**",
                        "/upload/**");
    }
}

UserServiceImpl修改:

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result sendCode(String phone, HttpSession session) {
        //1.校验手机号
        if(RegexUtils.isPhoneInvalid(phone)){
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);
        //4.保存验证码到redis中
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
        //5.发送验证码
        log.debug("发送短信验证码成功,验证码:{}"+code);
        //返回ok
        return Result.ok();
    }

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
            //如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //2.校验验证码
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        String code = loginForm.getCode();
        if(cacheCode==null ||!cacheCode.equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误!");
        }
        //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();
        //5.判断用户是否存在
        if(user==null) {
            //6.不存在,创建新用户并保存
            user=createUsrWithPhone(phone);
        }
        //保存用户信息到redis中
        //1.随机生成token,作为登录令牌
        String token = UUID.randomUUID().toString(true);  //true代表isSimple,即不带中划线
        //2.将User对象转为Hash存储
        UserDTO userDTO=BeanUtil.copyProperties(user,UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
        String tokenKey=LOGIN_USER_KEY+token;
        //7.存储
        stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
        //设置token有效期
        stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
        return Result.ok();
    }

    private User createUsrWithPhone(String phone) {
        //1.创建用户
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
        //2.保存用户
        save(user);
        return user;
    }
}

Redis代替session需要考虑的问题:

选择合适的数据结构

选择合适的key

选择合适的存储粒度

解决登录状态刷新的问题

RefreshTokenInterceptor

public class RefreshTokenInterceptor implements HandlerInterceptor {

    private  StringRedisTemplate stringRedisTemplate;

    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        //判断token是否为空
        if(StrUtil.isBlank(token)){
            return true;
        }
        String key=LOGIN_USER_KEY+token;
        //2.基于token获取redis中的用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        //3.判断用户是否存在
        if(userMap.isEmpty()){
            return true;
        }
        //5.存在,将查询到的Hash数据转为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        //6.保存用户到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.刷新token有效期
        stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
        //8.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}

LoginInterceptor

public class LoginInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //判断是否需要拦截(ThreadLocal中是否有用户)
        if(UserHolder.getUser()==null){
            //没有,需要拦截,设置状态码
            response.setStatus(401);
            return false;
        }
        //有用户,则放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}

配置添加拦截器

        注意通过order控制拦截器执行顺序,order越小越先执行

@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**",
                        "/upload/**").order(1);
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
    }
}

二、商户缓存查询

什么是缓存

缓存就是数据交换的缓冲区(称作Cache),是存储数据的临时地方,一般读写性能较高

添加Redis缓存

    /**
     * 根据id查询商铺信息
     * @param id 商铺id
     * @return 商铺详情数据
     */
    @GetMapping("/{id}")
    public Result queryShopById(@PathVariable("id") Long id) {
        return shopService.queryById(id);
    }

ShopServiceImpl:

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result queryById(Long id) {
        String key=CACHE_SHOP_KEY+id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在,返回错误
        if(shop==null){
            return Result.fail("店铺不存在!");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
        //7.返回
        return Result.ok(shop);
    }
}

练习:给店铺类型查询业务添加缓存

@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {

    @Autowired
    private ShopTypeMapper shopTypeMapper;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Result queryBatch() {
        String key="CACHE_SHOP_TYPE_KEY";
        String jsonType = stringRedisTemplate.opsForValue().get(key);
        if(StrUtil.isNotBlank(jsonType)){
            List<ShopType> shopTypes = JSONUtil.toList(jsonType, ShopType.class);
            return Result.ok(shopTypes);
        }
        List<ShopType> shopTypes = shopTypeMapper.selectList(new QueryWrapper<>());
        if(shopTypes.isEmpty()){
            return Result.fail("您查询的页面不存在!");
        }
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopTypes));
        return Result.ok(shopTypes);
    }
}

缓存更新策略

 业务场景:

低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存

高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存

主动更新策略:

 操作缓存和数据库时有三个问题需要考虑:

1.删除缓存还是更新缓存?

        更新缓存:每次更新都更新缓存,无效写操作较多

        删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)

 2.如何保证缓存与数据库的操作的同时成功或失败?

        单体系统:将缓存与数据库操作放在一个事务

        分布式系统:利用TCC等分布式事务方案

3.先操作缓存还是先操作数据库?

 总结:

     

实现缓存与数据库的双写一致

 案例:给查询商铺的缓存添加超时剔除和主动更新的策略

修改ShopController中的业务逻辑,满足下面的需求:

①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间

②根据id修改店铺时,先修改数据库,再删除缓存

更新操作:

    @Override
    @Transactional
    public Result update(Shop shop) {
        Long id=shop.getId();
        if(id==null){
            return Result.fail("店铺id不能为空!");
        }
        //1.更新数据库
        updateById(shop);
        //2.删除缓存
        stringRedisTemplate.delete(CACHE_SHOP_KEY+shop.getId());
        return Result.ok();
    }

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

编码解决商铺查询的缓存穿透问题

    @Override
    public Result queryById(Long id) {
        String key=CACHE_SHOP_KEY+id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //命中的是否为空值
        if(shopJson!=null){
            return Result.fail("店铺信息不存在!");
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在,返回错误
        if(shop==null){
            //将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
            return Result.fail("店铺信息不存在!");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
        //7.返回
        return Result.ok(shop);
    }

总结:

缓存穿透产生的原因是什么?

        用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

        缓存null值

        布隆过滤

        增强id的复杂度,避免被猜测id规律

        做好数据的基础格式校验

        加强用户权限校验

        做好热点参数的限流

缓存雪崩

缓存雪崩是指同一时刻大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

给不同的Key的TTL添加随机值

利用Redis集群提高服务的可用性

给缓存业务添加降级限流策略

给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

 常见的解决方案有两种:

互斥锁

逻辑过期

互斥锁和逻辑过期对比:

 互斥锁和逻辑过期优缺点:

利用互斥锁解决缓存击穿问题

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题

    public Shop queryWithMutex(Long id){
        String key = CACHE_SHOP_KEY+id;
        //从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //判断缓存是否命中
        if(StrUtil.isNotBlank(shopJson)){
            //命中则直接返回数据
            return JSONUtil.toBean(shopJson, Shop.class);
        }
        //判断是否是缓存穿透
        if(shopJson!=null){
            return null;
        }
        //实现缓存重建
        //1.获取互斥锁
        String lockKey=LOCK_SHOP_KEY+id;
       try{
           boolean isLock = tryLock(lockKey);
           //2.判断是否获取成功
           if(!isLock) {
               //3.失败,则休眠并重试
               Thread.sleep(50);
               queryWithMutex(id);
           }
           //4.成功,根据id查询数据库
           Shop shop = getById(id);
           //5.不存在,返回错误
           if(shop==null){
               stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
               return null;
           }
           //6.存在,写入redis
           stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
           return shop;
       }catch(InterruptedException e){
           throw new RuntimeException(e);
       }finally{
           unlock(lockKey);
       }
    }

    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
        return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
    }

    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }

基于逻辑过期方式解决缓存击穿问题

需求:修改根据id删除商铺的业务,基于逻辑过期的方式来解决缓存击穿问题

添加逻辑过期时间:

@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}

整体实现逻辑:

   private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);

    public Shop queryWithLogicalExpire(Long id){
        String key=CACHE_SHOP_KEY+id;
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //缓存未命中
        if(StrUtil.isBlank(shopJson)){
            return null;
        }
        //命中,需要先把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
        Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())) {
            //未过期,直接返回店铺信息
            return shop;
        }
        //已过期,需要缓存重建
        String lockKey=LOCK_SHOP_KEY+id;
        //获取互斥锁
        boolean isLock = tryLock(lockKey);
        //判断是否获取锁成功
        if(isLock) {
            //成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try {
                    saveShop2Redis(id, CACHE_SHOP_TTL);
                }catch(Exception e){
                    throw new RuntimeException(e);
                }finally{
                    unlock(lockKey);
                }
            });
        }
        //返过期的商铺信息
    }

    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
        return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
    }

    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }

    public void saveShop2Redis(Long id,Long expireSeconds){
        //1.查询店铺数据
        Shop shop = getById(id);
        //2.封装逻辑过期时间
        RedisData redisData = new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
        //3.写入Redis
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
    }

封装Redis工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列要求:

方法1:任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间

方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题

方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题

方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

@Component
public class CacheClient {
    private final StringRedisTemplate stringRedisTemplate;

    public CacheClient(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }

    public void set(String key, Object value, Long time, TimeUnit unit){
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }

    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
        //设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit){
        String key=keyPrefix+id;
        String json = stringRedisTemplate.opsForValue().get(key);
        if(StrUtil.isNotBlank(json)){
            return JSONUtil.toBean(json,type);
        }
        //判断命中的是否为空值
        if(json!=null){
            return null;
        }
        R r = dbFallback.apply(id);
        if(r==null){
            stringRedisTemplate.opsForValue().set(key,"",time,unit);
            return null;
        }
        this.set(key,r,time,unit);
        return r;
    }

    private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);

    public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
        String key=keyPrefix+id;
        String json = stringRedisTemplate.opsForValue().get(key);
        if(StrUtil.isBlank(json)){
            return null;
        }
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())){
            //未过期,直接返回店铺信息
            return r;
        }
        //已过期,需要缓存重建
        String lockKey=LOCK_SHOP_KEY+id;
        boolean isLock = tryLock(lockKey);
        if(isLock){
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try{
                    //重建缓存
                    //1.查询数据库
                    R apply = dbFallback.apply(id);
                    //2.存入redis
                    this.setWithLogicalExpire(key,apply,time,unit);
                }catch (Exception e){
                    throw new RuntimeException(e);
                }finally{
                    unlock(key);
                }
            });
        }
        return r;
    }

    private boolean tryLock(String key){
        Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }
}

缓存总结

认识缓存

什么是缓存?

        一种具备高效读写能力的数据暂存区域

缓存的作用?

        降低后端负载

        提高读写响应速度

缓存的成本?

        开发成本

        运维成本

        一致性问题

缓存更新策略

三种策略:

        内存淘汰:redis自带的内存淘汰机制

        过期淘汰:利用expire命令给数据设置过期时间

        主动更新:主动完成数据库与缓存的同时更新

策略选择:

        低一致性需求:内存淘汰或过期淘汰

        高一致性需求:

                主动更新为主

                过期淘汰兜底

主动更新方案

        Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新

                一致性良好、实现难度一般

        Read/Write Through:缓存与数据库集成为一个服务,服务保证两者的一致性,对外暴露API接口,调用者调用API,无需知道自己操作的是数据库还是缓存,不关心一致性

                一致性优秀、实现复杂、性能一般

        Write Back:缓存调用者的CRUD都针对缓存完成。有独立线程异步将缓存数据写到数据库,实现最终一致

                一致性差、性能好、实现复杂

Cache Aside模式选择

更新缓存还是删除缓存?

        更新缓存会产生无效更新,并且存在较大的线程安全问题

        删除缓存本质是延迟更新,没有无效更新,线程安全问题相对较低

先操作数据库还是缓存?

        先更新数据,再删除缓存——在满足原子性的情况下,安全问题概率较低

        先删除缓存,再更新数据库——安全问题概率极高

如何确保数据库与缓存操作原子性?

        单体系统——利用事务机制

        分布式系统——利用分布式事务机制

最佳实践

查询数据时

1.先查询缓存

2.如果缓存命中,直接返回

3.如果缓存未命中,则查询数据库

4.将数据库数据写入缓存

5.返回结果

修改数据库时:

1.先修改数据库

2.然后删除缓存

===>确保两者的原子性

缓存穿透

产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

解决方案:

①缓存空对象:

        思路:对于不存在的数据也在redis建立缓存,值为空,并设置一个较短的TTL时间

        优点:实现简单,便于维护

        缺点:额外的内存消耗、短期的数据不一致问题

②布隆过滤:

        思路:利用布隆过滤算法,在请求进入Redis之前先判断是否存在,如果不存在则直接拒绝请求

        优点:内存占用少

        缺点:实现复杂、存在误判的可能性

③其他:

        做好数据的基础格式校验

        加强用户权限校验

        做好热点参数的限流

缓存雪崩

产生原因:在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

        给不同的Key的TTL添加随机值

        利用Redis集群提高服务的可用性

        给缓存业务添加降级限流策略

        给业务添加多级缓存

缓存击穿(热点key)

产生原因:

        热点key:①在某一时段被高并发访问 ②缓存重建耗时较长

        热点key突然过期,因为缓存重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击

解决方案:

互斥锁:

        思路:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待

        优点:①实现简单 ②没有额外内存消耗 ③一致性好

        缺点:①等待导致性能下降

        缺点:有死锁风险

逻辑过期:

        思路:

                ①热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存

                ②重建缓存也通过互斥锁来保证单线程执行

                ③重建缓存利用独立线程异步执行

                ④其他线程无需等待,直接查询到旧数据即可

        优点:现成无需等待,性能较好

        缺点:

                ①不保证一致性

                ②有额外内存消耗

                ③实现复杂

三、优惠券秒杀

 全局ID生成器

每个店铺都可以发布优惠券:

 当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:

id的规律性太明显

受单表数据量的限制

全局ID生成器:

全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

 ID的组成部分:

符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,可以支持每秒产生2^32个不同的ID

Redis实现全局唯一ID

@Component
public class RedisIdWorker {
    //开始时间戳 2023-01-01 00:00:00
    private static final long BEGIN_TIMESTAMP=1672531200L;
    private static final int COUNT_BITS=32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }

    public long nextId(String keyPrefix){
        //1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp=nowSecond-BEGIN_TIMESTAMP;
        //2.生成序列号
        //2.1 获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        //2.2 自增长
        long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
        //3.拼接并返回
        return timestamp<<COUNT_BITS | count;
    }

}

测试代码:

    @Resource
    private RedisIdWorker redisIdWorker;

    private ExecutorService es= Executors.newFixedThreadPool(500);
    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(300);
        Runnable task=()->{
            for (int i = 0; i < 100; i++) {
                long id = redisIdWorker.nextId("order");
                System.out.println("id="+id);
            }
            latch.countDown();
        };
        latch.countDown();
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 300; i++) {
            es.submit(task);
        }
        latch.await();
        long end=System.currentTimeMillis();
        System.out.println("time="+(end-begin));
    }

总结:

全局唯一ID生成策略:

UUID

Redis自增

snowflake算法

数据库自增

Redis自增ID策略:

每天一个key,方便统计订单量

ID构造是 时间戳+计数器

添加优惠券

每个店铺都可以发布优惠券,分为评价券和特价劵。平价券可以任意购买,而特价券需要秒杀抢购:

表关系如下:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等

tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。

通过postman添加优惠券:

{
    "shopId":1,
    "title":"100元代金券",
    "subTitle":"周一至周五均可使用",
    "rules":"全场通用\\n无需预约\\n可无限叠加\\n不兑换、不找零\\n仅限堂食",
    "payValue":8000,
    "actualValue":10000,
    "type":1,
    "stock":100,
    "beginTime":"2023-04-29T10:09:17",
    "endTime":"2023-04-29T23:09:04"
}

实现秒杀下单

下单时需要判断两点:

秒杀是否开始或结束,如果尚未开始或已经结束则无法下单

库存是否充足,不足则无法下单

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //2.判断秒杀是否开始
        if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        //3.判断秒杀是否结束
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){
            //已经结束
            return Result.fail("秒杀已经结束!");
        }
        //4.判断库存是否充足
        if(voucher.getStock()<1){
            //库存不足
            return Result.fail("库存不足!");
        }
        //5.扣减库存
        boolean success=seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id",voucherId)
                .update();
        if(!success){
            //扣减失败
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1 订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2 用户id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //6.3 代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        //7.返回订单id
        return Result.ok(orderId);
    }
}

 

超卖问题

用jmeter模拟高并发:

 模拟结果:

 原因:

 超卖问题就是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

        版本号法

        CAS法

 乐观锁解决超卖问题

        boolean success=seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
                .update();

乐观锁问题:成功率太低

乐观锁优化:

        boolean success=seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id",voucherId).gt("stock",0)
                .update();

运行结果:50%的失败率,订单恰好没有超卖

超卖这样的线程安全问题,解决方案有哪些?

1.悲观锁:添加同步锁,让线程串行执行

        优点:简单粗暴

        缺点:性能一般

2.乐观锁:不加锁,在更新时判断是否有其他线程在修改

        优点:性能好

        缺点:存在成功率低的问题

实现一人一单功能

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

 代码实现:

        //一人一单
        Long userId = UserHolder.getUser().getId();
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if(count>0){
            //用户已经存在
            return Result.fail("该用户已经购买过一次!");
        }

对于高并发,解决线程安全问题:

(先提交事务,再释放锁)

第一步:添加依赖

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>

第二步:暴露代理对象 @EnableAspectJAutoProxy(exposeProxy=true)

@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {

    public static void main(String[] args) {
        SpringApplication.run(HmDianPingApplication.class, args);
    }

}

第三步:对方法加锁

        如果用this.createVoucherOrder(voucherId); 首先我们要知道事务如果想生效,需要Spring对该类做动态代理,拿到了代理对象,来对事务进行处理

        这个this是没有事务功能的,因为拿到的目标对象(非代理对象)

        所以需要拿到事务代理对象AopContext.currentProxy()

        synchronized (userId.toString().intern()) {
            //获取代理对象(事务)
            IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }
    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        //一人一单
        Long userId = UserHolder.getUser().getId();
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            //用户已经存在
            return Result.fail("该用户已经购买过一次!");
        }
        //5.扣减库存
        boolean success = seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherId).gt("stock", 0)
                .update();
        if (!success) {
            //扣减失败
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1 订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2 用户id
        voucherOrder.setUserId(userId);
        //6.3 代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        //7.返回订单id
        return Result.ok(orderId);

    }

压测结果:

集群模式下线程锁失效

因为集群模式下,每个JVM有各自的锁监视器

四、分布式锁

分布式锁基本原理

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

 

分布式锁的实现:

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见有三种:

Redis分布式锁的基本实现

实现分布式锁时需要实现的两个基本方法:

获取锁:

        互斥:确保只能有一个线程获取锁

        #添加锁,利用setnx的互斥特性

        setnx lock thread1

        #添加锁过期时间,避免服务宕机引起的死锁

        expire lock 10

        非阻塞:尝试一次,成功返回true,失败返回false

释放锁:

        手动释放:del key

        超时释放:获取锁时添加一个超时时间

实现Redis分布式锁初级版本

需求:定义一个类,实现下面的接口,利用Redis实现分布式锁功能

@AllArgsConstructor
public class SimpleRedisLock implements ILock{

    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX="lock:";

    @Override
    public boolean tryLock(long timeoutSec) {
        long threadId = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        //释放锁
        stringRedisTemplate.delete(KEY_PREFIX+name);
    }
}

 优化一人一单:

        Long userId=UserHolder.getUser().getId();
        //创建锁对象
        SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
        boolean isLock = lock.tryLock(1200);
        if(!isLock) {
            //获取锁失败,返回错误或重试
            return Result.fail("一个人只能下一单!");
        }
        try {
            //获取代理对象(事务)
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }finally {
            lock.unlock();
        }

Redis分布式锁误删

 解决方案:在释放锁前判断这个锁是否还属于自己

 

改进Redis的分布式锁

需求:修改之前的分布式锁实现,满足:

1.在获取锁时存入线程标识(可以用UUID表示)

2.在释放锁时现货区锁中的线程标识,判断是否与当前线程标识一致

        如果一致则释放锁

        如果不一致则不释放锁

@AllArgsConstructor
public class SimpleRedisLock implements ILock{

    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX="lock:";
    private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";

    @Override
    public boolean tryLock(long timeoutSec) {
        String threadId =ID_PREFIX + Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        //获取线程标识
        String threadId=ID_PREFIX + Thread.currentThread().getId();
        //获取锁中的标识
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        //判断标识是否一致
        if(threadId.equals(id)) {
            //释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

分布式锁的原子性问题

判断和释放锁是两个动作,这中间可能会发生并发问题

Lua脚本解决多条命令原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。

这里重点介绍Redis提供的调用函数,语法如下:

#执行redis命令
redis.call('命令名称','key','其它参数',...)
#比如,我们要执行set name jack
redis.call('set','name','jack')

#如果我们要执行set name Rose,再执行get name,则脚本如下
redis.call('set','name','Rose')
local name = redis.call('get','name')
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本常见命令如下:

执行无参脚本:

执行有参脚本:

 如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入keys数组,其它参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数:

基于Redis的分布式锁

释放锁的业务流程:

        1.获取锁中的线程标识

        2.判断是否与指定的标识(当前线程标识)一致

        3.如果一致则释放锁(删除)

        4.如果不一致则什么都不做

-- 锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]

-- 获取锁中的线程标识 get key
local id = redis.call('get',key)
--比较线程标识与锁中的标识是否一致
if(id == threadId) then
    -- 释放锁
   return redis.call('del',key)
end
return 0

脚本优化:

if(redis.call('get',KEYS[1]==ARGV[1])) then
    return redis.call('del',KEYS[1])
end
return 0

再次改进Redis的分布式锁

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate调用Lua脚本的API如下:

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT=new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
    @Override
    public void unlock() {
        //调用lua脚本
        stringRedisTemplate.execute(UNLOCK_SCRIPT, 
                Collections.singletonList(KEY_PREFIX+name),
                ID_PREFIX+Thread.currentThread().getId());
    }

基于Redis的分布式锁实现思路:

        利用setnx nx ex获取锁,并设置过期时间,保存线程标识

        释放锁时先判断线程标识与自己是否一致,一致则删除锁

特性:

        利用set nx满足互斥性

        利用set ex保证故障时依然能释放,避免死锁,提高安全性

        利用Redis集群保证高可用和高并发特性

Redisson功能介绍

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redisson快速入门

第一步:引入依赖

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.20.0</version>
        </dependency>

第二步:配置Redisson

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        //配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.202.128:6379").setPassword("123321");
        //创建RedissonClient对象
        return Redisson.create(config);
    }
}

3.使用Redisson分布式锁

使用jmeter压测通过,一个用户只能下一单:

Redisson可重入锁原理

redis无法实现可重入锁的原因:

 Redisson可以实现可重入锁的原理:

 lua脚本实现:

 测试代码:

@SpringBootTest
@Slf4j
public class RedissonTest {

    @Autowired
    private RedissonClient redissonClient;

    private RLock lock;

    @BeforeEach
    void setup(){
        lock=redissonClient.getLock("order");
    }

    @Test
    void method1(){
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("获取锁失败……1");
            return;
        }
        try{
            log.info("获取锁成功……1");
            method2();
            log.info("开始执行业务……1");
        }finally{
            log.warn("准备释放锁……1");
            lock.unlock();
        }
    }

    void method2(){
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("获取锁失败……2");
            return;
        }
        try{
            log.info("获取锁成功……2");
            log.info("开始执行业务……2");
        }finally{
            log.warn("准备释放锁……2");
            lock.unlock();
        }
    }
}

运行截图:

 查看源码实现:

trylock:

 unlock:

Redisson的锁重试和WatchDog机制

 Redisson分布式锁原理:

可重入:利用hash结构记录线程id和重入次数

可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制

超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

修改method1:

        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);

跟入源码:

注:redis.call('pttl',KEYS[1]);中的pttl返回以毫秒为单位,而ttl返回以秒为单位

如果该线程拿到锁,则返回nil;否则返回剩余有效期

代码精华部分:

            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }

怎么确保锁是因为业务结束而释放,而非阻塞导致的超时释放呢?

自动更新续期:

                    scheduleExpirationRenewal(threadId);

 renewExpiration:更新有效期

 实现永不过期的代码:

        Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock {} expiration", getRawName(), e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

那么,何时取消计时刷新呢?

在锁释放的时候结束计时

Redisson的multiLock问题

redisson分布式锁主从一致问题:如果主节点宕机,而未将数据同步给从节点,可能会导致并发问题

解决方案:所有节点都变成独立的redis节点

总结:

1)不可重入的Redis分布式锁:

原理:利用setnx的互斥性,利用ex避免死锁;释放锁时判断线程标识

缺陷:不可重入、无法重试、锁超时失败

2)可重入的Redis分布式锁:

原理:利用hash结果,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待

缺陷:redis宕机引起锁失效问题

3)Redisson的multiLock:

原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功

缺陷:运维成本高、实现复杂

五、秒杀优化

异步秒杀思路

 怎么保证一人一单呢?Set集合

 怎么确保代码执行的原子性?lua脚本

 基于Redis完成秒杀资格判断

需求:

①新增秒杀优惠券的同时,将优惠券的信息保存到Redis中


    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存秒杀库存到Redis中
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());

    }

新增优惠券:

②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey))<=0) then
    -- 3.2 库存不足,返回1
    return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then
    -- 3.3 存在,说明是重复下单,返回2
    return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0

③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 取出用户
        Long userId = UserHolder.getUser().getId();
        //1. 执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString()
        );
        int r=result.intValue();
        //2. 判断结果是否为0
        if(r != 0) {
            //2.1 不为0,没有购买资格
            return Result.fail(r==1 ? "库存不足" : "不能重复下单");
        }
        //2.1 为0,有购买资格,把下单信息保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");
        // TODO 保存阻塞队列
        //3.返回订单id
        return Result.ok(orderId);

    }

④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

基于阻塞队列实现秒杀业务

源代码有大改动,包括之前写好的createVoucherOrder,所以附上完整源码:

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
   private IVoucherOrderService proxy;
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    private class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
            while(true){
                try {
                    //1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();//没有任务时会自动阻塞
                    //2.创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (InterruptedException e) {
                    log.error("处理订单异常",e);
                }
            }
        }
    }

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //1.获取用户
        Long userId = voucherOrder.getUserId();
        //2.创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //3.获取锁
        boolean isLock = lock.tryLock();
        //4.判断是否获取锁成功
        if(!isLock){
            log.error("不允许重复下单");
            return;
        }
        try{
            proxy.createVoucherOrder(voucherOrder);
        }finally{
            lock.unlock();
        }

    }

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 取出用户
        Long userId = UserHolder.getUser().getId();
        //1. 执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString()
        );
        int r=result.intValue();
        //2. 判断结果是否为0
        if(r != 0) {
            //2.1 不为0,没有购买资格
            return Result.fail(r==1 ? "库存不足" : "不能重复下单");
        }
        //2.1 为0,有购买资格,把下单信息保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");
        // TODO 保存阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        orderTasks.add(voucherOrder);
        //3.返回订单id
        return Result.ok(orderId);

    }

    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        //一人一单
        Long userId = voucherOrder.getUserId();
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        if (count > 0) {
            //用户已经存在
            log.error("用户已经购买过一次");
            return;
        }
        //5.扣减库存
        boolean success = seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                .update();
        if (!success) {
            //扣减失败
            log.error("库存不足");
        }
        save(voucherOrder);

    }
}

秒杀业务的优化思路是什么?

①先利用Redis完成库存余量、一人一单判断,完成抢单业务

②再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

内存限制问题

数据安全问题

六、Redis消息队列

认识消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理(Message Broker)

生产者:发送消息到消息队列

消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:

list结构:基于List结构模拟消息队列

PubSub:基本的点对点消息模型

Stream:比较完善的消息队列模型

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果

队列的入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用的是BRPOP或者BLPOP来实现阻塞效果。

示例:

基于List的消息队列有哪些优缺点?

优点:

        利用Redis存储,不受限于JVM内存上限

        基于Redis的持久化机制,数据安全性有保证

        可以满足消息有序性

缺点:

        无法避免消息丢失

        只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

SUBSCRIBE channel [channel] :订阅一个或多个频道

PUBLISH channel msg:向一个频道发送消息

PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道

        pattern:

        ?代表一个字符

        * 代表零个或多个字符

        [ae] 代表可以是a也可以是e

示例:

 

基于PubSub的消息队列有哪些优缺点?

优点:

        采用发布订阅模型,支持多生产、多消费

缺点:

        不支持数据持久化

        无法避免消息丢失

        消息堆积有上限,超出时数据丢失

 基于Stream的消息队列

Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列

示例1:

 

===>结论 :基于stream的消息队列,消息会被持久化存储,可以被多个消费者读取,也可以被一个消费者读取多次

示例2:阻塞等待,block 0 代表一直阻塞直到有新消息的到来

 

消费者应用:

 bug:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

消息漏读:

STREAM类型消息队列的XREAD命令特点:

消息可回溯

一个消息可以被多个消费者读取

可以阻塞读取

有消息漏读的风险

基于Stream的消费队列---消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

 

 ==>队列中的消息如果还想用,ID从0开始;如果不想用了,ID从$开始

示例:

Java代码实现:

STREAM类型消息队列的XREADGROUP命令特点:

消息可回溯

可以多消费者争抢消息,加快消费速度

可以阻塞读取

没有消息漏读的风险

有消息确认机制,保证消息至少被消费一次

总结:

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

①创建一个Stream类型的消息队列,名为stream.orders

参数MKSTREAM,在创建组的时候,如果消息队列不存在,则自动创建组和队列

②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

添加orderId:

--1.3 订单id
local orderId = ARGV[3]

发送消息到队列中

-- 3.6 发送消息到队列中
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)

改造Java代码:

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 取出用户
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        //1. 执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString(),String.valueOf(orderId)
        );
        int r=result.intValue();
        //2. 判断结果是否为0
        if(r != 0) {
            //2.1 不为0,没有购买资格
            return Result.fail(r==1 ? "库存不足" : "不能重复下单");
        }
       proxy=(IVoucherOrderService) AopContext.currentProxy();
        //3.返回订单id
        return Result.ok(orderId);
    }

③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

    private class VoucherOrderHandler implements Runnable{
        String queueName = "stream.orders";
        @Override
        public void run() {
            while(true) {
                try {
                    //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2.判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        // 如果获取失败,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //3.创建订单
                    handleVoucherOrder(voucherOrder);
                    //4.ACK确认 SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }
        }

        private void handlePendingList(){
            while(true){
                try{
                    //1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2.判断消息是否获取成功
                    if(list == null || list.isEmpty()){
                        //没有消息,说明pending-list中没有异常消息,结束循环
                        break;
                    }
                    //3.解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //4.如果获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
                    //5.ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                }catch(Exception e){
                    log.error("处理pending-list订单异常",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

七、达人探店

发布探店笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:

tb_blog:探店笔记表,包含笔记中的标题、文字、图片等

tb_blog_comments:其他用户对探店笔记的评价

第一步:将SystemConstants的常量改为部署在nginx项目下的imgs目录

    public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";

测试效果:

 

实现查看发布探店笔记的接口

Controller接口:

    @GetMapping("/{id}")
    public Result queryBlogById(@PathVariable("id") Long id){
        return blogService.queryBlogById();
    }

Service实现:

    @Override
    public Result queryBlogById(Long id) {
        //1.查询blog
        Blog blog = getById(id);
        if(blog == null){
            return Result.fail("笔记不存在!");
        }
        //2.查询blog有关的用户
        queryBlogUser(blog);
        return Result.ok(blog);
    }

    private void queryBlogUser(Blog blog) {
        Long userId = blog.getUserId();
        User user = userService.getById(userId);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }

点赞

需求:

同一个用户只能点赞一次,再次点击则取消点赞

如果当前用户已经点赞,则点赞按钮高亮显示(前段已实现,判断字段Blog类的isLike属性)

实现步骤:

①给Blog类中添加一个isLike字段,标识是否被当前用户点赞

    /**
     * 是否点赞过了
     */
    @TableField(exist = false)
    private Boolean isLike;

②修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1

③修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值給isLike字段

④修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段

    @PutMapping("/like/{id}")
    public Result likeBlog(@PathVariable("id") Long id) {
        return blogService.likeBlog(id);
    }
    @Override
    public Result queryHotBlog(Integer current) {
        // 根据用户查询
        Page<Blog> page = query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog->{
            this.queryBlogUser(blog);
            this.isBlogLiked(blog);
        });
        return Result.ok(records);
    }

    @Override
    public Result queryBlogById(Long id) {
        //1.查询blog
        Blog blog = getById(id);
        if(blog == null){
            return Result.fail("笔记不存在!");
        }
        //2.查询blog有关的用户
        queryBlogUser(blog);
        //3.查询blog是否被点赞
        isBlogLiked(blog);
        return Result.ok(blog);
    }

    private void isBlogLiked(Blog blog) {
        Long userId = UserHolder.getUser().getId();
        String key = "blog:liked:" + blog.getId();
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        blog.setIsLike(BooleanUtil.isTrue(isMember));
    }

    @Override
    public Result likeBlog(Long id) {
        //1. 获取登录用户
        Long userId = UserHolder.getUser().getId();
        //2. 判断当前登录用户是否已经点赞
        String key = "blog:liked:" + id;
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        if(BooleanUtil.isFalse(isMember)) {
            //3. 如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的set集合
            if(isSuccess){
                stringRedisTemplate.opsForSet().add(key,userId.toString());
            }
        }else {
            //4. 如果已经点赞,则取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的set集合移除
            if (isSuccess) {
                stringRedisTemplate.opsForSet().remove(key, userId.toString());
            }
        }
        return Result.ok();
    }


点赞排行榜

需求:按照点赞时间先后排序,返回Top5的用户

一人只能点赞一次功能实现:

因为SortedSet中没有isMember的判断,所以在添加元素的时候加上时间戳;查询的时候如果对应的元素没有时间戳,则代表集合中没有这个元素

    private void isBlogLiked(Blog blog) {
        UserDTO user = UserHolder.getUser();
        if(user==null){
            return;
        }
        Long userId = UserHolder.getUser().getId();
        String key = BLOG_LIKED_KEY + blog.getId();
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score!=null);
    }

    @Override
    public Result likeBlog(Long id) {
        //1. 获取登录用户
        Long userId = UserHolder.getUser().getId();
        //2. 判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + id;
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        if(score==null) {
            //3. 如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的set集合 zadd key value score
            if(isSuccess){
                stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
            }
        }else {
            //4. 如果已经点赞,则取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的set集合移除
            if (isSuccess) {
                stringRedisTemplate.opsForZSet().remove(key, userId.toString());
            }
        }
        return Result.ok();
    }

查询点赞排行前五:

    @Override
    public Result queryBlogLikes(Long id) {
        String key=BLOG_LIKED_KEY+id;
        //1. 查询top5的点赞用户 zrange key 0 4
        Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
        if(top5 == null || top5.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        //2. 解析出其中的用户id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        String idStr = StrUtil.join(",", ids);
        //3. 根据用户id查询用户
        List<UserDTO> userDTOs = userService
                .query()
                .in("id",ids)
                .last("order by field(id,"+idStr+")")
                .list()
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        //4. 返回
        return Result.ok(userDTOs);
    }

八、好友关注

关注和取关

 需求:基于该表数据结构,实现两个接口:

①关注和取关接口

②判断是否关注的接口

Controller实现:

@RestController
@RequestMapping("/follow")
public class FollowController {
    @Resource
    private IFollowService followService;

    @PutMapping("/{id}/{isFollow}")
    public Result follow(@PathVariable("id")Long followUserId,@PathVariable("isFollow")Boolean isFollow){
        return followService.follow(followUserId,isFollow);
    }

    @GetMapping("/or/not/{id}")
    public Result follow(@PathVariable("id")Long followUserId){
        return followService.isFollow(followUserId);
    }
}

ServiceImpl具体实现:

@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {

    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
        // 获取登录用户
        Long userId = UserHolder.getUser().getId();
        //1.判断到底是关注还是取关
        if (isFollow) {
            //2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            save(follow);
        } else {
            //3.取关,删除
            remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
        }
        return Result.ok();
    }

    @Override
    public Result isFollow(Long followUserId) {
        //1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        //2.查询是否关注
        Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
        //3.判断
        return Result.ok(count>0);
    }
}

共同关注

将下面的代码插入到UserController:

    @GetMapping("/{id}")
    public Result queryUserById(@PathVariable("id")Long userId){
        User user = userService.getById(userId);
        if(user == null){
            return Result.ok();
        }
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        return Result.ok(userDTO);
    }

将下面的代码插入到BlogController

    @GetMapping("/of/user")
    public Result queryBlogByUserId(
            @RequestParam(value="current",defaultValue = "1")Integer current,
            @RequestParam("id")Long id){
        Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        List<Blog> records = page.getRecords();
        return Result.ok(records);
    }

要实现共同关注的查找,可以考虑集合set的交集

需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友

①修改follow代码,将用户和关注的用户加入集合

    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
        // 获取登录用户
        Long userId = UserHolder.getUser().getId();
        String key="follows:"+userId;
        //1.判断到底是关注还是取关
        if (isFollow) {
            //2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            boolean isSuccess = save(follow);
            //判断是否关注成功
            if(isSuccess){
                //如果关注成功,把关注用户的id放入redis的set集合
                stringRedisTemplate.opsForSet().add(key,followUserId.toString());
            }

        } else {
            //3.取关,删除
            boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId));
            if (isSuccess) {
                stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
            }
        }
        return Result.ok();
    }

②求共同关注

controller:

    @GetMapping("/commons/{id}")
    public Result followCommons(@PathVariable("id") Long id){
        return followService.followCommons(id);
    }

具体实现:

    @Resource
    private IUserService userService;
    @Override
    public Result followCommons(Long id) {
        //1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        String key="follows:"+userId;
        //2.求交集
        String key2="follows:"+id;
        Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
        if(intersect==null || intersect.isEmpty()){
            //无交集
            return Result.ok(Collections.emptyList());
        }
        //3.解析id集合
        List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
        //4.查询用户
        List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
        return Result.ok(users);

    }

运行效果:

关注推送

Feed流实践方案分析

关注推送也叫作Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无线下拉刷新获取新的信息。

Feed流产品有两种常见的模式:

Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。

        优点:信息全面,不会有缺失。并且实现也相对简单

        缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户

        优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷

        缺点:如果算法不精准,可能起到反作用

本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

①拉模式

②推模式

③推拉结合

拉模式:也叫读扩散

每次发送消息的时候都会加上一个时间戳,当用户打开个人页面,就会去关注的人的发件箱里拉取消息并按时间戳排序。缺点是耗时较长,优点是节省内存。

推模式:也叫作写扩散。

关注的博主发消息的时候会直接把消息推送到个人主页,并排好序。在本人阅读个人主页的时候,无需等待拉取信息等。优点是延时低,缺点是消耗内存。

推拉结合模式:也叫读写混合,兼具推和拉两种模式的优点。

对于普通人,发送的消息可以直接推送到粉丝的收件箱。

对于大V,发送的消息,对于活跃粉丝使用推模式,对于普通粉丝使用拉模式。

总结:

 基于推模式实现关注推送功能

需求:

①修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱

②收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现

③查询收件箱数据时,可以实现分页查询

Feed流的滚动分页:

Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

接口:

    @PostMapping
    public Result saveBlog(@RequestBody Blog blog) {
        return blogService.saveBlog(blog);
    }

具体实现:

    @Resource
    private IFollowService followService;
    @Override
    public Result saveBlog(Blog blog) {
        //1.获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        //2.保存探店笔记
        boolean isSuccess = save(blog);
        if(!isSuccess){
            return Result.fail("新增笔记失败!");
        }
        //3.查询笔记作者的所有粉丝
        List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
        //4.返回id
        for(Follow follow:follows){
            //4.1 获取粉丝id
            Long userId = follow.getUserId();
            //4.2 推送
            String key="feed:"+userId;
            stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
        }
        return Result.ok(blog.getId());
    }

实现关注推送页面的分页查询

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:

以z1为例复习相应命令:

①倒序按范围查询

 ②混乱插入(此时插入一条数据,继续查询)

        可以发现按角标查询,会出现内容重复

 ③解决方法:可以按照分数查询

zrevrangebyscore key max min withscores limit offsest count

其中offset中0代表小于等于max的第一条,如果要实现小于max的第一条,则应将offset置1

count代表一次查询多少条(数量)

 ④存在的问题:如果value不一样,但是分数值一样,可能会查询到重复的数据

将m7的分数值改为6:

 此时再按照记忆的最后一条分数去做查询,会发现数据重复:

 ⑤总结:

滚动分页查询参数:

max:当前时间戳或者上一次查询的最小时间戳

min:0

offset:0 或者 在上一次的结果中,与最小值一样的元素的个数

count:3(与前端约定好)

 

实现滚动查询

①定义滚动查询结果的实体类

@Data
public class ScrollResult {
    private List<?> list;
    private Long minTime;
    private Integer offset;
}

②定义接口

    @GetMapping("/of/follow")
    public Result queryBlogOfFollow(@RequestParam("lastId")Long max,@RequestParam(value = "offset",defaultValue = "0")Integer offset){
        return blogService.queryBlogOfFollow(max,offset);
    }

③具体实现

    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
        //1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        //2.查询收件箱
        String key=FEED_KEY+userId;
        Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
        //3.非空判断
        if(typedTuples==null || typedTuples.isEmpty()){
            return Result.ok();
        }
        //4.解析数据:blogId,minTime(时间戳),offset
        ArrayList<Object> ids = new ArrayList<>(typedTuples.size());
        long minTime=0;
        int os=1;
        for(ZSetOperations.TypedTuple<String> tuple:typedTuples) {
            //4.1 获取id
            ids.add(Long.valueOf(tuple.getValue()));
            //4.2 获取分数(时间戳)
            long time = tuple.getScore().longValue();
            if (time == minTime) {
                os++;
            } else {
                minTime = time;
                os = 1;
            }
        }
        //5.根据id查询blog
        String idStr = StrUtil.join(",", ids);
        List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
        for (Blog blog : blogs) {
            //5.1 查询blog有关的用户
            queryBlogUser(blog);
            //5.2 查询blog是否被点赞
            isBlogLiked(blog);
        }
        //6.封装并返回
        ScrollResult r = new ScrollResult();
        r.setList(blogs);
        r.setOffset(os);
        r.setMinTime(minTime);
        return Result.ok(r);
    }

九、附近商户

 GEO数据结构

 案例:练习Redis的GEO功能

需求:

1.添加下面几条数据:

北京南站(116.378248 39.865275)

北京站(116.42803 39.903738)

北京西站(116.322287 39.893729)

 查看redis客户端:

==>geo底层采用ZSET实现

2.计算北京西站到北京南站的距离

==>默认单位是米

3.搜索天安门(116.397904 39.909005)附近10km内的所有火车站,并按照距离升序排序

附近商户搜索

导入店铺数据到GEO

 按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

 导入数据(通过单元测试):

    @Test
    void loadShopData(){
        //1.查询店铺信息
        List<Shop> list = shopService.list();
        //2.把店铺按照typeId分组,id一致的放到一个集合
        Map<Long,List<Shop>> map=list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
        //3.分批完成写入Redis
        for(Map.Entry<Long,List<Shop>> entry:map.entrySet()){
            //3.1 获取类型id
            Long typeId = entry.getKey();
            String key="shop:geo:"+typeId;
            //3.2 获取同类型的店铺集合
            List<Shop> value = entry.getValue();
            //3.3 写入redis
            List<RedisGeoCommands.GeoLocation<String>> locations=new ArrayList<>(value.size());
            for(Shop shop:value){
                locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));
            }
            stringRedisTemplate.opsForGeo().add(key,locations);
        }
    }

实现附近商户功能

首先,项目中redis相关的依赖版本太低,不支持GEOSEARCH方法,需要更换版本

我们将5.3.7和1.3.9版本排除,观察代码:

 然后手动引入新版本:

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>6.1.6.RELEASE</version>
        </dependency>

接口:

    /**
     * 根据商铺类型分页查询商铺信息
     * @param typeId 商铺类型
     * @param current 页码
     * @return 商铺列表
     */
    @GetMapping("/of/type")
    public Result queryShopByType(
            @RequestParam("typeId") Integer typeId,
            @RequestParam(value = "current", defaultValue = "1") Integer current,
            @RequestParam(value = "x",required = false) Double x,
            @RequestParam(value = "y",required = false) Double y
    ) {
       return shopService.queryShopByType(typeId,current,x,y);
    }

具体实现:

    @Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
        //1.判断是否需要根据坐标查询
        if(x==null || y==null){
            Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE));
            return Result.ok(page.getRecords());
        }
        //2.计算分页参数
        int from=(current-1)*DEFAULT_PAGE_SIZE;
        int end=current*DEFAULT_PAGE_SIZE;
        //3.查询redis、按照距离排序、分页 结果:shopId、distance
        String key=SHOP_GEO_KEY+typeId;
        GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
                .search(
                        key,
                        GeoReference.fromCoordinate(x, y),
                        new Distance(5000),
                        RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
                );
        //4.解析出id
        if(results==null){
            return Result.ok(Collections.emptyList());
        }
        List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
        if(list.size()<=from){
            return Result.ok(Collections.emptyList());
        }
        //4.1 截取from ~ end的那部分
        List<Long> ids=new ArrayList<>(list.size());
        Map<String,Distance> distanceMap=new HashMap<>(list.size());
        list.stream().skip(from).forEach(result->{
            //4.2 获取店铺id
            String shopIdStr = result.getContent().getName();
            ids.add(Long.valueOf(shopIdStr));
            //4.3 获取距离
            Distance distance = result.getDistance();
            distanceMap.put(shopIdStr,distance);
        });
        //5.根据id查询店铺Shop
        String idStr = StrUtil.join(",", ids);
        List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
        for(Shop shop:shops){
            shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
        }
        //6.返回
        return Result.ok(shops);
    }

效果:

 注意,在stream流中跳过了一部分商铺,很有可能导致跳过以后没有商铺可查,从而出现问题,所以需要加上如下判断:

        if(list.size()<=from){
            return Result.ok(Collections.emptyList());
        }

十、用户签到

 BitMap用法

我们按月来统计用户签到信息,签到记录为1,未签到则记录为0

 把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)

Redis中就是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。

 练习:

BITFIELD bm1 GET u2 0含义:读取bm1,以无符号十进制的方式读取两位(u2),从第一位开始读取。返回值3代表读取的比特位是11

BITPOS bm1 0 :代表查找第一个0出现的位置

 

 签到功能

需求:实现签到接口,将当前用户当天签到信息保存到Redis中

 提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。(Redis的字符串)

接口:

    @PostMapping("/sign")
    public Result sign(){
        return userService.sign();
    }

具体实现:

    @Override
    public Result sign() {
        //1.获取当前登录的用户
        Long userId = UserHolder.getUser().getId();
        //2.获取日期
        LocalDateTime now = LocalDateTime.now();
        //3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key=USER_SIGN_KEY+userId+keySuffix;
        //4.获取今天是本月的第几天
        int dayOfMonth=now.getDayOfMonth();
        //5.写入Redis
        stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
        return Result.ok();
    }

测试:

 

签到统计

Q1:什么叫做连续签到天数?

A1:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

Q2:如何得到本月到今天为止的所有签到数据?

BITFIELD key GET u[dayOfMonth] 0

Q3:如何从后向前遍历每个bit位?

与1做与运算,就能得到最后一个bit位

随后右移1位,下一个bit位就成为了最后一个bit位

案例:实现签到统计功能

需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

 接口:

    @GetMapping("/sign/count")
    public Result signCount(){
        return userService.signCount();
    }

具体实现:

    @Override
    public Result signCount() {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.获取日期
        LocalDateTime now = LocalDateTime.now();
        //3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key=USER_SIGN_KEY+userId+keySuffix;
        //4.获取今天是本月的第几天
        int dayOfMonth = now.getDayOfMonth();
        //5.获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字
        List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
                BitFieldSubCommands
                        .create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
                        .valueAt(0));
        if(result==null || result.isEmpty()){
            //没有任何签到结果
            return Result.ok(0);
        }
        Long num = result.get(0);
        if(num==null || num==0){
            return Result.ok(0);
        }
        //6.循环遍历
        int count=0;
        while(true){
            //6.1 让这个数字与1做与运算,得到数字的最后一个bit位
            if((num&1)==0){
                //如果为0,说明未签到,结束
                break;
            }else{
                //如果不为0,说明已签到,计数器+1
                count++;
            }
            num >>>= 1; // >>>代表无符号右移
        }
        return Result.ok(count);
    }

测试:返回值为3

 实际上也确实是3个连续的1:

 测试通过!

十一、UV统计

HyperLogLog用法

UV:全称Unique Visitor,也叫独立访问量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次

PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断用户是否已经统计过了,要将统计过的用户信息保存,但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。

 命令练习:

 

实现UV统计

我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:

    @Test
    void testHyperLogLog(){
        String[] values=new String[1000];
        int j=0;
        for (int i = 0; i < 1000000; i++) {
            j=i%1000;
            values[j]="user_"+i;
            if(j==999){
                //发送到Redis
                stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
            }
        }
        //统计数量
        Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
        System.out.println("count="+count);
    }

运行结果:

内存消耗:

运行前内存:

 运行后内存:

1929640-1505552= 424088

424088/1024/1024=0.4M

 


Redis实战篇完结,恭喜大家~~❀

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/501436.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java 基础进阶篇(十三)—— 异常处理机制

文章目录 一、异常概述、体系二、异常的分类三、异常的默认处理流程四、异常的处理机制4.1 编译时异常的处理机制4.1.1 方式一&#xff1a;抛出异常4.1.2 方式二&#xff1a;捕获异常4.1.3 方式三&#xff1a;前两者结合 4.2 运行时异常的处理机制 五、自定义异常5.1 自定义编译…

程序员面试金典10.*

文章目录 10.1合并排序的数组10.02变位词组10.03搜索旋转数组10.05稀疏数组搜索10.09排序矩阵查找10.10 数字流的秩10.11 峰与谷 10.1合并排序的数组 这个就从后往前加入到新数组里就行。如果B的下标是-1则结束&#xff0c;A的下标是-1则一直加B的元素。 class Solution { pub…

挑战14天学完Python---初识python基本图形绘制

往期文章 目录 往期文章前言1."Python蟒蛇绘制"实例2.Python标准库 之turtle库3. 面向对象编程风格3.1 import更多玩法3.1.1使用from和import保留字共同完成3.1.2 使用import和as保留字共同完成 4.turtle的原(wan)理 (fa)4.1 turtle绘图窗体布局---turtul.setup()4.2…

京东小程序折叠屏适配探索 | 京东云技术团队

前言 随着近年来手机行业的飞速发展&#xff0c;手机从功能机进入到智能机&#xff0c;手机屏幕占比也随着技术和系统的进步越来越大&#xff0c;特别是Android 10推出以后&#xff0c;折叠屏逐渐成为Android手机发展的趋势。 图 1 Android手机屏幕发展趋势 京东小程序近年来…

Python程序员辞职后,如何踏出自由职业的第一步,聊聊我自己的看法

大家好&#xff0c;我是兴哥。有个广州的朋友说他辞职了&#xff0c;想要自由职业该怎么开始第一步呢&#xff1f;我问他你之前的收入月薪是多少&#xff0c;他说2万出头。我不得不说&#xff0c;对于写项目的自由职业程序员&#xff0c;2万是一个极高的门槛。但既然他已经辞职…

第三十章 React的路由基本使用

关于React路由&#xff0c;我们在学习之前先了解一下其他知识点&#xff1a;SPA应用、路由的理解、react中如何使用路由。 SPA应用的理解 我们知道React脚手架给我们构建的是一个单页应用程序&#xff08;SPA&#xff09;&#xff0c;在页面加载时&#xff0c;只会加载一个HT…

2.Redis入门概述

1.Redis是什么 Remote Dictionary Server(远程字典服务)是完全开源的&#xff0c;使用ANSIC语言编写遵守BSD协议&#xff0c; 是一个高性能的Key-Value数据库&#xff0c; 提供了丰富的数据结构&#xff0c;例如String、Hash、List、Set、SortedSet等等。 数据是存在内存中的&a…

学会这几个Word技巧,让你办公省时又省力(二)

Word是我们经常用到的办公软件&#xff0c;下面分享的几个小技巧&#xff0c;可以提高你的办公效率&#xff0c;一起看看吧。 1. 改变Word文档的背景颜色 有时候我们打开的Word文档是有颜色的&#xff0c;如果你想恢复白色背景&#xff0c;或者改成其他颜色&#xff0c;只…

《Linux 内核设计与实现》08. 下半部和推后执行的工作

文章目录 下半部软中断软中断的实现使用软中断 tasklettasklet 的实现使用 tasklet 工作队列工作队列的实现使用工作队列 下半部 中断处理程序的局限性&#xff1a; 中断处理程序以异步方式执行&#xff0c;并且可能打断其它代码&#xff0c;因此为了避免被打断的代码停止时间…

PR控制以及使用PR控制用于单相离/并网逆变器

文章目录 前言基本知识实际使用单相离网逆变器单相并网逆变器 PR控制器离散化基本知识 DSP实现总结 前言 最近想学习一下并网逆变器&#xff0c;需要用到PR控制&#xff0c;全网找遍了许多学习资料&#xff0c;终于掌握的差不多了&#xff0c;在此做个记录&#xff0c;以及个人…

【每日一题】23年4月

文章目录 C 技术点多边三角形剖分的最低得分&#xff08;dp思路&#xff0c;选不选问题&#xff09;移动石子到连续&#xff08;思路&#xff09;1027. 最长等差数列(动态规划)1105. 填充书架&#xff08;动态规划&#xff09;1031 两个非重叠子数组的最大和1163.按字典序排在最…

【Java 】从源码全面解析Java 线程池

文章目录 一、引言二、使用三、源码1、初始化1.1 拒绝策略1.1.1 AbortPolicy1.1.2 CallerRunsPolicy1.1.3 DiscardOldestPolicy1.1.4 DiscardPolicy1.1.5 自定义拒绝策略1.2 其余变量 2、线程池的execute方法3、线程池的addWorker方法3.1 校验3.2 添加线程 4、线程池的 worker …

PostgreSQL 基础知识:psql 提示和技巧

对于积极使用和连接到 PostgreSQL 数据库的任何开发人员或 DBA 来说&#xff0c;能够访问psql命令行工具是必不可少的。在我们的第一篇文章中&#xff0c;我们讨论了 psql的简要历史&#xff0c;并演示了如何在您选择的平台上安装它并连接到 PostgreSQL 数据库。 在本文中&…

使用腾讯云快速完成网站备案的详细过程

最近总是被备案弄得血压飙升&#xff0c;明明是一件很简单的事情&#xff0c;不知道大家为什么搞得那么复杂&#xff0c;首先了解下为什么要备案&#xff0c;根据国务院令第292号《互联网信息服务管理办法》和 《非经营性互联网信息服务备案管理办法》规定&#xff0c;国家对经…

【TCP四次挥手】

文章目录 TCP 四次挥手过程是怎样的&#xff1f;为什么挥手需要四次&#xff1f;第一次挥手丢失了&#xff0c;会发生什么&#xff1f;第二次挥手丢失了&#xff0c;会发生什么&#xff1f;第三次挥手丢失了&#xff0c;会发生什么&#xff1f;第四次挥手丢失了&#xff0c;会发…

Lecture 13(Extra Material):Q-Learning

目录 Introduction of Q-Learning Tips of Q-Learning Double DQN Dueling DQN Prioritized Reply Multi-step Noisy Net Distributional Q-function Rainbow Q-Learning for Continuous Actions Introduction of Q-Learning Critic: The output values of a critic…

为生信写的Python简明教程 | 视频3

开源生信 Python教程 生信专用简明 Python 文字和视频教程 源码在&#xff1a;https://github.com/Tong-Chen/Bioinfo_course_python 目录 背景介绍 编程开篇为什么学习Python如何安装Python如何运行Python命令和脚本使用什么编辑器写Python脚本Python程序事例Python基本语法 数…

PySpark基础入门(7):Spark SQL

概述 SparkSQL和Hive的异同 Hive和Spark 均是&#xff1a;“分布式SQL计算引擎”SparkSQL使用内存计算&#xff0c;而Hive使用磁盘迭代&#xff0c;所以SparkSQL性能较好二者都可以运行在YARN之上SparkSQL无元数据管理&#xff0c;但可以和hive集成&#xff0c;集成之后可以借…

极光笔记 | 极光推出“运营增长”解决方案,开启企业增长新引擎

摘要&#xff1a; 移动互联网流量红利见底&#xff0c;营销获客面临更多挑战 随着移动互联网流量红利见顶&#xff0c;越来越多的企业客户发现获取新客户的难度直线上升&#xff0c;获客成本持续攀高。 传统的移动互联网营销以PUSH为代表&#xff0c;采用简单粗暴的方式给用户…

PaddleVideo 简介以及文件目录详解

简介特性许可证书 PaddleVideo 文件目录总述applications 文件夹详述configs 文件夹详述docs 文件夹详述paddlevideo 文件夹详述utils 文件夹tasks 文件夹loader 文件夹modeling 文件夹solver 文件夹metrics 文件夹 简介 PaddleVideo 旨在打造一套丰富、领先且实用的 Video 工…