黑马点评项目笔记
一:数据交互:
1.把String解析成Java对象集合并且存入Redis及Java对象集合转换成JSON。
@Override
public Result queryTypeList() {
String s = stringRedisTemplate.opsForValue().get("cache:list:");
System.out.println("s = " + s);
if(!StrUtil.isBlank(s)){
List<ShopType> cachedList = JSONUtil.toList(s, ShopType.class);
return Result.ok(cachedList);
}
LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.orderByAsc(ShopType::getSort);
List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);
System.out.println("list = " + list);
stringRedisTemplate.opsForValue().set("cache:list:", JSONUtils.toJSONString(list));
return Result.ok(list);
}
2.List存储:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);
System.out.println("stringlist = " + stringlist);
if (stringlist != null && !stringlist.isEmpty()) {
List<ShopType> shopTypeList = new ArrayList<>();
for (String jsonString : stringlist) {
ShopType shopType = JSONUtil.toBean(jsonString, ShopType.class);
shopTypeList.add(shopType);
}
return Result.ok(shopTypeList);
}
LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.orderByAsc(ShopType::getSort);
List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);
System.out.println("list = " + list);
for (int i = 0; i < list.size(); i++) {
stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));
System.out.println(list.get(i));
}
return Result.ok(list);
}
当然,亦可以用Lambda表达式进一步简化:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);
System.out.println("stringlist = " + stringlist);
if (stringlist != null && !stringlist.isEmpty()) {
List<ShopType> shopTypeList = stringlist.stream()
.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
.collect(Collectors.toList());
return Result.ok(shopTypeList);
}
LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.orderByAsc(ShopType::getSort);
List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);
System.out.println("list = " + list);
for (int i = 0; i < list.size(); i++) {
stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));
System.out.println(list.get(i));
}
return Result.ok(list);
}
注意,前端一般要的是Json数组,在这里,我们只需要一个对象集合就行,Springboot会默认JSON格式返回。
对于下面的代码:
List<ShopType> shopTypeList = stringlist.stream()
.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
.collect(Collectors.toList());
return Result.ok(shopTypeList)
- 声明并初始化
shopTypeList
:
List<ShopType> shopTypeList = ...;
这行代码声明了一个ShopType
对象的列表shopTypeList
。它会在后续代码中通过Stream API操作进行初始化。
- 使用Stream API处理
stringlist
:
stringlist.stream()
这行代码将stringlist
(一个包含JSON字符串的列表)转换为一个Stream,以便进行流式处理。
3.映射每个JSON字符串到 ShopType
对象:
.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
使用map
操作,将Stream中的每个JSON字符串元素映射为对应的ShopType
对象。这里JSONUtil.toBean
是一个可以将JSON字符串转换为指定类型对象的方法。
4. 收集结果到新的列表:
.collect(Collectors.toList());
使用collect
操作,将Stream中的元素收集到一个新的列表中。这里使用了Collectors.toList()
作为收集器,它会创建一个新的列表来存储转换后的ShopType
对象。
二:缓存策略:
2.1.更新:
一般我们选用第一种。
对于三,两种都存在问题:
但是,第二种发生问题的概率极低。所以我们用第二种:
@Override
@Transactional
public Result updateshop(Shop shop) {
Long id = shop.getId();
if (id == null){
return Result.fail("店铺id不能为空");
}
updateById(shop);
stringRedisTemplate.delete("cache:shop:" + id);
return Result.ok();
}
2.2.穿透:
我们这里用第一种方法:
if(StrUtil.isNotBlank(s)){
Shop shop = JSONUtil.toBean(s,Shop.class);
return Result.ok(shop);
}
//注意,null不等于""
if(s != null){
return Result.fail("店铺不存在。");
}
//......
if(shop == null){
//写空值
stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺不存在。");
}
2.3.雪崩:
2.4.击穿:
对比:
注意,逻辑过期是假的过期,但是互斥锁是真的没有该数据的缓存了。
互斥锁:
我们用setnx来实现该锁:
private Boolean tryLock(String key){
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(result);
}
private void unLock(String key){
stringRedisTemplate.delete(key);
}
防止大量缓存穿透及锁的使用代码:
public Shop queryWithLock(Long id){
String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);
log.info(s);
if(StrUtil.isNotBlank(s)){
Shop shop = JSONUtil.toBean(s,Shop.class);
return shop;
}
if(s != null){
return null;
}
//缓存重建
String lockKey = "lock:shop:"+id;
Shop shop = null;
try {
if(!tryLock(lockKey)){
Thread.sleep(50);
return queryWithLock(id);
}
shop = getById(id);
log.info("shop = " +shop);
if(shop == null){
//写空值
stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
String key = "cache:shop:" + id;
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
return shop;
}
注意,try-catch-finally是保障锁必须被释放的关键。方法调用如下:
@Override
public Result queryById(Long id) {
//缓存穿透
//Shop shop =queryWithPassTrough(id);
//互斥锁
Shop shop = queryWithLock(id);
if(shop == null){
return Result.fail("店铺不存在。");
}
return Result.ok(shop);
}
逻辑过期:
首先是怎么实现设置逻辑过期:
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
然后我们接下来模拟一下将热点事件提前写入redis:
public void saveShopRedis(Long id,Long expires){
Shop shop = getById(id);
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expires));
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
先给出独立线程代码:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
- ExecutorService: 这是Java的并发包
java.util.concurrent
中的一个接口,它代表了一个用于执行任务的线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的ThreadFactory创建一个新线程。 - CACHE_REBUILD_EXECUTOR: 这是我们创建的ExecutorService对象的变量名。根据命名,它可能用于重建或刷新某种缓存。
- Executors.newFixedThreadPool(10): 这是创建线程池的方法。
Executors
是Java中的一个工具类,它提供了创建线程池的静态方法。newFixedThreadPool(10)
创建了一个固定大小的线程池,其中包含10个线程。这意味着无论提交多少任务,线程池中的线程数量都不会超过10个。当提交的任务数超过线程数时,任务会在队列中等待,直到线程空闲并可以处理它们。
简而言之,这行代码定义了一个私有的、静态的、不可变的ExecutorService
对象,该对象是一个包含10个线程的固定线程池,用于处理与缓存重建相关的任务。
接下来就是业务代码:
public Shop queryWithExpire(Long id){
String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);
log.info(s);
if(StrUtil.isBlank(s)){
return null;
}
RedisData bean = JSONUtil.toBean(s, RedisData.class);
JSONObject object = (JSONObject) bean.getData();
Shop shop = JSONUtil.toBean(object, Shop.class);
LocalDateTime expirationTime = bean.getExpireTime();
if(expirationTime.isAfter(LocalDateTime.now())){
return shop;
}
//缓存重建
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if(isLock){
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
saveShopRedis(id,20L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});
}
return shop;
}
记得在finally中释放锁。
2.5.自定义工具类:
@Slf4j
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public void set(String key, Object value, Long time , TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
private Boolean tryLock(String key){
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(result);
}
private void unLock(String key){
stringRedisTemplate.delete(key);
}
public void setWithLogicalExpire (String key, Object value, Long time , TimeUnit unit) {
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
//穿透代码
public <R,ID> R queryWithPassTrough (String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit) {
String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);
log.info(s);
if(StrUtil.isNotBlank(s)){
return JSONUtil.toBean(s,type);
}
if(s != null){
return null;
}
R r = dbFallback.apply(id);
if(r == null){
//写空值
stringRedisTemplate.opsForValue().set(keyPrefix + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
String key = keyPrefix + id;
this.set(key,r,time,unit);
return r;
}
public <R,ID>R queryWithExpire (String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);
log.info(s);
if(StrUtil.isBlank(s)){
return null;
}
RedisData bean = JSONUtil.toBean(s, RedisData.class);
JSONObject object = (JSONObject) bean.getData();
R r = JSONUtil.toBean(object, type);
LocalDateTime expirationTime = bean.getExpireTime();
if(expirationTime.isAfter(LocalDateTime.now())){
return r;
}
//缓存重建
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if(isLock){
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
R apply = dbFallback.apply(id);
this.setWithLogicalExpire(keyPrefix + id,apply,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});
}
return r;
}
}
补充:
这里的StringRedisTemplate没有调用set方法,也没有注解,但是依然实现了依赖注入,这是因为在spring中,如果注入对象,被注入对象都加入IOC容器,并且有一个单一的构造函数,那么就可以实现不需要注解与配置的依赖注入。
使用举例:
@Override
public Result queryById(Long id) {
Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);
if(shop == null){
return Result.fail("店铺不存在。");
}
return Result.ok(shop);
}
注意,如果是缓存击穿,一定要数据预热。
三:秒杀业务:
在这种业务中,我们用全局ID生成器来生成ID:
我们用redis来处理,可以满足要求。无论在哪里操作数据库,都是redis。
常见全局唯一ID策略:
注意,这里的数据库自增并不是数据库单纯的自增,因为我们知道,在后面的学习中,是分布式的,到时候,不同的线程可能会导致乱序,重序,这里是单独的那另外一张表做自增,其他的线程都从这里取id。
下面我们来看看基于redis自增的策略:
public class RedisIDWorker {
public static final long BEGIN_TIMESTAMP = 1640995200L;
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIDWorker (StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId (String keyPrefix) {
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
return timestamp << COUNT_BITS | count;
}
}
increment(String key): 这个方法是对指定的 key 进行自增操作。如果该 key 不存在,那么它的初始值会被设为 0,然后进行自增,结果为 1。如果 key 存在且其值可以转换为整数,则该值会进行自增。
3.2.超卖问题:
但是,这里的票的变化是与版本同步的,所以我们没有必要用版本号,直接用票作为乐观锁。
CAS是指"比较和交换"(Compare and Swap)的缩写,是一种原子操作,通常用于多线程编程中实现同步。CAS操作包括三个参数:需要进行操作的内存位置、旧的预期值和新的值。CAS操作会比较内存位置的当前值与旧的预期值,如果相同则将内存位置的值更新为新的值,否则不做任何操作。CAS操作是一种乐观锁的实现方式,可以避免使用传统的锁机制带来的性能开销和死锁问题。CAS操作在Java中的实现包括AtomicInteger、AtomicLong等类。
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
SeckillVoucher byId = seckillVoucherService.getById(voucherId);
if(byId.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀还没开始");
}
if(byId.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
if(byId.getStock()<=0){
return Result.fail("库存不足");
}
boolean success = seckillVoucherService.update().setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.eq("stock",byId.getStock())
.update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
save(voucherOrder);
return Result.ok(orderId);
}
}
注意,乐观锁也有弊端,他在多个线程同时买票时只能允许一个线程成功,其他线程都会失败,所以我们要解决这个问题:
boolean success = seckillVoucherService.update().setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
.eq("voucher_id", voucherId)
:- 这是一个条件语句,表示只更新那些
voucher_id
字段值等于voucherId
的记录。voucherId
应该是一个变量,存储了要更新的优惠券的ID。
- 这是一个条件语句,表示只更新那些
.gt("stock",0)
:- 这是另一个条件语句,表示只更新那些
stock
字段值大于0的记录。gt
是“greater than”的缩写,所以这个条件确保不会更新那些库存已经为0的记录。
- 这是另一个条件语句,表示只更新那些
.update();
:- 最后,调用
update
方法执行实际的更新操作。
- 最后,调用
注意,这里事实上是利用了数据库自带的行锁,当某个事务尝试更新某一行记录时,数据库会为该行加上行锁,阻止其他事务同时修改该行,直到当前事务完成并释放锁。
3.3.一人一单:
Long userid = UserHolder.getUser().getId();
long count = query().eq("user_id",userid).eq("voucher_id",voucherId).count();
if(count > 0){
return Result.fail("您已经购买过一次!");
}
但是依然有问题,用户多线程操作时,依然会存在都判断通过的问题,所以我们先把代码分成几个方法,然后加锁:
@Transactional
public Result createVoucher(Long voucherId){
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {
long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经购买过一次!");
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
save(voucherOrder);
return Result.ok(orderId);
}
}
这里有两个细节:
1.userid.toString().intern()
,当处理一个用户的多个请求时,如果只用toString()
,该方法在Java中是new一个新的对象,那么即使是同一个用户,可能他的多次请求的锁都不一样,这是有问题的,但是intern()
是在字符串常量池中判断有没有这个这个,如果有,就用这个对象。
2.不能在方法上加锁:因为所有的线程都走一个方法,可能会导致其他问题。
但是这样还是有问题:
因为这里又有事务又有锁,锁在事务的内部,如果锁先释放,事务还没有提交,然后其他线程去查询,有可能查询不到,就会存在问题:
所以我们把锁加在外面:
@Transactional
public Result createVoucher(Long voucherId){
Long userid = UserHolder.getUser().getId();
long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经购买过一次!");
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
save(voucherOrder);
return Result.ok(orderId);
}
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {
return createVoucher(voucherId);
}
但是这样还是有问题:
在外部同步代码块中直接调用带有@Transactional注解的方法可能会导致事务管理不生效的原因,主要是因为Spring的AOP(面向切面编程)代理机制。@Transactional注解的事务管理是通过Spring的AOP代理来实现的,具体来说:
代理模式:**Spring使用代理(通常是JDK动态代理或者CGLIB代理)来拦截那些标有@Transactional注解的方法调用。**当一个被代理的对象调用这些方法时,Spring会在调用前后插入事务管理的逻辑,比如开始事务、提交或回滚事务。
自我调用问题:在同一个类内部,如果一个方法直接调用另一个带有@Transactional注解的方法,这种调用是“直接调用”,而非通过代理对象进行。因此,Spring的AOP代理无法拦截到这种内部调用,从而不会触发事务管理的行为。
在代码示例中,虽然createVoucher方法上有@Transactional注解,但如果它是直接在类内部通过同步代码块被调用,那么这个直接调用就绕过了Spring的AOP代理,事务管理因此可能不会生效。
为了解决这个问题,确保事务管理能够正常工作,可以考虑以下几种方案:
1.通过代理对象调用:如果必须在类内部调用事务方法,可以通过注入当前类的bean实例来间接调用,这样Spring的AOP代理就能生效。重构方法结构:将事务逻辑和同步逻辑整合到一个方法中,确保事务和同步都在同一个由Spring管理的方法体内执行。
2.使用AspectJ编织:Spring也支持使用AspectJ来实现更细粒度的切面编织,包括对类内部方法调用的拦截,但这需要额外的配置和可能的编译期织入。
综上所述,直接在同步代码块中调用事务方法的问题在于它规避了Spring AOP代理,从而可能忽略了事务管理的预期效果。
修改如下:
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucher(voucherId);
}
AopContext.currentProxy(): 这是Spring提供的一个方法,用于在AOP代理方法内部获取当前代理对象。这一步很关键,因为它允许在类内部调用时仍然通过代理来执行,从而确保了@Transactional注解的事务管理能够生效。通过代理调用createVoucher: 通过强制类型转换得到的代理对象proxy来调用createVoucher方法,而不是直接调用。这样做使得事务管理逻辑能够被Spring的AOP代理捕获并正确执行,包括事务的开始、提交或回滚。
导入依赖:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
暴露代理对象,在启动类上面加上注解:
@EnableAspectJAutoProxy(exposeProxy = true)
集群的并发问题:
每个jvm内部共享一个synchronized锁监视器。如果在同一个tomcat,那肯定是没有问题的,但是如果是多个服务器,就有多个tomcat,JVM,就会出现问题。
3.4.分布式锁:
所以我们使用Redis。
为了避免释放错误,所以我们加上过期时间。注意,设置过期时间与setnx是两个语句,我们必须确保它们具有原子性。所以我们可以用set语句:
set lock k1 EX 10 NX
我们用非阻塞式实现这个操作。
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public void unLock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
@Override
public boolean tryLock(long timeoutSec) {
long id = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);
return Boolean.TRUE.equals(success);
}
}
使用方法:
Long userid = UserHolder.getUser().getId();
SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);
boolean isLock = lock.tryLock(1200);
if(!isLock){
return Result.fail("不允许重复下单");
}
try {
synchronized(userid.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucher(voucherId);
}
} finally {
lock.unLock();
}
可能存在的问题:
所以,要判断这个锁是不是自己的,不要把别人的锁释放了。
注意,不要用本身的线程Id作为判断标准,因为不同的JVM有不同的线程Id,代码改进如下:
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
//......其他代码
@Override
public void unLock() {
String id = ID_PREFIX + Thread.currentThread().getId();
String lock = stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);
if(id.equals(lock)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
@Override
public boolean tryLock(long timeoutSec) {
String id = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);
return Boolean.TRUE.equals(success);
}
但是事实上,还是有问题的(md,怎么这么多问题):
下面是一个极端情况:
如果线程一判断相同通过了,但是删除锁的时候被阻塞了,还是会删除其他线程的锁。但是为什么这样的情况我们可以删掉呢:
注意这里存入redis的key,是前缀加用户ID,而前缀,后缀都是static修饰的,在同一个服务器内前缀后缀是一样的,而我们这里解决的是一人一单,也就是说,用户的ID也是一样的,因此在同一台服务器下的同一个用户的key就是固定的。其次,我们上面将后缀加上线程设置为key值,这是为了应对多台服务器下的问题,而我们用的redis,也是为了解决多台服务器下的并发问题,因此在这里,反而在同一台服务器下,在判断通过后删除之前的间隙中,删除动作迟迟不执行,然后老线程的key过期了,然后另外一个线程创建了一个相同的key,此时,老线程执行删除动作,就会删除掉新线程的key。注意,删除是按照key的!!!
所以我们引入Lua脚本:
新建一个unlock.lua文件:
if(redis.call('get',KEYS[1]) == ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0
然后使用静态代码块加载脚本文件:
private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
使用脚本:
@Override
public void unLock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
3.5.分布式锁优化——Redisson:
因为自己写实在是太烦琐了,所以我们引入了其他第三方服务:
导入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.6.128").setPassword("kz32330981");
return Redisson.create(config);
}
}
使用分布式锁:
举例:
@Override
public Result seckillVoucher(Long voucherId) {
SeckillVoucher byId = seckillVoucherService.getById(voucherId);
if(byId.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀还没开始");
}
if(byId.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
if(byId.getStock()<=0){
return Result.fail("库存不足");
}
boolean success = seckillVoucherService.update().setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",byId.getStock())
.update();
if(!success){
return Result.fail("库存不足");
}
Long userid = UserHolder.getUser().getId();
//SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userid);
boolean isLock = lock.tryLock();
if(!isLock){
return Result.fail("不允许重复下单");
}
try {
synchronized(userid.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucher(voucherId);
}
} finally {
lock.unlock();
}
}
这里boolean isLock = lock.tryLock();
没有设置参数,默认失败一次就马上返回。
3.6.Redissson可重入锁原理:
为什么一般情况下不能实现可重入锁:
string只能有一个标识,当我们实现可重入锁时,如果是当前的线程,我们把它的value++,释放锁的时候–,所以我们需要两个标识,所以我们使用hash:
Redisson内部也是这么实现的。
3.6.Redissson可重试与避免超时释放原理:
注意,这里为null说明获取锁成功了。为null说明没有存在这个锁,所以当然会成功了。如果失败,返回的是当前存在的锁的有效期。
重试机制:
在Redisson中,可重试机制通常与分布式锁的实现紧密相关。当客户端尝试获取分布式锁但失败时(如锁已被其他客户端持有),Redisson会利用Redis的发布/订阅机制来实现锁的可重试。下面我将简要解释Redisson是如何结合消息订阅来实现可重试机制的:
- 锁获取失败与等待
- 当客户端尝试获取分布式锁但失败时(比如调用
lock.tryLock(waitTime, leaseTime, TimeUnit.unit)
方法),Redisson会记录当前线程并启动一个定时任务或监听器来等待锁的释放。
- 当客户端尝试获取分布式锁但失败时(比如调用
- 消息订阅
- 为了实现等待锁的释放,Redisson会利用Redis的发布/订阅机制。具体来说,Redisson会为每个锁维护一个特定的频道(channel)或主题(topic)。
- 当客户端尝试获取锁但失败时,它会订阅该锁的频道。这样,一旦锁的持有者释放了锁,Redisson就会在该频道上发布一个消息。
- 监听器与重试
- 客户端在订阅了锁的频道后,会设置一个监听器来监听该频道上的消息。
- 当锁的持有者释放锁并在频道上发布消息时,监听器会立即收到通知。
- 监听器在收到通知后,会触发一个重试机制,即再次尝试获取锁。
避免超时释放:
-
Redis中的看门狗策略通常与Redisson库一起使用,用于自动检测并处理过期键的机制。
-
应用程序使用Redisson库监视Redis服务器上的指定键。当这些键被修改时,看门狗策略会自动触发相应的操作,如更新本地缓存或重新计算依赖项。
-
这有助于应用程序实时响应Redis中的数据变化,并防止在数据发生变化时出现问题,如死锁和其他并发问题。
-
具体来说,
leaseTime
是锁的有效期,也就是锁的持有时间。当调用lock(leaseTime, unit)
方法并传入一个非-1的leaseTime
时,这个特定的锁将会在一个固定的时间后自动释放,即使锁的持有者仍然在运行。在这种情况下,Redisson不会启动看门狗机制,因为锁的过期时间是由应用程序明确指定的。然而,如果调用lock()
方法(没有传入leaseTime
参数),或者传入-1
作为leaseTime
的值,那么锁将没有固定的过期时间。此时,Redisson会启动看门狗机制。看门狗会定期检查锁的持有者是否仍然活跃,并在锁即将过期时自动延长锁的有效期,从而确保锁不会被意外释放,直到锁的持有者显式地释放锁或者持有者不再活跃。
注意,只用我们没有设置时间(或者-1)才会触发这个策略。
3.7.主从一致性:
为什么以前会出现问题:
当主节点出现问题时,从从节点获取锁就会成功。
而Redisson是怎么解决这个问题的呢:
以下是如何使用Redisson的MultiLock
的基本步骤:
-
配置Redisson客户端:
首先,你需要配置Redisson客户端以连接到Redis集群或哨兵模式,如之前所述。 -
创建RLock对象:
对于每个Redis节点,你需要创建一个RLock
对象。这些RLock
对象将用于组成MultiLock
。RLock lock1 = redisson1.getLock("lock1"); // 假设redisson1连接到第一个Redis节点 RLock lock2 = redisson2.getLock("lock2"); // 假设redisson2连接到第二个Redis节点 // ... 为其他节点创建RLock对象 ...
注意:虽然锁的名称(如
"lock1"
和"lock2"
)可以不同,但出于一致性和可维护性的考虑,最好使用相同的锁名称。 -
组合RLock对象为MultiLock:
使用这些RLock
对象创建一个MultiLock
。List<RLock> locks = Arrays.asList(lock1, lock2, /* ... 其他locks ... */); RLock multiLock = redisson.getMultiLock(locks);
-
使用MultiLock:
现在你可以像使用普通的RLock
一样使用MultiLock
了。multiLock.lock(); try { // 临界区代码,只有获得锁的线程才能执行 } finally { multiLock.unlock(); }
注意:在
finally
块中解锁是非常重要的,以确保锁总是被释放,即使发生异常。 -
关闭Redisson客户端:
完成所有操作后,确保关闭所有Redisson客户端实例以释放资源。redisson1.shutdown(); redisson2.shutdown(); // ... 关闭其他Redisson客户端 ...
-
测试:
为了测试MultiLock
在多节点环境中的行为,你可以模拟节点故障、网络分区等情况,并观察MultiLock
是否仍然能够正确工作。
总结:
3.7.异步秒杀:
原来的串行执行:
优化如下:
完整的流程如下:
Part 1:保存到redis:增加redis中的优惠券数量:
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), voucher.getStock().toString());
}
Part 2:编写Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) then
return 1
end
if(redis.call('sismember',orderKey,userId) == 1) then
return 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
- 参数定义:
ARGV[1]
:voucherId
,表示优惠券ID。ARGV[2]
:userId
,表示用户的ID。
- 定义键名:
stockKey
:这是优惠券库存的键名,格式为'seckill:stock:'
加上voucherId
。orderKey
:这是已购买(或已下单)该优惠券的用户的集合的键名,格式为'seckill:order:'
加上voucherId
。
- 检查库存:
- 使用
redis.call('get', stockKey)
获取库存数量。 - 使用
tonumber
函数将返回的字符串转换为数字。 - 如果库存数量小于或等于0,则返回
1
,表示库存不足。
- 使用
- 检查用户是否已经购买:
- 使用
redis.call('sismember', orderKey, userId)
检查用户是否已经在orderKey
集合中。 - 如果用户已经购买(即用户ID在集合中),则返回
2
,表示用户已购买。
- 使用
- 扣减库存并记录订单:
- 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
- 使用
redis.call('incrby', stockKey, -1)
扣减库存。 - 使用
redis.call('sadd', orderKey, userId)
将用户ID添加到已购买集合中。
- 使用
- 这两个操作是原子性的,因为它们都在同一个Lua脚本中执行。
- 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
- 返回值:
- 如果成功扣减库存并记录订单,则返回
0
,表示操作成功。 - 如果库存不足,返回
1
。 - 如果用户已购买,返回
2
。
- 如果成功扣减库存并记录订单,则返回
Part 3:基于阻塞队列的秒杀业务:
private BlockingQueue<VoucherOrder >orderTasks = new ArrayBlockingQueue<>(1024*1024);
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
Long resultT =stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
String.valueOf(UserHolder.getUser().getId())
);
int res = resultT.intValue();
if(res!=0){
return Result.fail(res==1?"库存不足":"不能重复下单");
}
long id = redisIDWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(id);
voucherOrder.setUserId(UserHolder.getUser().getId());
//在父线程就先获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
orderTasks.add(voucherOrder);
return Result.ok(id);
}
Part 4:异步下单业务:
private static final ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
private IVoucherOrderService proxy;
private class VoucherOrderHandler implements Runnable{
@PostConstruct
public void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}
@Override
public void run() {
while (true){
try {
VoucherOrder voucherOrder = orderTasks.take();
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常",e);
}
}
}
}
-
@PostConstruct注解:
这个注解标记在init()方法上,表示该方法会在类的实例创建并注入到Spring容器后立即调用。这里是用来启动任务处理的逻辑。
-
init()方法:
seckill_order_executor是一个线程池,可能是ExecutorService类型的实例。
方法中提交了一个新的VoucherOrderHandler实例到线程池执行。这意味着每次初始化VoucherOrderHandler,都会启动一个新的线程来处理订单任务,形成一个循环执行的任务队列。 -
run()方法:
作为Runnable接口的实现,run()方法包含实际的业务逻辑。使用一个无限循环while (true),确保线程将持续运行,直到程序停止或线程被显式中断。
-
try-catch块:
在循环中,从orderTasks队列(可能是BlockingQueue类型)中获取一个VoucherOrder对象并调用handleVoucherOrder(voucherOrder)进行处理。如果在处理过程中出现任何异常,捕获并记录错误日志,但不会中断循环。这样可以确保即使处理过程中出现问题,线程仍能继续尝试处理下一个订单。
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock){
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucher(voucherOrder);
}finally {
lock.unlock();
}
}
注意这里的两处细节,userId与createVoucher都与父线程有关系,而在这里是异步的子线程,所以代理对象要在父线程就创建好,而userid也不能通过线程本地直接获取。这里用Redisson重复验证是为了让线程更加安全。
@Transactional
public void createVoucher (VoucherOrder voucherOrder){
Long userid = voucherOrder.getId();
long count = query().eq("user_id", userid).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("用户已经购买过一次!");
return ;
}
boolean success = seckillVoucherService.update().setSql("stock = stock -1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock",0)
.update();
if(!success){
log.error("库存不足");
return;
}
save(voucherOrder);
}
重复验证道理同上。
3.8.阻塞队列优化——消息队列:
基于上面的阻塞队列存在两个问题:1.内存问题 2.服务宕机的安全问题,所以我们必须进行优化。
1.独立于JVM,解决内存问题。
2.内部做了数据的持久化,避免安全问题。
(1):基于List
(2):PubSub:
如果没有被任何频道订阅,会直接丢失。
(3):Stream:
发送消息:
读取消息:
3.9.基于Stream的消费者组:
基于Java代码:
总结:
3.10.代码改造:
(1):创建消息队列:
XGROUP CREATE stream.orders g1 0 MKSTREAM
这是创建消费者组,自动创建了队列。
(2):修改Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) then
return 1
end
if(redis.call('sismember',orderKey,userId) == 1) then
return 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
redis.call('xadd','strem.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
这行代码是使用 Redis 的 XADD 命令向名为 “stream.orders” 的 Stream 数据结构中添加一条新的消息。该消息包含了三个字段:“userId”、“voucherId” 和 “id”,分别对应传入的参数 userId、voucherId 和 orderId 的值。这样可以将订单相关的信息存储在 Redis 中,并且可以通过 Stream 数据结构的功能对这些信息进行处理和查询。这种方式可以方便地实现消息队列、事件日志等功能。
(3):修改发消息业务代码:
@Override
public Result seckillVoucher(Long voucherId) {
long id = redisIDWorker.nextId("order");
Long resultT =stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
String.valueOf(UserHolder.getUser().getId()),String.valueOf(id)
);
int res = resultT.intValue();
if(res!=0){
return Result.fail(res==1?"库存不足":"不能重复下单");
}
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(id);
}
(4):收消息:
private class VoucherOrderHandler implements Runnable{
String queueName = "stream.orders";
@PostConstruct
public void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}
@Override
public void run() {
while (true){
try {
List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
if(list == null || list.isEmpty()){
continue;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
//ack
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
handlePendinglist();
}
}
}
private void handlePendinglist() {
while (true){
try {
List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
if(list == null || list.isEmpty()){
break;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
//ack
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理pending异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException interruptedException) {
e.printStackTrace();
}
}
}
}
}
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock){
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucher(voucherOrder);
}finally {
lock.unlock();
}
}
我们分成三个部分来看:
4.1.正常接受处理消息:
String queueName = "stream.orders";
@PostConstruct
public void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}
@Override
public void run() {
while (true){
try {
List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
if(list == null || list.isEmpty()){
continue;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
//ack
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
handlePendinglist();
}
}
}
- 参数详解:
- Consumer.from(“g1”, “c1”)
- 这表示从消费者组 “g1” 中的消费者 “c1” 读取数据。在 RedisStreams 中,消费者组用于允许多个消费者并发地读取同一个 Stream。注意,这里因为没有c1,所以先创建了c1。
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
StreamReadOptions.empty()
创建一个空的读取选项对象。.count(1)
设置读取的条目数量为 1。这意味着,无论 Stream 中有多少未读的数据,此方法仅返回一个条目。.block(Duration.ofSeconds(2))
设置读取操作的阻塞时间。如果 Stream 中没有新的数据可读,那么这个方法会阻塞最多 2 秒,然后返回一个空列表。
- StreamOffset.create(queueName, ReadOffset.lastConsumed())
- 这定义了从哪个位置开始读取 Stream。
queueName
是 Stream 的名称。ReadOffset.lastConsumed()
表示从消费者 “c1” 最后消费的位置开始读取。如果消费者 “c1” 还没有消费过任何数据,那么这个位置可能是 Stream 的开始位置或任何其他预定义的位置(例如 Stream 的起始点)。
- Consumer.from(“g1”, “c1”)
等价于:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS mystream >
4.2.处理因异常而位于pending-list的消息:
private void handlePendinglist() {
while (true){
try {
List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
if(list == null || list.isEmpty()){
break;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
//ack
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理pending异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException interruptedException) {
e.printStackTrace();
}
}
}
}
与上面同理,只是这里是获取未确认的消息,这里的异常不做递归处理,因为外面本身就是true循环。
4.3.新增:
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock){
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucher(voucherOrder);
}finally {
lock.unlock();
}
}
四:其他业务:
4.1.不存在该表中字段处理方法如下,后续需要自己维护:
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
4.2.点赞排序:
改造前:
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//......
stringRedisTemplate.opsForSet().add(key,userId.toString());
//......
stringRedisTemplate.opsForSet().remove(key,userId.toString());
改造后:
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//...
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
//......
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
部分代码如下:
@Override
public Result queryBlogLikes(Long id) {
String key = "blog:liked:" + id;
Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(range == null || range.isEmpty()){
return Result.ok(Collections.emptyList());
}
List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
List <UserDTO> users = userService.listByIds(ids).stream().map(
user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());
return Result.ok(users);
}
注意,这里有个问题,取出的结果是有序的,都是我们在进行userService.listByIds(ids)
时,数据库内部用的是WHERE id IN(1,5)
,而它在数据库内部的索引中,就优化了,导致无论怎么样,都是先1再5。可以用下面的sql语句优化:
在MP中,就可以这么写:
@Override
public Result queryBlogLikes(Long id) {
String key = "blog:liked:" + id;
Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(range == null || range.isEmpty()){
return Result.ok(Collections.emptyList());
}
List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
List <UserDTO> users = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list()
.stream().map(
user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());
return Result.ok(users);
}
4.3.共同关注:
基于Set集合的交集:
@Override
public Result followCommons(Long id) {
Long userId = UserHolder.getUser().getId();
Set<String> intersect = stringRedisTemplate.opsForSet().intersect("follows:" + userId, "follows:" + id);
if(intersect==null||intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
List<Long> collect = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
List<UserDTO> userDTOS = userService.listByIds(collect).stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(userDTOS);
}
4.4.关注推送——Feed流
以下是基于推模式:
分页问题:
滚动查询:
代码实现如下:
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
Long userId = UserHolder.getUser().getId();
String key = "feed:" + userId;
Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
if(typedTuples == null || typedTuples.isEmpty()){
return Result.ok();
}
List<Long>ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int os = 1;
for(ZSetOperations.TypedTuple <String> tuple : typedTuples){
ids.add(Long.valueOf(tuple.getValue()));
long time = tuple.getScore().longValue();
if(time == minTime){
os++;
}
else{
minTime = time;
os = 1;
}
}
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();
for (Blog blog : blogs) {
queryBlogUser(blog);
queryBlogLikes(blog);
}
ScoreResult r = new ScoreResult();
r.setList(blogs);
r.setMinTime(minTime);
r.setOffset(os);
return Result.ok(r);
}
下面是关键代码解释:
Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
参数
key
: 这是 Redis 中有序集合的键名。在这个例子中,它是通过"feed:" + userId
构建的,其中userId
是从UserHolder
获取的当前用户的 ID。0
: 分数范围查询的起始分数(包含)。在这个例子中,查询从分数 0 开始。max
: 分数范围查询的结束分数(不包含)。这是你要查询的最高分数(不包括这个分数)。offset
: 分页查询的偏移量。这表示从有序集合中跳过多少个元素后再开始返回结果。2
: 这是一个限制参数,表示最多返回 2 个元素。然而,在实际应用中,这个限制可能不是非常有用,因为它会限制返回结果的数量。通常,你可能会根据分页参数(如每页大小)来动态设置这个值。
在这个例子中,通过 reverseRangeByScoreWithScores(key, 0, max, offset, 2)
,你正在查询从分数 0 到 max
(不包含)之间的博客,按分数从高到低排序,并跳过 offset
个元素,最后最多返回 2 个结果。每个结果都是一个 TypedTuple
,它包含博客的 ID 和其发布时间(作为分数)。
注意,分布时间越前的,id越小,所以从零开始,而且这里的os,及偏移量,是上次发布时间最早的时间的重复个数,不是在所有结果中的个数,比如时间戳为4 4 4 3,每次两个,第一次查出4与4,4虽然有三个,但是这里记录的是上一次查询中重复的最小时间,及两个,下一次,就从0到最大时间4,并且跳过2个,即第三个4开始查询。
4.5.附近商铺:
我们可以先做数据预热,先在test中存入redis:
@Test
public void test(){
List<Shop> list = shopService.list();
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
for(Map.Entry<Long, List<Shop>> entry : map.entrySet()){
Long typeId = entry.getKey();
String key = "shop:geo:" + typeId;
List<Shop> value = entry.getValue();
for(Shop shop : value){
stringRedisTemplate.opsForGeo().add(key,new org.springframework.data.geo.Point(shop.getX(),shop.getY()),shop.getId().toString());
}
}
}
或者可以这么写:
代码如下:
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
if (x == null || y == null){
// 根据类型分页查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = (current + 1) * SystemConstants.DEFAULT_PAGE_SIZE;
String key = SHOP_GEO_KEY + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String> >search = stringRedisTemplate.opsForGeo()
.search(key,
GeoReference.fromCoordinate(x, y),
new Distance(5000),
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance()
.limit(end));
if(search == null){
return Result.ok();
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>>contents = search.getContent();
if(contents.size()<=from){
return Result.ok();
}
//截取
List <Long> ids = new ArrayList<>(contents.size());
Map<String,Distance>distanceMap = new HashMap<>(contents.size());
contents.stream().skip(from).forEach(result->{
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
Distance distance = result.getDistance();
distanceMap.put(shopIdStr,distance);
});
List<Shop> shops = query().in("id",ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();
for (Shop shop : shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
return Result.ok(shops);
}
4.6.用户签到:
@Override
public Result sign() {
Long userId = UserHolder.getUser().getId();
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
int day = now.getDayOfMonth() - 1;
stringRedisTemplate.opsForValue().setBit(key, day, true);
return Result.ok();
}
String keySuffix = now.format(DateTimeFormatter.ofPattern(“:yyyyMM”));: 根据当前日期的年月(格式为"yyyyMM",例如"202302")创建一个字符串后缀,用于构建Redis键。
int day = now.getDayOfMonth() - 1;: 获取当前日期减1(因为数组或位数组通常从0开始计数,所以这里将日期减1来对应位数组的索引)。
stringRedisTemplate.opsForValue().setBit(key, day, true);: 使用Spring Data Redis的StringRedisTemplate操作位值。在Redis的键key对应的位数组中,设置第day位为true,表示用户在这一天签到了。
连续签到:
@Override
public Result signCount() {
Long userId = UserHolder.getUser().getId();
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
int day = now.getDayOfMonth();
List <Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(day))
.valueAt(0)
);
if(result == null || result.isEmpty()){
return Result.ok(0);
}
Long num = result.get(0);
if(num == null || num == 0){
return Result.ok(0);
}
int count = 0;
while (true){
if((num & 1) == 0){
break;
}else {
count++;
}
num >>>= 1;
}
return Result.ok(count);
}
关键代码如下:
List <Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(day))
.valueAt(0)
);