一、引言
在电商、医疗预约等众多业务场景中,经常会遇到需要处理超时任务的情况。比如医疗预约订单,如果患者在支付成功后,到了预约结束时间还未报到,系统需要自动取消订单。为了实现这样的功能,我们可以利用 Redis 延时队列。本文将详细介绍 Redis 延时队列的使用,对比它与其他消息队列的优缺点,并结合实际的订单超时未报到业务代码进行分享。
二、Redis 延时队列详细介绍
2.1 什么是 Redis 延时队列
Redis 延时队列是一种特殊的队列,它允许元素在指定的时间后才被消费。在 Redis 中,通常可以使用有序集合(Sorted Set)或 Redisson 提供的延迟队列来实现。有序集合的分数可以用来表示元素的过期时间,通过不断轮询有序集合,当分数小于当前时间时,就将元素取出消费。而 Redisson 则提供了更方便的 API 来实现延时队列,它内部封装了很多复杂的操作,让开发者可以更简单地使用。
2.2 工作原理
以 Redisson 实现的延时队列为例,它基于 Redis 的 List 和 ZSet 数据结构。当我们向延时队列中添加元素时,Redisson 会将元素存储在一个 ZSet 中,分数为元素的过期时间。同时,会有一个后台线程不断轮询 ZSet,当发现有元素的分数小于当前时间时,就将元素从 ZSet 移动到 List 中,然后消费者就可以从 List 中获取元素进行消费。
三、Redis 延时队列与其他消息队列的对比
3.1 与 MQ(如 RabbitMQ)对比
- 优点
- 简单易用:Redis 延时队列的实现相对简单,不需要像 RabbitMQ 那样复杂的配置和管理。对于一些简单的业务场景,使用 Redis 延时队列可以快速实现功能。
- 性能高:Redis 是基于内存的数据库,读写速度非常快。在处理大量的延时任务时,Redis 延时队列可以提供更高的性能。
- 缺点
- 功能有限:相比 RabbitMQ,Redis 延时队列的功能相对较少。例如,RabbitMQ 支持多种消息模式(如发布 - 订阅、路由等),而 Redis 延时队列主要用于处理延时任务。
- 可靠性低:Redis 没有像 RabbitMQ 那样完善的消息确认机制和持久化策略。如果 Redis 出现故障,可能会导致部分消息丢失。
- 应用场景
- Redis 延时队列:适用于对性能要求较高、业务逻辑相对简单的延时任务场景,如订单超时未支付、缓存过期等。
- RabbitMQ:适用于对消息可靠性要求较高、业务逻辑复杂的场景,如分布式系统中的消息传递、异步任务处理等。
3.2 与 Kafka 对比
- 优点
- 低延迟:Redis 延时队列的响应速度非常快,可以在短时间内处理大量的延时任务。而 Kafka 主要用于高吞吐量的消息处理,在处理延时任务时可能会有一定的延迟。
- 易于集成:Redis 可以很方便地与各种编程语言和框架集成,对于开发者来说更加友好。
- 缺点
- 吞吐量低:Kafka 具有高吞吐量的特点,可以处理海量的消息。而 Redis 延时队列在处理大规模数据时,吞吐量相对较低。
- 数据持久化弱:Kafka 支持数据的持久化存储,即使服务器重启也不会丢失数据。而 Redis 的数据持久化策略相对较弱,可能会导致数据丢失。
- 应用场景
- Redis 延时队列:适用于对延迟要求较高、数据量较小的延时任务场景。
- Kafka:适用于大数据处理、日志收集等需要高吞吐量的场景。
3.3 为什么订单超时未报到使用延时队列
在订单超时未报到的场景中,我们需要在订单支付成功后,在预约结束时间到达时自动取消订单。这个场景对延迟要求较高,需要在指定的时间点准确地执行任务。Redis 延时队列可以很好地满足这个需求,它可以在指定的时间后将订单信息从队列中取出,然后进行相应的处理。而且,这个业务场景相对简单,不需要像 MQ 或 Kafka 那样复杂的功能,因此使用 Redis 延时队列更加合适。
四、订单超时未报到业务代码分享
4.1 Redis 延时队列工具类
import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author
* description: reids 延迟队列工具类
*/
@Slf4j
@Component
public class RedisDelayQueueUtil<T> implements ApplicationContextAware {
public static RedissonClient redissonClient;
private T obj;
public static Set<String> queueCodeSet = new ConcurrentHashSet<>();
//release发布后开启
@Scheduled(cron = "0 */10 * * * ?")
private void keepAlive() {
queueCodeSet.forEach((code) -> {
//云redis会主动断掉长期未使用的链接,主动激活
addDelayQueue("keepAlive", 1, TimeUnit.SECONDS, code);
});
}
/**
* 添加延迟队列
*
* @param value 队列值
* @param delay 延迟时间
* @param timeUnit 时间单位
* @param queueCode 队列键
* @param <T>
*/
public static <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode) {
try {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(value, delay, timeUnit);
log.debug("Redisson 添加延时队列成功 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
} catch (Exception e) {
log.error("Redisson 添加延时队列失败 {}", e.getMessage());
throw new RuntimeException("Redisson添加延时队列失败");
}
}
/**
* 获取延迟队列 - 会阻塞
*
* @param queueCode 队列名称
* @return <T> 数据
* @throws InterruptedException
*/
public static <T> Optional<T> getDelayQueue(String queueCode) throws InterruptedException {
queueCodeSet.add(queueCode);
RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
try {
T t = blockingDeque.take();
if(Objects.equals("keepAlive",t)){
return Optional.empty();
}
return Optional.of(t);
} catch (Exception e) {
return Optional.empty();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RedisDelayQueueUtil.redissonClient = applicationContext.getBean(RedissonClient.class);
}
}
4.2 订单支付成功后入延时队列
ps:只是部分代码,主要展示延时队列的添加
@Slf4j
@Service
public class WechatPayCallbackServiceImpl implements WechatPayCallbackService {
@Resource
private HisFeign hisFeign;
@Resource
private HisV2Feign hisV2Feign;
@Resource
private OrderService orderService;
@Resource
private UserInfoService userInfoService;
@Resource
private TenantConfigApi tenantConfigApi;
@Resource
private MedicalRegOrderApi medicalRegOrderApi;
@Resource
private OrderAddressService orderAddressService;
@Resource
private OrderPrescriptionService orderPrescriptionService;
//订单超时未报到:支付成功后入延时队列
private final String ORDER_TIMEOUT_WITHOUT_REPORTING = "delayQueue:OrderTimeoutWithoutReportingDelayQueue";
@Override
public ProcessDto businessProcess(ProcessParam processParam) {
ProcessDto processDto = new ProcessDto();
Order order = orderService.getOne(Wrappers.<Order>lambdaQuery().eq(Order::getOrderSeq, processParam.getBusiTransactionNumber()));
if (ObjectUtil.isEmpty(order)) {
processDto.setBusiLogicSuccessful(YesNo.NO);
processDto.setErrorMsg(" 订单不存在");
return processDto;
}
//如果微信回调我们的状态不是成功,订单记录异常
if (processParam.getPaymentSuccessful().equals(YesNo.NO)) {
order.setExceptionFlag(true);
order.setExceptionDesc(" 订单微信回调的支付状态未成功");
order.setOrderStatus(5);
order.setCancelReason(CancelType.EXCEPTION_REFUND.getDesc());
orderService.updateById(order);
//TODO 异常退费逻辑
processDto.setBusiLogicSuccessful(YesNo.NO);
processDto.setErrorMsg(" 微信回调的支付状态未成功");
return processDto;
}
//获取租户token
TenantUserContextVO contextVO = userInfoService.getTenantToken(order.getHospitalId(), order.getPatientId(), order.getUserId());
switch (order.getOrderType()) {
case 1:
//挂号订单
log.info(" 挂号订单支付");
//如果微信支付回调的状态是成功,那么需要将成功的订单,入到超时未报到延时队列中,供超时未报到业务使用
createOrderCheckTask(order);
return registeredOrder(processParam, order, contextVO);
case 2:
//处方订单
log.info(" 处方订单支付");
return prescriptionOrder(processParam, order, contextVO);
default:
log.error(" 订单类型错误");
processDto.setBusiLogicSuccessful(YesNo.NO);
processDto.setErrorMsg(" 订单类型错误");
return processDto;
}
}
// 超时未报到订单检查延时任务
private void createOrderCheckTask(Order order) {
try {
log.info(" 开始创建超时未报到订单检查延时任务,订单信息为:{}", JSONUtil.toJsonPrettyStr(order));
Duration duration;
//todo 方案一:这里可以根据预约结束时间或者最晚报到时间,作为结束时间。如果超过结束时间就取消订单。【暂时使用方案一】
// 方案二:这里可以根据时间段,比如上午(12:00:00)、下午(17:00:00)、晚上(20:00:00)。全天的话,可以根据全天的结束时间为标准。
LocalDateTime prebookStartTime = order.getPrebookStartTime();
//预约结束时间
LocalDateTime prebookEndTime = order.getPrebookEndTime();
//最晚报到时间
LocalDateTime registerEndDate = order.getRegisterEndDate();
if (Objects.nonNull(prebookEndTime)) {
duration = Duration.between(LocalDateTime.now(), prebookEndTime);
} else if (Objects.nonNull(registerEndDate)) {
duration = Duration.between(LocalDateTime.now(), registerEndDate);
} else {
duration = Duration.between(prebookStartTime, LocalDateTime.now().plusMinutes(30));
}
if (duration.getSeconds() < 10) {
//避免时间过短出现问题
duration = Duration.ofSeconds(10L);
}
log.info(" 创建超时未报到订单检查延时任务,orderId为{},检查时间为{}", order.getId(), LocalDateTime.now().plusSeconds(duration.getSeconds()));
RedisDelayQueueUtil.addDelayQueue(order.getId(), duration.getSeconds(), TimeUnit.SECONDS, ORDER_TIMEOUT_WITHOUT_REPORTING);
} catch (Exception e) {
log.error(" 超时未报到订单检查延时任务创建异常:" + e);
GlobalException.throwEx(" 超时未报到订单检查延时任务创建异常");
}
}
}
4.3 订单超时业务处理
ps:这里用了线程池,但是搞复杂了,其实可以直接用定时去消费延时队列
/**
* 订单重构超时场景校验处理
*/
@Component
@Slf4j
public class OrderRefactorPayCheckSchedule implements CommandLineRunner {
//订单超时未支付:创建订单的时候入延时队列,和之前共用一个:超过支付限制时间自动取消
private final String ORDER_PAY_CHECK_DELAY_QUEUE = "delayQueue:OrderPayCheckDelayQueue";
//订单超时未报到:支付成功后入延时队列:超过预约结束时间后,自动取消
private final String ORDER_TIMEOUT_WITHOUT_REPORTING = "delayQueue:OrderTimeoutWithoutReportingDelayQueue";
//订单超时未接诊:报道后入延时队列:24小时后未接诊自动取消
private final String ORDER_TIMEOUT_NO_APPOINTMENT_RECEIVED = "delayQueue:OrderTimeoutNoAppointmentReceivedDelayQueue";
//订单自动结束问诊:医生接诊时入延时队列:24小时未结束问诊自动取消
private final String ORDER_CONSULTATION_AUTOMATIC_END = "delayQueue:OrderConsultationAutomaticEndDelayQueue";
//处方超时未支付 1小时后自动取消
private final String PRESCRIPTION_PAY_CHECK_DELAY_QUEUE = "delayQueue:PrescriptionPayCheckDelayQueue";
//医生停诊
private final String DOCTOR_SUSPEND_DELAY_QUEUE = "delayQueue:DoctorSuspendDelayQueue";
// 订单自动结束问诊:24小时未结束
@Value("${order.timeout.automatic_end_hours:24}")
private Double orderConsultationAutomaticEndHours;
@Resource
private OrderService orderService;
@Resource
private SessionApi sessionApi;
@Resource
private SessionRuntimeApi sessionRuntimeApi;
@Resource
private ConsultationRecordService consultationRecordService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private ConsultationRecordApi consultationRecordApi;
@Resource
private UserApi userApi;
@Resource
private ShortMessageApi shortMessageApi;
@Resource
private SuspendService suspendService;
@Resource
private DoctorConfigApi doctorConfigApi;
// 在类变量区新增线程池配置
@Value("${schedule.pool.coreSize:5}")
private int corePoolSize;
@Value("${schedule.pool.maxSize:10}")
private int maxPoolSize;
@Value("${schedule.pool.queueCapacity:100}")
private int queueCapacity;
// 新增线程池bean(放在类变量声明之后)
private ThreadPoolTaskExecutor taskExecutor;
@PostConstruct
private void initThreadPool() {
taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setQueueCapacity(queueCapacity);
taskExecutor.setThreadNamePrefix("OrderSchedule-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
}
/**
* 开启线程接受阻塞队列内的待同步订单进行同步
*/
@Override
public void run(String... args) {
// 为每个队列启动一个独立的线程(超时未支付业务,OrderPayCheckSchedule中已经有了,这里可以注掉)
//taskExecutor.execute(() -> wrapTask(this::handleOrderPayCheck));
// 使用线程池提交任务
taskExecutor.execute(() -> wrapTask(this::handleOrderTimeoutWithoutReporting));
taskExecutor.execute(() -> wrapTask(this::handleOrderTimeoutNoAppointmentReceived));
taskExecutor.execute(() -> wrapTask(this::handleOrderConsultationAutomaticEnd));
taskExecutor.execute(() -> wrapTask(this::handlePrescriptionPayCheckDelayQueue));
taskExecutor.execute(() -> wrapTask(this::handleDoctorSuspendDelayQueue));
}
// 新增任务包装方法
private void wrapTask(Runnable task) {
while (!Thread.currentThread().isInterrupted()) {
try {
task.run();
} catch (Throwable e) {
log.error("定时任务执行异常", e);
try {
TimeUnit.SECONDS.sleep(5); // 异常后暂停5秒
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* 订单超时未支付业务处理
*/
private void handleOrderPayCheck() {
Optional<Long> optional = null;
try {
optional = RedisDelayQueueUtil.getDelayQueue(ORDER_PAY_CHECK_DELAY_QUEUE);
} catch (InterruptedException e) {
GlobalException.throwEx("获取延时队列异常:" + e);
}
log.info("获取延时队列成功,延时队列为:" + optional);
if (optional.isPresent()) {
Long orderId = optional.get();
log.info("开始检查支付状态,订单id:{}", orderId);
//调用订单超时未支付逻辑处理
//todo 延时任务没有traceId,不方便排查,这里手动设置traceId
Map<String, String> contextMap = new HashMap<>();
contextMap.put(Constants.TRACE_ID, "orderId_" + orderId);
MDC.setContextMap(contextMap);
checkOrderStatus(orderId, CancelTypeEnum.TIMEOUT.getCode());
}
}
/**
* 医生停诊业务处理
*/
private void handleDoctorSuspendDelayQueue() {
Optional<DoctorSuspendEvent> optional = null;
try {
optional = RedisDelayQueueUtil.getDelayQueue(DOCTOR_SUSPEND_DELAY_QUEUE);
} catch (InterruptedException e) {
GlobalException.throwEx("获取延时队列异常:" + e);
}
log.info("获取延时队列成功,延时队列为:" + optional);
if (optional.isPresent()) {
DoctorSuspendEvent doctorSuspendEvent = optional.get();
log.info("开始处理停诊业务,医生停诊信息为:{}", doctorSuspendEvent);
doctorSuspend(doctorSuspendEvent);
}
}
/**
* 订单超时未报到业务处理
*/
private void handleOrderTimeoutWithoutReporting() {
Optional<Long> optional = null;
try {
optional = RedisDelayQueueUtil.getDelayQueue(ORDER_TIMEOUT_WITHOUT_REPORTING);
} catch (InterruptedException e) {
GlobalException.throwEx("获取延时队列异常:" + e);
}
log.info("获取延时队列成功,延时队列为:" + optional);
if (optional.isPresent()) {
Long orderId = optional.get();
log.info("处理超时未报到逻辑,订单id:{}", orderId);
//todo 延时任务没有traceId,不方便排查,这里手动设置traceId
Map<String, String> contextMap = new HashMap<>();
contextMap.put(Constants.TRACE_ID, "orderId_" + orderId);
MDC.setContextMap(contextMap);
// 调用超时未报到处理逻辑
checkOrderStatus(orderId, CancelTypeEnum.UN_REPORT.getCode());
}
}
/**
* 订单超时未接诊业务处理
*/
private void handleOrderTimeoutNoAppointmentReceived() {
Optional<Long> optional = null;
try {
optional = RedisDelayQueueUtil.getDelayQueue(ORDER_TIMEOUT_NO_APPOINTMENT_RECEIVED);
} catch (InterruptedException e) {
GlobalException.throwEx("获取延时队列异常:" + e);
}
log.info("获取延时队列成功,延时队列为:" + optional);
if (optional.isPresent()) {
Long orderId = optional.get();
log.info("处理超时未接诊逻辑,订单id:{}", orderId);
//todo 延时任务没有traceId,不方便排查,这里手动设置traceId
Map<String, String> contextMap = new HashMap<>();
contextMap.put(Constants.TRACE_ID, "orderId_" + orderId);
MDC.setContextMap(contextMap);
// 调用超时未接诊处理逻辑
checkOrderStatus(orderId, CancelTypeEnum.OVERTIME.getCode());
}
}
/**
* 订单自动结束问诊业务处理
*/
private void handleOrderConsultationAutomaticEnd() {
Optional<Long> optional = null;
try {
optional = RedisDelayQueueUtil.getDelayQueue(ORDER_CONSULTATION_AUTOMATIC_END);
} catch (InterruptedException e) {
GlobalException.throwEx("获取延时队列异常:" + e);
}
log.info("获取延时队列成功,延时队列为:" + optional);
if (optional.isPresent()) {
Long orderId = optional.get();
log.info("处理自动结束问诊逻辑,订单id:{}", orderId);
//todo 延时任务没有traceId,不方便排查,这里手动设置traceId
Map<String, String> contextMap = new HashMap<>();
contextMap.put(Constants.TRACE_ID, "orderId_" + orderId);
MDC.setContextMap(contextMap);
// 调用自动结束问诊处理逻辑
checkOrderStatus(orderId, CancelTypeEnum.AUTO_END.getCode());
}
}
private void handlePrescriptionPayCheckDelayQueue() {
Optional<Long> optional = null;
try {
optional = RedisDelayQueueUtil.getDelayQueue(PRESCRIPTION_PAY_CHECK_DELAY_QUEUE);
} catch (InterruptedException e) {
GlobalException.throwEx("获取延时队列异常:" + e);
}
log.info("获取延时队列成功,延时队列为:" + optional);
if (optional.isPresent()) {
Long orderId = optional.get();
log.info("处理处方结束问诊逻辑,订单id:{}", orderId);
//todo 延时任务没有traceId,不方便排查,这里手动设置traceId
Map<String, String> contextMap = new HashMap<>();
contextMap.put(Constants.TRACE_ID, "orderId_" + orderId);
MDC.setContextMap(contextMap);
// 调用自动结束问诊处理逻辑
checkOrderStatus(orderId, null);
}
}
/**
* 订单业务处理
*
* @param orderId
*/
public void checkOrderStatus(Long orderId, Integer code) {
log.info("开始检查订单状态,订单id:{}", orderId);
Order order = orderService.lambdaQuery()
.eq(Order::getId, orderId)
.one();
if (ObjectUtil.isEmpty(order)) {
GlobalException.throwEx("获取订单信息为空 订单id:" + orderId);
}
CancelOrderDTO cancelOrderDTO = new CancelOrderDTO();
cancelOrderDTO.setOrderId(order.getId());
cancelOrderDTO.setOrderSeq(order.getOrderSeq());
cancelOrderDTO.setSendShortMessage(true);
switch (Objects.requireNonNull(OrderType.of(order.getOrderType()))) {
case VISIT -> {
if (Objects.equals(code, CancelTypeEnum.TIMEOUT.getCode()) && Objects.equals(PayStatus.UN_PAY.getCode(), order.getPayStatus()) && !Objects.equals(ConsultationRecordStatus.CANCEL.getCode(), order.getOrderStatus())) {
log.info("超时未支付,订单取消,订单为{}", order);
} else if (Objects.equals(code, CancelTypeEnum.UN_REPORT.getCode()) && Objects.equals(PayStatus.PAID.getCode(), order.getPayStatus()) && Objects.equals(ConsultationRecordStatus.WAITFOR_CHECKIN.getCode(), order.getOrderStatus())) {
log.info("超时未报到,订单取消,订单为{}", order);
} else if (Objects.equals(code, CancelTypeEnum.OVERTIME.getCode()) && Objects.equals(PayStatus.PAID.getCode(), order.getPayStatus()) && Objects.equals(ConsultationRecordStatus.WAITFOR_INQUIRING.getCode(), order.getOrderStatus())) {
log.info("超时未接诊,订单取消,订单为{}", order);
} else if (Objects.equals(code, CancelTypeEnum.AUTO_END.getCode()) && Objects.equals(PayStatus.PAID.getCode(), order.getPayStatus()) && Objects.equals(ConsultationRecordStatus.INQUIRING.getCode(), order.getOrderStatus())) {
log.info("自动结束问诊,订单为{}", order);
//判断当前时间是否小于问诊结束时间,如果小于就说明延长问诊了,不执行后续业务
//获取im会话信息
//im诊室关闭时间
// 获取当前时间
//自动结诊:(1)更改订单与问诊订单状态,为已结束
//关闭im诊室
//短信通知
//获取当前医生结诊时常配置(默认24小时)
} else {
//其他场景待补充
log.info("问诊订单其他场景:订单信息:" + order);
}
}
case PRESCRIPTION -> {
// 校验订单状态
log.info("处方订单开始校验是否支付", JSONUtil.toJsonPrettyStr(order));
if (Objects.equals(RpStatus.UN_PAY.getCode(), order.getOrderStatus()) || Objects.equals(RpStatus.EXTERNAL_RP.getCode(), order.getOrderStatus())) {
// 作废处方订单
orderService.invalidPrescriptionByOrderId(orderId);
}
}
}
}
}
ps:后续会有超时场景的应用补充说明:Redis延时队列在订单超时未报到场景的应用补充说明-CSDN博客