一、短信登录
基于session实现短信登录的流程
实现发送短信验证码功能
发送验证码功能:
@Override
public Result sendCode(String phone, HttpSession session) {
//1.校验手机号
if(RegexUtils.isPhoneInvalid(phone)){
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
//3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);
//4.保存验证码到session
session.setAttribute("code",code);
//5.发送验证码
log.debug("发送短信验证码成功,验证码:{}"+code);
//返回ok
return Result.ok();
}
登录功能:
登录表单的实体类:
@Data
public class LoginFormDTO {
private String phone;
private String code;
private String password;
}
登录逻辑代码实现:
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if(RegexUtils.isPhoneInvalid(phone)){
//如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
//2.校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if(cacheCode==null ||! cacheCode.toString().equals(code)){
//3.不一致,报错
return Result.fail("验证码错误!");
}
//4.一致,根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();
//5.判断用户是否存在
if(user==null) {
//6.不存在,创建新用户并保存
user=createUsrWithPhone(phone);
}
//7.保存用户信息到session中
session.setAttribute("user",user);
return null;
}
private User createUsrWithPhone(String phone) {
//1.创建用户
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
//2.保存用户
save(user);
return user;
}
实现登录校验拦截器
ThreadLocal
叫做本地线程变量,意思是说,ThreadLocal
中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal
为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。
注意,为了隐藏用户敏感信息,也为了节省ThreadLocal的空间,需要将User转为UserDTO返回给前端。
可以通过hutool工具类,在UserService里修改:
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
public static void saveUser(UserDTO user){
tl.set(user);
}
public static UserDTO getUser(){
return tl.get();
}
public static void removeUser(){
tl.remove();
}
}
拦截器:
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取session
HttpSession session = request.getSession();
//2.获取session中的用户
Object user = session.getAttribute("user");
//3.判断用户是否存在
if(user==null) {
//4.不存在,拦截,返回401状态码,代表未授权
response.setStatus(401);
return false;
}
//5.存在,保存用户信息到ThreadLocal
UserHolder.saveUser((UserDTO) user);
//6.放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
添加拦截器:
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/voucher/**","/upload/**");
}
}
登录并返回:
@GetMapping("/me")
public Result me(){
// TODO 获取当前登录的用户并返回
UserDTO user = UserHolder.getUser();
return Result.ok(user);
}
session共享的问题分析
session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题
session的替代方案应该满足:
数据共享
内存存储
key、value结构
===>redis
Redis代替session的业务流程
基于Redis实现短信登录
拦截器修改:
public class LoginInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate=stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取请求头中的token
String token = request.getHeader("authorization");
//判断token是否为空
if(StrUtil.isBlank(token)){
response.setStatus(401);
return false;
}
String key=LOGIN_USER_KEY+token;
//2.基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//3.判断用户是否存在
if(userMap.isEmpty()){
//不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
//5.存在,将查询到的Hash数据转为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//6.保存用户到ThreadLocal
UserHolder.saveUser(userDTO);
//7.刷新token有效期
stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
//8.放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
MvcConfig修改:
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/voucher/**",
"/upload/**");
}
}
UserServiceImpl修改:
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result sendCode(String phone, HttpSession session) {
//1.校验手机号
if(RegexUtils.isPhoneInvalid(phone)){
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
//3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);
//4.保存验证码到redis中
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
//5.发送验证码
log.debug("发送短信验证码成功,验证码:{}"+code);
//返回ok
return Result.ok();
}
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if(RegexUtils.isPhoneInvalid(phone)){
//如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
//2.校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if(cacheCode==null ||!cacheCode.equals(code)){
//3.不一致,报错
return Result.fail("验证码错误!");
}
//4.一致,根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();
//5.判断用户是否存在
if(user==null) {
//6.不存在,创建新用户并保存
user=createUsrWithPhone(phone);
}
//保存用户信息到redis中
//1.随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true); //true代表isSimple,即不带中划线
//2.将User对象转为Hash存储
UserDTO userDTO=BeanUtil.copyProperties(user,UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
String tokenKey=LOGIN_USER_KEY+token;
//7.存储
stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
//设置token有效期
stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
return Result.ok();
}
private User createUsrWithPhone(String phone) {
//1.创建用户
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
//2.保存用户
save(user);
return user;
}
}
Redis代替session需要考虑的问题:
选择合适的数据结构
选择合适的key
选择合适的存储粒度
解决登录状态刷新的问题
RefreshTokenInterceptor
public class RefreshTokenInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate=stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取请求头中的token
String token = request.getHeader("authorization");
//判断token是否为空
if(StrUtil.isBlank(token)){
return true;
}
String key=LOGIN_USER_KEY+token;
//2.基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//3.判断用户是否存在
if(userMap.isEmpty()){
return true;
}
//5.存在,将查询到的Hash数据转为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//6.保存用户到ThreadLocal
UserHolder.saveUser(userDTO);
//7.刷新token有效期
stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
//8.放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//判断是否需要拦截(ThreadLocal中是否有用户)
if(UserHolder.getUser()==null){
//没有,需要拦截,设置状态码
response.setStatus(401);
return false;
}
//有用户,则放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
配置添加拦截器
注意通过order控制拦截器执行顺序,order越小越先执行
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/voucher/**",
"/upload/**").order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
}
}
二、商户缓存查询
什么是缓存
缓存就是数据交换的缓冲区(称作Cache),是存储数据的临时地方,一般读写性能较高
添加Redis缓存
/**
* 根据id查询商铺信息
* @param id 商铺id
* @return 商铺详情数据
*/
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return shopService.queryById(id);
}
ShopServiceImpl:
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
String key=CACHE_SHOP_KEY+id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)) {
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
return Result.fail("店铺不存在!");
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
//7.返回
return Result.ok(shop);
}
}
练习:给店铺类型查询业务添加缓存
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
@Autowired
private ShopTypeMapper shopTypeMapper;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryBatch() {
String key="CACHE_SHOP_TYPE_KEY";
String jsonType = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isNotBlank(jsonType)){
List<ShopType> shopTypes = JSONUtil.toList(jsonType, ShopType.class);
return Result.ok(shopTypes);
}
List<ShopType> shopTypes = shopTypeMapper.selectList(new QueryWrapper<>());
if(shopTypes.isEmpty()){
return Result.fail("您查询的页面不存在!");
}
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopTypes));
return Result.ok(shopTypes);
}
}
缓存更新策略
业务场景:
低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
主动更新策略:
操作缓存和数据库时有三个问题需要考虑:
1.删除缓存还是更新缓存?
更新缓存:每次更新都更新缓存,无效写操作较多
删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)
2.如何保证缓存与数据库的操作的同时成功或失败?
单体系统:将缓存与数据库操作放在一个事务
分布式系统:利用TCC等分布式事务方案
3.先操作缓存还是先操作数据库?
总结:
实现缓存与数据库的双写一致
案例:给查询商铺的缓存添加超时剔除和主动更新的策略
修改ShopController中的业务逻辑,满足下面的需求:
①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
②根据id修改店铺时,先修改数据库,再删除缓存
更新操作:
@Override
@Transactional
public Result update(Shop shop) {
Long id=shop.getId();
if(id==null){
return Result.fail("店铺id不能为空!");
}
//1.更新数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY+shop.getId());
return Result.ok();
}
缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
编码解决商铺查询的缓存穿透问题
@Override
public Result queryById(Long id) {
String key=CACHE_SHOP_KEY+id;
//1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)) {
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//命中的是否为空值
if(shopJson!=null){
return Result.fail("店铺信息不存在!");
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺信息不存在!");
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
//7.返回
return Result.ok(shop);
}
总结:
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
缓存null值
布隆过滤
增强id的复杂度,避免被猜测id规律
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
缓存雪崩
缓存雪崩是指同一时刻大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
互斥锁
逻辑过期
互斥锁和逻辑过期对比:
互斥锁和逻辑过期优缺点:
利用互斥锁解决缓存击穿问题
需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
public Shop queryWithMutex(Long id){
String key = CACHE_SHOP_KEY+id;
//从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//判断缓存是否命中
if(StrUtil.isNotBlank(shopJson)){
//命中则直接返回数据
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断是否是缓存穿透
if(shopJson!=null){
return null;
}
//实现缓存重建
//1.获取互斥锁
String lockKey=LOCK_SHOP_KEY+id;
try{
boolean isLock = tryLock(lockKey);
//2.判断是否获取成功
if(!isLock) {
//3.失败,则休眠并重试
Thread.sleep(50);
queryWithMutex(id);
}
//4.成功,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
return shop;
}catch(InterruptedException e){
throw new RuntimeException(e);
}finally{
unlock(lockKey);
}
}
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
基于逻辑过期方式解决缓存击穿问题
需求:修改根据id删除商铺的业务,基于逻辑过期的方式来解决缓存击穿问题
添加逻辑过期时间:
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
整体实现逻辑:
private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire(Long id){
String key=CACHE_SHOP_KEY+id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//缓存未命中
if(StrUtil.isBlank(shopJson)){
return null;
}
//命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
//判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
//未过期,直接返回店铺信息
return shop;
}
//已过期,需要缓存重建
String lockKey=LOCK_SHOP_KEY+id;
//获取互斥锁
boolean isLock = tryLock(lockKey);
//判断是否获取锁成功
if(isLock) {
//成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
saveShop2Redis(id, CACHE_SHOP_TTL);
}catch(Exception e){
throw new RuntimeException(e);
}finally{
unlock(lockKey);
}
});
}
//返过期的商铺信息
}
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
public void saveShop2Redis(Long id,Long expireSeconds){
//1.查询店铺数据
Shop shop = getById(id);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
封装Redis工具类
基于StringRedisTemplate封装一个缓存工具类,满足下列要求:
方法1:任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate=stringRedisTemplate;
}
public void set(String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
//设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit){
String key=keyPrefix+id;
String json = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isNotBlank(json)){
return JSONUtil.toBean(json,type);
}
//判断命中的是否为空值
if(json!=null){
return null;
}
R r = dbFallback.apply(id);
if(r==null){
stringRedisTemplate.opsForValue().set(key,"",time,unit);
return null;
}
this.set(key,r,time,unit);
return r;
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
String key=keyPrefix+id;
String json = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isBlank(json)){
return null;
}
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
//判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
//未过期,直接返回店铺信息
return r;
}
//已过期,需要缓存重建
String lockKey=LOCK_SHOP_KEY+id;
boolean isLock = tryLock(lockKey);
if(isLock){
CACHE_REBUILD_EXECUTOR.submit(()->{
try{
//重建缓存
//1.查询数据库
R apply = dbFallback.apply(id);
//2.存入redis
this.setWithLogicalExpire(key,apply,time,unit);
}catch (Exception e){
throw new RuntimeException(e);
}finally{
unlock(key);
}
});
}
return r;
}
private boolean tryLock(String key){
Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
}
缓存总结
认识缓存
什么是缓存?
一种具备高效读写能力的数据暂存区域
缓存的作用?
降低后端负载
提高读写响应速度
缓存的成本?
开发成本
运维成本
一致性问题
缓存更新策略
三种策略:
内存淘汰:redis自带的内存淘汰机制
过期淘汰:利用expire命令给数据设置过期时间
主动更新:主动完成数据库与缓存的同时更新
策略选择:
低一致性需求:内存淘汰或过期淘汰
高一致性需求:
主动更新为主
过期淘汰兜底
主动更新方案
Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新
一致性良好、实现难度一般
Read/Write Through:缓存与数据库集成为一个服务,服务保证两者的一致性,对外暴露API接口,调用者调用API,无需知道自己操作的是数据库还是缓存,不关心一致性
一致性优秀、实现复杂、性能一般
Write Back:缓存调用者的CRUD都针对缓存完成。有独立线程异步将缓存数据写到数据库,实现最终一致
一致性差、性能好、实现复杂
Cache Aside模式选择
更新缓存还是删除缓存?
更新缓存会产生无效更新,并且存在较大的线程安全问题
删除缓存本质是延迟更新,没有无效更新,线程安全问题相对较低
先操作数据库还是缓存?
先更新数据,再删除缓存——在满足原子性的情况下,安全问题概率较低
先删除缓存,再更新数据库——安全问题概率极高
如何确保数据库与缓存操作原子性?
单体系统——利用事务机制
分布式系统——利用分布式事务机制
最佳实践
查询数据时
1.先查询缓存
2.如果缓存命中,直接返回
3.如果缓存未命中,则查询数据库
4.将数据库数据写入缓存
5.返回结果
修改数据库时:
1.先修改数据库
2.然后删除缓存
===>确保两者的原子性
缓存穿透
产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
解决方案:
①缓存空对象:
思路:对于不存在的数据也在redis建立缓存,值为空,并设置一个较短的TTL时间
优点:实现简单,便于维护
缺点:额外的内存消耗、短期的数据不一致问题
②布隆过滤:
思路:利用布隆过滤算法,在请求进入Redis之前先判断是否存在,如果不存在则直接拒绝请求
优点:内存占用少
缺点:实现复杂、存在误判的可能性
③其他:
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
缓存雪崩
产生原因:在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
缓存击穿(热点key)
产生原因:
热点key:①在某一时段被高并发访问 ②缓存重建耗时较长
热点key突然过期,因为缓存重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击
解决方案:
互斥锁:
思路:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待
优点:①实现简单 ②没有额外内存消耗 ③一致性好
缺点:①等待导致性能下降
缺点:有死锁风险
逻辑过期:
思路:
①热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存
②重建缓存也通过互斥锁来保证单线程执行
③重建缓存利用独立线程异步执行
④其他线程无需等待,直接查询到旧数据即可
优点:现成无需等待,性能较好
缺点:
①不保证一致性
②有额外内存消耗
③实现复杂
三、优惠券秒杀
全局ID生成器
每个店铺都可以发布优惠券:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:
id的规律性太明显
受单表数据量的限制
全局ID生成器:
全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,可以支持每秒产生2^32个不同的ID
Redis实现全局唯一ID
@Component
public class RedisIdWorker {
//开始时间戳 2023-01-01 00:00:00
private static final long BEGIN_TIMESTAMP=1672531200L;
private static final int COUNT_BITS=32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate=stringRedisTemplate;
}
public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp=nowSecond-BEGIN_TIMESTAMP;
//2.生成序列号
//2.1 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2 自增长
long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
//3.拼接并返回
return timestamp<<COUNT_BITS | count;
}
}
测试代码:
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es= Executors.newFixedThreadPool(500);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task=()->{
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id="+id);
}
latch.countDown();
};
latch.countDown();
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end=System.currentTimeMillis();
System.out.println("time="+(end-begin));
}
总结:
全局唯一ID生成策略:
UUID
Redis自增
snowflake算法
数据库自增
Redis自增ID策略:
每天一个key,方便统计订单量
ID构造是 时间戳+计数器
添加优惠券
每个店铺都可以发布优惠券,分为评价券和特价劵。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。
通过postman添加优惠券:
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五均可使用",
"rules":"全场通用\\n无需预约\\n可无限叠加\\n不兑换、不找零\\n仅限堂食",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2023-04-29T10:09:17",
"endTime":"2023-04-29T23:09:04"
}
实现秒杀下单
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock()<1){
//库存不足
return Result.fail("库存不足!");
}
//5.扣减库存
boolean success=seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId)
.update();
if(!success){
//扣减失败
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单id
return Result.ok(orderId);
}
}
超卖问题
用jmeter模拟高并发:
模拟结果:
原因:
超卖问题就是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:
版本号法
CAS法
乐观锁解决超卖问题
boolean success=seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId).eq("stock",voucher.getStock())
.update();
乐观锁问题:成功率太低
乐观锁优化:
boolean success=seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId).gt("stock",0)
.update();
运行结果:50%的失败率,订单恰好没有超卖
超卖这样的线程安全问题,解决方案有哪些?
1.悲观锁:添加同步锁,让线程串行执行
优点:简单粗暴
缺点:性能一般
2.乐观锁:不加锁,在更新时判断是否有其他线程在修改
优点:性能好
缺点:存在成功率低的问题
实现一人一单功能
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
代码实现:
//一人一单
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
//用户已经存在
return Result.fail("该用户已经购买过一次!");
}
对于高并发,解决线程安全问题:
(先提交事务,再释放锁)
第一步:添加依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
第二步:暴露代理对象 @EnableAspectJAutoProxy(exposeProxy=true)
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
第三步:对方法加锁
如果用this.createVoucherOrder(voucherId); 首先我们要知道事务如果想生效,需要Spring对该类做动态代理,拿到了代理对象,来对事务进行处理
这个this是没有事务功能的,因为拿到的目标对象(非代理对象)
所以需要拿到事务代理对象AopContext.currentProxy()
synchronized (userId.toString().intern()) {
//获取代理对象(事务)
IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//一人一单
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
//用户已经存在
return Result.fail("该用户已经购买过一次!");
}
//5.扣减库存
boolean success = seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherId).gt("stock", 0)
.update();
if (!success) {
//扣减失败
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id
voucherOrder.setUserId(userId);
//6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单id
return Result.ok(orderId);
}
压测结果:
集群模式下线程锁失效
因为集群模式下,每个JVM有各自的锁监视器
四、分布式锁
分布式锁基本原理
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁的实现:
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见有三种:
Redis分布式锁的基本实现
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
#添加锁,利用setnx的互斥特性
setnx lock thread1
#添加锁过期时间,避免服务宕机引起的死锁
expire lock 10
非阻塞:尝试一次,成功返回true,失败返回false
释放锁:
手动释放:del key
超时释放:获取锁时添加一个超时时间
实现Redis分布式锁初级版本
需求:定义一个类,实现下面的接口,利用Redis实现分布式锁功能
@AllArgsConstructor
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
@Override
public boolean tryLock(long timeoutSec) {
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
优化一人一单:
Long userId=UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
boolean isLock = lock.tryLock(1200);
if(!isLock) {
//获取锁失败,返回错误或重试
return Result.fail("一个人只能下一单!");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
lock.unlock();
}
Redis分布式锁误删
解决方案:在释放锁前判断这个锁是否还属于自己
改进Redis的分布式锁
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时现货区锁中的线程标识,判断是否与当前线程标识一致
如果一致则释放锁
如果不一致则不释放锁
@AllArgsConstructor
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
String threadId =ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//获取线程标识
String threadId=ID_PREFIX + Thread.currentThread().getId();
//获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//判断标识是否一致
if(threadId.equals(id)) {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
分布式锁的原子性问题
判断和释放锁是两个动作,这中间可能会发生并发问题
Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。
这里重点介绍Redis提供的调用函数,语法如下:
#执行redis命令
redis.call('命令名称','key','其它参数',...)
#比如,我们要执行set name jack
redis.call('set','name','jack')
#如果我们要执行set name Rose,再执行get name,则脚本如下
redis.call('set','name','Rose')
local name = redis.call('get','name')
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本常见命令如下:
执行无参脚本:
执行有参脚本:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入keys数组,其它参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数:
基于Redis的分布式锁
释放锁的业务流程:
1.获取锁中的线程标识
2.判断是否与指定的标识(当前线程标识)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
-- 锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]
-- 获取锁中的线程标识 get key
local id = redis.call('get',key)
--比较线程标识与锁中的标识是否一致
if(id == threadId) then
-- 释放锁
return redis.call('del',key)
end
return 0
脚本优化:
if(redis.call('get',KEYS[1]==ARGV[1])) then
return redis.call('del',KEYS[1])
end
return 0
再次改进Redis的分布式锁
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的API如下:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
基于Redis的分布式锁实现思路:
利用setnx nx ex获取锁,并设置过期时间,保存线程标识
释放锁时先判断线程标识与自己是否一致,一致则删除锁
特性:
利用set nx满足互斥性
利用set ex保证故障时依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
Redisson功能介绍
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redisson快速入门
第一步:引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
第二步:配置Redisson
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.202.128:6379").setPassword("123321");
//创建RedissonClient对象
return Redisson.create(config);
}
}
3.使用Redisson分布式锁
使用jmeter压测通过,一个用户只能下一单:
Redisson可重入锁原理
redis无法实现可重入锁的原因:
Redisson可以实现可重入锁的原理:
lua脚本实现:
测试代码:
@SpringBootTest
@Slf4j
public class RedissonTest {
@Autowired
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setup(){
lock=redissonClient.getLock("order");
}
@Test
void method1(){
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败……1");
return;
}
try{
log.info("获取锁成功……1");
method2();
log.info("开始执行业务……1");
}finally{
log.warn("准备释放锁……1");
lock.unlock();
}
}
void method2(){
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败……2");
return;
}
try{
log.info("获取锁成功……2");
log.info("开始执行业务……2");
}finally{
log.warn("准备释放锁……2");
lock.unlock();
}
}
}
运行截图:
查看源码实现:
trylock:
unlock:
Redisson的锁重试和WatchDog机制
Redisson分布式锁原理:
可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间
修改method1:
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
跟入源码:
注:redis.call('pttl',KEYS[1]);中的pttl返回以毫秒为单位,而ttl返回以秒为单位
如果该线程拿到锁,则返回nil;否则返回剩余有效期
代码精华部分:
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
怎么确保锁是因为业务结束而释放,而非阻塞导致的超时释放呢?
自动更新续期:
scheduleExpirationRenewal(threadId);
renewExpiration:更新有效期
实现永不过期的代码:
Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
那么,何时取消计时刷新呢?
在锁释放的时候结束计时
Redisson的multiLock问题
redisson分布式锁主从一致问题:如果主节点宕机,而未将数据同步给从节点,可能会导致并发问题
解决方案:所有节点都变成独立的redis节点
总结:
1)不可重入的Redis分布式锁:
原理:利用setnx的互斥性,利用ex避免死锁;释放锁时判断线程标识
缺陷:不可重入、无法重试、锁超时失败
2)可重入的Redis分布式锁:
原理:利用hash结果,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
五、秒杀优化
异步秒杀思路
怎么保证一人一单呢?Set集合
怎么确保代码执行的原子性?lua脚本
基于Redis完成秒杀资格判断
需求:
①新增秒杀优惠券的同时,将优惠券的信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
}
新增优惠券:
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey))<=0) then
-- 3.2 库存不足,返回1
return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then
-- 3.3 存在,说明是重复下单,返回2
return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
// 取出用户
Long userId = UserHolder.getUser().getId();
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),userId.toString()
);
int r=result.intValue();
//2. 判断结果是否为0
if(r != 0) {
//2.1 不为0,没有购买资格
return Result.fail(r==1 ? "库存不足" : "不能重复下单");
}
//2.1 为0,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存阻塞队列
//3.返回订单id
return Result.ok(orderId);
}
④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
基于阻塞队列实现秒杀业务
源代码有大改动,包括之前写好的createVoucherOrder,所以附上完整源码:
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private IVoucherOrderService proxy;
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();//没有任务时会自动阻塞
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("处理订单异常",e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
//2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//3.获取锁
boolean isLock = lock.tryLock();
//4.判断是否获取锁成功
if(!isLock){
log.error("不允许重复下单");
return;
}
try{
proxy.createVoucherOrder(voucherOrder);
}finally{
lock.unlock();
}
}
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
// 取出用户
Long userId = UserHolder.getUser().getId();
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),userId.toString()
);
int r=result.intValue();
//2. 判断结果是否为0
if(r != 0) {
//2.1 不为0,没有购买资格
return Result.fail(r==1 ? "库存不足" : "不能重复下单");
}
//2.1 为0,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
orderTasks.add(voucherOrder);
//3.返回订单id
return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//一人一单
Long userId = voucherOrder.getUserId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
//用户已经存在
log.error("用户已经购买过一次");
return;
}
//5.扣减库存
boolean success = seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
.update();
if (!success) {
//扣减失败
log.error("库存不足");
}
save(voucherOrder);
}
}
秒杀业务的优化思路是什么?
①先利用Redis完成库存余量、一人一单判断,完成抢单业务
②再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
内存限制问题
数据安全问题
六、Redis消息队列
认识消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:
list结构:基于List结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型
基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果
队列的入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用的是BRPOP或者BLPOP来实现阻塞效果。
示例:
基于List的消息队列有哪些优缺点?
优点:
利用Redis存储,不受限于JVM内存上限
基于Redis的持久化机制,数据安全性有保证
可以满足消息有序性
缺点:
无法避免消息丢失
只支持单消费者
基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg:向一个频道发送消息
PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道
pattern:
?代表一个字符
* 代表零个或多个字符
[ae] 代表可以是a也可以是e
示例:
基于PubSub的消息队列有哪些优缺点?
优点:
采用发布订阅模型,支持多生产、多消费
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列
示例1:
===>结论 :基于stream的消息队列,消息会被持久化存储,可以被多个消费者读取,也可以被一个消费者读取多次
示例2:阻塞等待,block 0 代表一直阻塞直到有新消息的到来
消费者应用:
bug:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
消息漏读:
STREAM类型消息队列的XREAD命令特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
基于Stream的消费队列---消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
==>队列中的消息如果还想用,ID从0开始;如果不想用了,ID从$开始
示例:
Java代码实现:
STREAM类型消息队列的XREADGROUP命令特点:
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
总结:
基于Redis的Stream结构作为消息队列,实现异步秒杀下单
需求:
①创建一个Stream类型的消息队列,名为stream.orders
参数MKSTREAM,在创建组的时候,如果消息队列不存在,则自动创建组和队列
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
添加orderId:
--1.3 订单id
local orderId = ARGV[3]
发送消息到队列中
-- 3.6 发送消息到队列中
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
改造Java代码:
@Override
public Result seckillVoucher(Long voucherId) {
// 取出用户
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),userId.toString(),String.valueOf(orderId)
);
int r=result.intValue();
//2. 判断结果是否为0
if(r != 0) {
//2.1 不为0,没有购买资格
return Result.fail(r==1 ? "库存不足" : "不能重复下单");
}
proxy=(IVoucherOrderService) AopContext.currentProxy();
//3.返回订单id
return Result.ok(orderId);
}
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
private class VoucherOrderHandler implements Runnable{
String queueName = "stream.orders";
@Override
public void run() {
while(true) {
try {
//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 如果获取失败,说明没有消息,继续下一次循环
continue;
}
// 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3.创建订单
handleVoucherOrder(voucherOrder);
//4.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList(){
while(true){
try{
//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息是否获取成功
if(list == null || list.isEmpty()){
//没有消息,说明pending-list中没有异常消息,结束循环
break;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//5.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch(Exception e){
log.error("处理pending-list订单异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
七、达人探店
发布探店笔记
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价
第一步:将SystemConstants的常量改为部署在nginx项目下的imgs目录
public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";
测试效果:
实现查看发布探店笔记的接口
Controller接口:
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){
return blogService.queryBlogById();
}
Service实现:
@Override
public Result queryBlogById(Long id) {
//1.查询blog
Blog blog = getById(id);
if(blog == null){
return Result.fail("笔记不存在!");
}
//2.查询blog有关的用户
queryBlogUser(blog);
return Result.ok(blog);
}
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
点赞
需求:
同一个用户只能点赞一次,再次点击则取消点赞
如果当前用户已经点赞,则点赞按钮高亮显示(前段已实现,判断字段Blog类的isLike属性)
实现步骤:
①给Blog类中添加一个isLike字段,标识是否被当前用户点赞
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;
②修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
③修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值給isLike字段
④修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog->{
this.queryBlogUser(blog);
this.isBlogLiked(blog);
});
return Result.ok(records);
}
@Override
public Result queryBlogById(Long id) {
//1.查询blog
Blog blog = getById(id);
if(blog == null){
return Result.fail("笔记不存在!");
}
//2.查询blog有关的用户
queryBlogUser(blog);
//3.查询blog是否被点赞
isBlogLiked(blog);
return Result.ok(blog);
}
private void isBlogLiked(Blog blog) {
Long userId = UserHolder.getUser().getId();
String key = "blog:liked:" + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
@Override
public Result likeBlog(Long id) {
//1. 获取登录用户
Long userId = UserHolder.getUser().getId();
//2. 判断当前登录用户是否已经点赞
String key = "blog:liked:" + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if(BooleanUtil.isFalse(isMember)) {
//3. 如果未点赞,可以点赞
//3.1 数据库点赞数+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//3.2 保存用户到Redis的set集合
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else {
//4. 如果已经点赞,则取消点赞
//4.1 数据库点赞数-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//4.2 把用户从Redis的set集合移除
if (isSuccess) {
stringRedisTemplate.opsForSet().remove(key, userId.toString());
}
}
return Result.ok();
}
点赞排行榜
需求:按照点赞时间先后排序,返回Top5的用户
一人只能点赞一次功能实现:
因为SortedSet中没有isMember的判断,所以在添加元素的时候加上时间戳;查询的时候如果对应的元素没有时间戳,则代表集合中没有这个元素
private void isBlogLiked(Blog blog) {
UserDTO user = UserHolder.getUser();
if(user==null){
return;
}
Long userId = UserHolder.getUser().getId();
String key = BLOG_LIKED_KEY + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(score!=null);
}
@Override
public Result likeBlog(Long id) {
//1. 获取登录用户
Long userId = UserHolder.getUser().getId();
//2. 判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if(score==null) {
//3. 如果未点赞,可以点赞
//3.1 数据库点赞数+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//3.2 保存用户到Redis的set集合 zadd key value score
if(isSuccess){
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
}else {
//4. 如果已经点赞,则取消点赞
//4.1 数据库点赞数-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//4.2 把用户从Redis的set集合移除
if (isSuccess) {
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}
查询点赞排行前五:
@Override
public Result queryBlogLikes(Long id) {
String key=BLOG_LIKED_KEY+id;
//1. 查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5 == null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2. 解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
//3. 根据用户id查询用户
List<UserDTO> userDTOs = userService
.query()
.in("id",ids)
.last("order by field(id,"+idStr+")")
.list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4. 返回
return Result.ok(userDTOs);
}
八、好友关注
关注和取关
需求:基于该表数据结构,实现两个接口:
①关注和取关接口
②判断是否关注的接口
Controller实现:
@RestController
@RequestMapping("/follow")
public class FollowController {
@Resource
private IFollowService followService;
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id")Long followUserId,@PathVariable("isFollow")Boolean isFollow){
return followService.follow(followUserId,isFollow);
}
@GetMapping("/or/not/{id}")
public Result follow(@PathVariable("id")Long followUserId){
return followService.isFollow(followUserId);
}
}
ServiceImpl具体实现:
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取登录用户
Long userId = UserHolder.getUser().getId();
//1.判断到底是关注还是取关
if (isFollow) {
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
save(follow);
} else {
//3.取关,删除
remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
}
return Result.ok();
}
@Override
public Result isFollow(Long followUserId) {
//1.获取登录用户
Long userId = UserHolder.getUser().getId();
//2.查询是否关注
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
//3.判断
return Result.ok(count>0);
}
}
共同关注
将下面的代码插入到UserController:
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id")Long userId){
User user = userService.getById(userId);
if(user == null){
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
return Result.ok(userDTO);
}
将下面的代码插入到BlogController
@GetMapping("/of/user")
public Result queryBlogByUserId(
@RequestParam(value="current",defaultValue = "1")Integer current,
@RequestParam("id")Long id){
Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
List<Blog> records = page.getRecords();
return Result.ok(records);
}
要实现共同关注的查找,可以考虑集合set的交集
需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友
①修改follow代码,将用户和关注的用户加入集合
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取登录用户
Long userId = UserHolder.getUser().getId();
String key="follows:"+userId;
//1.判断到底是关注还是取关
if (isFollow) {
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
//判断是否关注成功
if(isSuccess){
//如果关注成功,把关注用户的id放入redis的set集合
stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
} else {
//3.取关,删除
boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId));
if (isSuccess) {
stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
}
}
return Result.ok();
}
②求共同关注
controller:
@GetMapping("/commons/{id}")
public Result followCommons(@PathVariable("id") Long id){
return followService.followCommons(id);
}
具体实现:
@Resource
private IUserService userService;
@Override
public Result followCommons(Long id) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
String key="follows:"+userId;
//2.求交集
String key2="follows:"+id;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
if(intersect==null || intersect.isEmpty()){
//无交集
return Result.ok(Collections.emptyList());
}
//3.解析id集合
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4.查询用户
List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(users);
}
运行效果:
关注推送
Feed流实践方案分析
关注推送也叫作Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无线下拉刷新获取新的信息。
Feed流产品有两种常见的模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。
优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户
优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
①拉模式
②推模式
③推拉结合
拉模式:也叫读扩散
每次发送消息的时候都会加上一个时间戳,当用户打开个人页面,就会去关注的人的发件箱里拉取消息并按时间戳排序。缺点是耗时较长,优点是节省内存。
推模式:也叫作写扩散。
关注的博主发消息的时候会直接把消息推送到个人主页,并排好序。在本人阅读个人主页的时候,无需等待拉取信息等。优点是延时低,缺点是消耗内存。
推拉结合模式:也叫读写混合,兼具推和拉两种模式的优点。
对于普通人,发送的消息可以直接推送到粉丝的收件箱。
对于大V,发送的消息,对于活跃粉丝使用推模式,对于普通粉丝使用拉模式。
总结:
基于推模式实现关注推送功能
需求:
①修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
②收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
③查询收件箱数据时,可以实现分页查询
Feed流的滚动分页:
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
接口:
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
return blogService.saveBlog(blog);
}
具体实现:
@Resource
private IFollowService followService;
@Override
public Result saveBlog(Blog blog) {
//1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
//2.保存探店笔记
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.fail("新增笔记失败!");
}
//3.查询笔记作者的所有粉丝
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
//4.返回id
for(Follow follow:follows){
//4.1 获取粉丝id
Long userId = follow.getUserId();
//4.2 推送
String key="feed:"+userId;
stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
}
return Result.ok(blog.getId());
}
实现关注推送页面的分页查询
需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
以z1为例复习相应命令:
①倒序按范围查询
②混乱插入(此时插入一条数据,继续查询)
可以发现按角标查询,会出现内容重复
③解决方法:可以按照分数查询
zrevrangebyscore key max min withscores limit offsest count
其中offset中0代表小于等于max的第一条,如果要实现小于max的第一条,则应将offset置1
count代表一次查询多少条(数量)
④存在的问题:如果value不一样,但是分数值一样,可能会查询到重复的数据
将m7的分数值改为6:
此时再按照记忆的最后一条分数去做查询,会发现数据重复:
⑤总结:
滚动分页查询参数:
max:当前时间戳或者上一次查询的最小时间戳
min:0
offset:0 或者 在上一次的结果中,与最小值一样的元素的个数
count:3(与前端约定好)
实现滚动查询
①定义滚动查询结果的实体类
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}
②定义接口
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId")Long max,@RequestParam(value = "offset",defaultValue = "0")Integer offset){
return blogService.queryBlogOfFollow(max,offset);
}
③具体实现
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
//2.查询收件箱
String key=FEED_KEY+userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
//3.非空判断
if(typedTuples==null || typedTuples.isEmpty()){
return Result.ok();
}
//4.解析数据:blogId,minTime(时间戳),offset
ArrayList<Object> ids = new ArrayList<>(typedTuples.size());
long minTime=0;
int os=1;
for(ZSetOperations.TypedTuple<String> tuple:typedTuples) {
//4.1 获取id
ids.add(Long.valueOf(tuple.getValue()));
//4.2 获取分数(时间戳)
long time = tuple.getScore().longValue();
if (time == minTime) {
os++;
} else {
minTime = time;
os = 1;
}
}
//5.根据id查询blog
String idStr = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
for (Blog blog : blogs) {
//5.1 查询blog有关的用户
queryBlogUser(blog);
//5.2 查询blog是否被点赞
isBlogLiked(blog);
}
//6.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}
九、附近商户
GEO数据结构
案例:练习Redis的GEO功能
需求:
1.添加下面几条数据:
北京南站(116.378248 39.865275)
北京站(116.42803 39.903738)
北京西站(116.322287 39.893729)
查看redis客户端:
==>geo底层采用ZSET实现
2.计算北京西站到北京南站的距离
==>默认单位是米
3.搜索天安门(116.397904 39.909005)附近10km内的所有火车站,并按照距离升序排序
附近商户搜索
导入店铺数据到GEO
按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可
导入数据(通过单元测试):
@Test
void loadShopData(){
//1.查询店铺信息
List<Shop> list = shopService.list();
//2.把店铺按照typeId分组,id一致的放到一个集合
Map<Long,List<Shop>> map=list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
//3.分批完成写入Redis
for(Map.Entry<Long,List<Shop>> entry:map.entrySet()){
//3.1 获取类型id
Long typeId = entry.getKey();
String key="shop:geo:"+typeId;
//3.2 获取同类型的店铺集合
List<Shop> value = entry.getValue();
//3.3 写入redis
List<RedisGeoCommands.GeoLocation<String>> locations=new ArrayList<>(value.size());
for(Shop shop:value){
locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));
}
stringRedisTemplate.opsForGeo().add(key,locations);
}
}
实现附近商户功能
首先,项目中redis相关的依赖版本太低,不支持GEOSEARCH方法,需要更换版本
我们将5.3.7和1.3.9版本排除,观察代码:
然后手动引入新版本:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>
接口:
/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x",required = false) Double x,
@RequestParam(value = "y",required = false) Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}
具体实现:
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
//1.判断是否需要根据坐标查询
if(x==null || y==null){
Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE));
return Result.ok(page.getRecords());
}
//2.计算分页参数
int from=(current-1)*DEFAULT_PAGE_SIZE;
int end=current*DEFAULT_PAGE_SIZE;
//3.查询redis、按照距离排序、分页 结果:shopId、distance
String key=SHOP_GEO_KEY+typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
.search(
key,
GeoReference.fromCoordinate(x, y),
new Distance(5000),
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
);
//4.解析出id
if(results==null){
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
if(list.size()<=from){
return Result.ok(Collections.emptyList());
}
//4.1 截取from ~ end的那部分
List<Long> ids=new ArrayList<>(list.size());
Map<String,Distance> distanceMap=new HashMap<>(list.size());
list.stream().skip(from).forEach(result->{
//4.2 获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
//4.3 获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr,distance);
});
//5.根据id查询店铺Shop
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
for(Shop shop:shops){
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
//6.返回
return Result.ok(shops);
}
效果:
注意,在stream流中跳过了一部分商铺,很有可能导致跳过以后没有商铺可查,从而出现问题,所以需要加上如下判断:
if(list.size()<=from){
return Result.ok(Collections.emptyList());
}
十、用户签到
BitMap用法
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)
Redis中就是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。
练习:
BITFIELD bm1 GET u2 0含义:读取bm1,以无符号十进制的方式读取两位(u2),从第一位开始读取。返回值3代表读取的比特位是11
BITPOS bm1 0 :代表查找第一个0出现的位置
签到功能
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。(Redis的字符串)
接口:
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
具体实现:
@Override
public Result sign() {
//1.获取当前登录的用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key=USER_SIGN_KEY+userId+keySuffix;
//4.获取今天是本月的第几天
int dayOfMonth=now.getDayOfMonth();
//5.写入Redis
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
return Result.ok();
}
测试:
签到统计
Q1:什么叫做连续签到天数?
A1:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
Q2:如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
Q3:如何从后向前遍历每个bit位?
与1做与运算,就能得到最后一个bit位
随后右移1位,下一个bit位就成为了最后一个bit位
案例:实现签到统计功能
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
接口:
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}
具体实现:
@Override
public Result signCount() {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key=USER_SIGN_KEY+userId+keySuffix;
//4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
//5.获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字
List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
BitFieldSubCommands
.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
.valueAt(0));
if(result==null || result.isEmpty()){
//没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if(num==null || num==0){
return Result.ok(0);
}
//6.循环遍历
int count=0;
while(true){
//6.1 让这个数字与1做与运算,得到数字的最后一个bit位
if((num&1)==0){
//如果为0,说明未签到,结束
break;
}else{
//如果不为0,说明已签到,计数器+1
count++;
}
num >>>= 1; // >>>代表无符号右移
}
return Result.ok(count);
}
测试:返回值为3
实际上也确实是3个连续的1:
测试通过!
十一、UV统计
HyperLogLog用法
UV:全称Unique Visitor,也叫独立访问量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断用户是否已经统计过了,要将统计过的用户信息保存,但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。
命令练习:
实现UV统计
我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:
@Test
void testHyperLogLog(){
String[] values=new String[1000];
int j=0;
for (int i = 0; i < 1000000; i++) {
j=i%1000;
values[j]="user_"+i;
if(j==999){
//发送到Redis
stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
}
}
//统计数量
Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
System.out.println("count="+count);
}
运行结果:
内存消耗:
运行前内存:
运行后内存:
1929640-1505552= 424088
424088/1024/1024=0.4M
Redis实战篇完结,恭喜大家~~❀