背景
年初的时候设计实践过一个课题:SpringBoot+Redis实现不重复消费的队列,并用它开发了一个年夜饭下单和制作的服务。不知道大家还有没有印象。完成这个课题后,我兴致勃勃的把它运用到了项目里面,可谁曾想,运行不久后运维人员就找到了我,说我的功能有问题,而且问题很大。具体表现在用户下单之后,很久都没有收到订单完成的消息,后台服务里的日志也是残缺的,只查到了开始制作年夜饭的日志,没有年夜饭制作过程和完成的日志。
为此,我花了大量的时间和精力去分析程序,定位问题,最终发现,是运维人员在系统上线时将生产服务和UAT服务的redis服务地址给配成了同一个,这就导致生产上的订单进入Redis队列后,被UAT服务给消费了,而UAT和生产又是不同的数据库,自然导致UAT上通过队列中的主键在数据库中找不到相关数据,从而消费失败的结果。而这些失败的信息全都记录在了UAT的服务器上,生产服务器中自然很难分析和定位到问题。
要解决这个问题其实很简单,只需要把channelTopic改为从配置文件中获取,然后生产、UAT环境配置不同的字符串即可。
但实际上真的只做这些就够了吗?如果下次再发生不可预料的问题,我还要花那么多的时间吭哧吭哧的去看日志,调程序,定位问题吗?答案是否定的!
原服务整改
结合生产环境产生的问题,痛定思痛,我决定对原来的服务进行一轮大整改,优化服务的可维护性,可测试性。在我的构想中,新的服务需要有以下功能:
1、保证原有的“年夜饭”功能稳定正常的运行。
2、可以查询哪些订单还未开始处理
3、可以查询哪些订单已经处理,以及处理结果。
4、可以清空N天以前的处理成功的订单。
5、可以清空待处理的订单
6、对于已经处理但处理失败的订单,可以一键重新处理
7、待处理订单插队
我的设想是,通过redisTemplate.opsForZSet()方法创建两个新队列:待办队列和已办队列,在下单时,插入一条数据到待办队列,在处理任务时,从待办队列中删除该数据,在处理完成后,插入一条数据到已办队列,这样,通过查询待办队列,已办队列,就可以知道哪些任务还在排队,哪些任务已经完成了。
以下是我的程序整改过程,老粉可以对比上一篇博客看看两者的不同之处。
#以下是application.yml配置
server:
port: 19200
servlet:
context-path: /leixi
max-http-header-size: 102400
spring:
redis:
database: 9
host: 127.0.0.1
port: 6379
password:
jedis:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
leixi:
redis-queue-key: NEW_YEAR_DINNER_DEV
//以下是Java程序代码:
/**
* Redis配置
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Configuration
public class RedisConfig {
@Value("${leixi.redis-queue-key}")
private String REDIS_QUEUE_KEY ;
@Bean
public ChannelTopic topic() {
return new ChannelTopic(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX);
}
@Bean
public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) {
return new MessageListenerAdapter(listener);
}
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter messageListenerAdapter,
ChannelTopic topic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(messageListenerAdapter, topic);
return container;
}
}
/**
* 订单处理控制层
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@RestController
public class DinnerController {
private int i = 0;
@Autowired
private DinnerService service;
@Value("${leixi.redis-queue-key}")
private String REDIS_QUEUE_KEY ;
@GetMapping("/orderDinner")
public Object orderDinner() {
OrderEntity entity = new OrderEntity();
entity.setOrderCode("Order" + (++i));
entity.setCustomerName("第"+i+"位客户");
return service.orderNewYearEveDinner(entity);
}
@Autowired
private RedisTemplate<String, String> redisTemplate;
@GetMapping("/getPendingOrder")
public Object getPendingOrder() {
return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
}
@GetMapping("/cleanPendingOrder")
public Object cleanPendingOrder() {
Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
if(set.size() > 0) {
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY + Constant.PENDING_SUFFIX, set.toArray());
}
return "待处理订单已被清空!";
}
@GetMapping("/getHandledOrder")
public Object getHandledOrder() {
return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
}
@GetMapping("/cleanOldSucceedOrder")
public Object cleanHandledOrder(@RequestParam("day") Integer day) {
Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, Constant.getScoreByDate() - day);
set.forEach(s -> {
JSONObject obj = JSONObject.parseObject(s);
if (obj.getString("result").equals("SUCCESS")) {
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
}
});
return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, System.currentTimeMillis());
}
/**
* 这里还有最后一个问题, 把已办里的错误的信息摘除出来,重新走请求。并且反馈哪些信息重新走了请求。
*/
@GetMapping("/restartFailedOrder")
public Object restartFailedOrder() {
Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
StringBuilder sb = new StringBuilder();
set.forEach(s -> {
JSONObject obj = JSONObject.parseObject(s);
if (!obj.getString("result").equals("SUCCESS")) {
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
OrderEntity entity = JSON.parseObject(obj.getString("msg"), OrderEntity.class);
service.orderNewYearEveDinner(entity);
sb.append(entity.getOrderCode()).append(",");
}
});
return "以下订单号被重启: "+ sb;
}
@GetMapping("/cutInLineJob")
public Object cutInLineJob(@RequestParam("orderCode") String orderCode) {
Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
for (String s : set) {
OrderEntity obj = JSONObject.parseObject(s, OrderEntity.class);
if (obj.getOrderCode().equals(orderCode)) {
CompletableFuture.runAsync(() -> {
service.doListenerWork(s);
});
return "订单 " + orderCode + " 插队成功!";
}
}
return " 插队失败,该订单已经在制作了!";
}
}
/**
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Data
public class OrderEntity implements Serializable {
/**
* 客户姓名
*/
private String customerName;
/**
* 订单号
*/
private String orderCode;
/**
* 菜单
*/
List<String> menus;
/**
* 出餐状态
*/
private String dinnerState;
/**
* 做饭开始时间
*/
private String dinnerStartTime;
/**
* 做饭结束时间
*/
private String dinnerEndTime;
/**
* 备注
*/
private String remark;
}
/**
* 监听类
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Component
public class DinnerListener implements MessageListener {
@Autowired
private DinnerService service;
private final Object lock = new Object();
@Override
public void onMessage(Message message, byte[] pattern) {
synchronized (lock) {
service.doListenerWork(message.toString());
}
}
}
/**
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Slf4j
@Service
public class DinnerService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Value("${leixi.redis-queue-key}")
private String REDIS_QUEUE_KEY ;
/**
* 年夜饭下单
*
* @param req 订单信息
* @return
*/
public Object orderNewYearEveDinner(OrderEntity req) {
// 存储订单信息
saveOrder(req);
// 异步开始做菜
redisTemplate.delete(JSON.toJSONString(req));
redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, JSON.toJSONString(req), Constant.getScoreByDate());
redisTemplate.convertAndSend(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX, JSON.toJSONString(req));
return "您已成功下单,订单号为"+ req.getOrderCode()+",后厨正在准备预制菜!";
}
/**
* 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
*
* @param req 订单信息
*/
public void doNewYearEveDinner(OrderEntity req) throws Exception {
System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
Thread.sleep(10000);
// 这里写个方法模拟报错的场景
int i = new Random().nextInt(6) + 1;
if (i ==4) {
throw new Exception("厨师跑了");
}
if (i ==5) {
throw new Exception("食物跑了");
}
if (i ==6) {
throw new Exception("厨房着火了");
}
System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
}
private void saveOrder(OrderEntity req) {
//这里假设做的是订单入库操作
System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
}
/**
* 根据订单编号修改订单信息
*
* @param orderCode 订单编号
* @param dinnerStatus
* @param remark
*/
public void updateOrder(String orderCode, String dinnerStatus, String remark) {
// 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
}
public void doListenerWork(String message) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(message, "1", 1, TimeUnit.DAYS);
// 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
if (!flag) {
return;
}
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, message);
OrderEntity param = CastUtils.cast(JSON.parseObject(message, OrderEntity.class));
JSONObject obj = new JSONObject();
obj.put("msg", message);
try {
obj.put("server", InetAddress.getLocalHost().getHostAddress());
this.doNewYearEveDinner(param);
this.updateOrder(param.getOrderCode(), "SUCCESS", "成功");
obj.put("result", "SUCCESS");
}catch (Exception e) {
e.printStackTrace();
this.updateOrder(param.getOrderCode(), "FAIL", e.getMessage());
obj.put("result", "FAIL");
obj.put("desc", e.getMessage());
}finally {
obj.put("endTime", new Date());
redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, obj.toJSONString(), Constant.getScoreByDate());
}
}
/**
* 静态工具类
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
public class Constant {
// 工作队列后缀
public static final String WORKING_QUEUE_SUFFIX = "_QUEUE";
//待处理工作队列后缀
public static final String PENDING_SUFFIX = "_PENDING";
// 已处理工作后缀
public static final String HANDLED_SUFFIX = "_HANDLED";
//一天的毫秒数
private static final Integer ONE_DAY_MINI = 86400000;
/**
* 根据当前日期计算队列的分数
*
* @return
*/
public static Integer getScoreByDate() {
return (int)System.currentTimeMillis()/ONE_DAY_MINI;
}
}
接口测试
1、年夜饭下单
2、查询待处理订单
3、查询已处理订单
4、清空已处理且成功的订单
5、清空待处理订单
6、一键重启处理失败的订单
7、订单插队
组件化封装
完成了以上测试,基本上我想要的功能都已经实现了。但是仔细想了下,上述的功能里除了第一个下单接口是跟业务相关的,剩下的所有接口都是业务无关的。如果我们公司主营业务变了,从年夜饭变成中秋做月饼,端午包棕子,本服务中的大部分代码都可以在调整之后复用。那么,为什么我不整理出一个与业务无关的Redis队列工具出来呢,这样可以极大的提升代码的可复用性。后面有新的业务时,直接引入这个工具包,完善业务部分即可。
以下是我在反思之后,对代码的整改(只包含有整改或新增的代码)
/**
* 消息承载类
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Data
public class RedisQueueMsg<T> implements Serializable {
/**
* 消息Id
*/
private String id;
/**
* 服务名
*/
private String serverName;
/**
* 数据体
*/
private T data;
}
package com.leixi.queue.pojo;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* 任务处理结果封装类
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Data
public class RedisResultVo implements Serializable {
private String status;
private Object data;
private String desc;
private Date startTime;
private Date endTime;
private String server;
public RedisResultVo() {
this.startTime = new Date();
}
public RedisResultVo(Object data) {
this.data = data;
this.startTime = new Date();
}
}
/**
* 抽象的业务处理服务
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
public abstract class QueueBusiBasicService {
/**
* 处理任务的方法
*
* @param obj 业务类
*/
public abstract void handle(Object obj);
/**
* 处理失败的回调方法
*
* @param obj 业务类
* @param e
*/
public abstract void callBack(Object obj, Exception e);
}
/**
* Redis队列的服务层
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Slf4j
@Service
public class QueueCommonService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private Map<String, QueueBusiBasicService> serviceMap;
@Value("${leixi.redis-queue-key}")
private String REDIS_QUEUE_KEY ;
/**
* 插入消息到队列
*
* @param obj 业务对象
* @param serverName 服务名
* @return
*/
public RedisQueueMsg sendMessage(Object obj, String serverName) {
RedisQueueMsg msg = new RedisQueueMsg();
msg.setId(IdUtil.fastSimpleUUID());
msg.setServerName(serverName);
msg.setData(obj);
redisTemplate.delete(JSON.toJSONString(msg));
redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, JSON.toJSONString(msg), Constant.getScoreByDate());
redisTemplate.convertAndSend(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX, JSON.toJSONString(msg));
return msg;
}
/**
* 处理队列中的工作
*
* @param message
*/
public void handle(String message) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(message, "1", 1, TimeUnit.DAYS);
// 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
if (!flag) {
return;
}
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, message);
RedisQueueMsg param = CastUtils.cast(JSON.parseObject(message, RedisQueueMsg.class));
RedisResultVo result = new RedisResultVo(param);
try {
result.setServer(InetAddress.getLocalHost().getHostAddress());
serviceMap.get(param.getServerName()).handle(param.getData());
result.setStatus(Constant.SUCCESS);
}catch (Exception e) {
e.printStackTrace();
serviceMap.get(param.getServerName()).callBack(param.getData(), e);
result.setStatus(Constant.FAIL);
result.setDesc(e.getMessage());
}finally {
result.setEndTime(new Date());
redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, JSON.toJSONString(result), Constant.getScoreByDate());
}
}
/**
* 查询待处理任务
*
* @return
*/
public Object getPendingTask() {
return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
}
/**
* 清理待处理任务
*
* @return
*/
public Object cleanPendingTask() {
Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
if(set.size() > 0) {
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY + Constant.PENDING_SUFFIX, set.toArray());
}
return "待处理任务已被清空!";
}
/**
* 查询已处理任务
*
* @return
*/
public Object getHandledTask() {
return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
}
/**
* 清理某天前的处理任务
*
* @param day 天数
* @return
*/
public Object cleanHandledTask(Integer day) {
Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, Constant.getScoreByDate() - day);
set.forEach(s -> {
RedisResultVo obj = JSONObject.parseObject(s, RedisResultVo.class);
if (obj.getStatus().equals(Constant.SUCCESS)) {
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
}
});
return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, System.currentTimeMillis());
}
/**
* 重新处理已处理任务
*/
public String restartFailedTask() {
Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
StringBuilder sb = new StringBuilder();
set.forEach(s -> {
RedisResultVo obj = JSONObject.parseObject(s, RedisResultVo.class);
if (!obj.getStatus().equals(Constant.SUCCESS)) {
redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
RedisQueueMsg msg = JSON.parseObject(JSON.toJSONString(obj.getData()), RedisQueueMsg.class); ;
sendMessage(msg.getData(), msg.getServerName());
sb.append(msg.getId()).append(",");
}
});
return "以下任务被重启: "+ sb;
}
/**
* 任务插队
*
* @param msgId 要插队的消息ID
*/
public RedisQueueMsg cutInLineTask(String msgId) {
Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
for (String s : set) {
RedisQueueMsg msg = JSONObject.parseObject(s, RedisQueueMsg.class);
if (msg.getId().equals(msgId)) {
CompletableFuture.runAsync(() -> {
this.handle(s);
});
return msg;
}
}
throw new RuntimeException("未找到相关任务,该项任务已经在执行了!");
}
}
/**
* 业务服务类,继承抽象类务类,实现业务逻辑
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@Service(Constant.DINNER_SERVER)
public class QueueDinnerService extends QueueBusiBasicService {
@Autowired
private QueueCommonService queueCommonService;
/**
* 年夜饭下单
*
* @param entity 订单信息
* @return
*/
public Object orderNewYearEveDinner(OrderEntity entity) {
// 存储订单信息
saveOrder(entity);
queueCommonService.sendMessage(entity, Constant.DINNER_SERVER);
// 异步开始做菜
return "您已成功下单,订单号为"+ entity.getOrderCode()+",后厨正在准备预制菜!";
}
/**
* 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
*
* @param req 订单信息
*/
private void doNewYearEveDinner(OrderEntity req) throws Exception {
System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
Thread.sleep(10000);
int i = new Random().nextInt(6) + 1;
if (i ==4) {
throw new Exception("厨师跑了");
}
if (i ==5) {
throw new Exception("食物跑了");
}
if (i ==6) {
throw new Exception("厨房着火了");
}
System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
}
/**
* 保存订单信息
*
* @param req
*/
private void saveOrder(OrderEntity req) {
//这里假设做的是订单入库操作
System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
}
/**
* 根据订单编号修改订单信息
*
* @param orderCode 订单编号
* @param dinnerStatus
* @param remark
*/
private void updateOrder(String orderCode, String dinnerStatus, String remark) {
// 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
}
/**
* 处理订单
*
* @param obj 业务类
*/
@Override
@SneakyThrows
public void handle(Object obj) {
OrderEntity entity = JSON.parseObject(JSON.toJSONString(obj), OrderEntity.class);
doNewYearEveDinner(entity);
updateOrder(entity.getOrderCode(), Constant.SUCCESS, "出餐成功");
}
@Override
public void callBack(Object obj, Exception e) {
OrderEntity entity = JSON.parseObject(JSON.toJSONString(obj), OrderEntity.class);
System.out.println("更新订单 "+ entity.getOrderCode() +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为FAIL, 原因为 " +e.getMessage());
}
}
@Component
public class RedisQueueListener implements MessageListener {
@Autowired
private QueueCommonService service;
private final Object lock = new Object();
@Override
public void onMessage(Message message, byte[] pattern) {
synchronized (lock) {
service.handle(message.toString());
}
}
}
/**
* 业务控制层
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@RestController
@RequestMapping("/dinner")
public class DinnerController {
@Autowired
private QueueDinnerService dinnerService;
private int i = 0;
@GetMapping("/orderDinner")
public Object orderDinner() {
OrderEntity entity = new OrderEntity();
entity.setOrderCode("Order" + (++i));
entity.setCustomerName("第"+i+"位客户");
return dinnerService.orderNewYearEveDinner(entity);
}
}
/**
* Redis工具控制器
*
* @author leixiyueqi
* @since 2024/06/15 22:00
*/
@RestController
@RequestMapping("/redisQueue")
public class RedisQueueController {
@Autowired
private QueueCommonService service;
@GetMapping("/getPendingTask")
public Object getPendingTask() {
return service.getPendingTask();
}
@GetMapping("/cleanPendingTask")
public Object cleanPendingTask() {
return service.cleanPendingTask();
}
@GetMapping("/getHandledTask")
public Object getHandledTask() {
return service.getHandledTask();
}
@GetMapping("/cleanOldSucceedTask")
public Object cleanHandledTask(@RequestParam("day") Integer day) {
return service.cleanHandledTask(day);
}
/**
* 这里还有最后一个问题, 把已办里的错误的信息摘除出来,重新走请求。并且反馈哪些信息重新走了请求。
*/
@GetMapping("/restartFailedTask")
public Object restartFailedTask() {
service.restartFailedTask();
return "重启失败的服务成功";
}
@GetMapping("/cutInLineTask")
public Object cutInLineTask(@RequestParam("msgId") String msgId) {
RedisQueueMsg msg = service.cutInLineTask(msgId);
return "任务 "+ JSON.toJSONString(msg) + "插队成功!";
}
}
组件化的调整就是把属于Redis队列的操作与业务类操作完全分开,这样,以后有别的业务需要引入组件处理时,只需要写个业务服务继承QueueBusiBasicService即可,最大限度的复用了队列的这套机制和代码。
注意,本组件有它特定的适用场景:处理任务的频度不高,每次处理任务用时较长,而且任务有一定的小概率失败,失败之后重新处理不会影响最终处理结果。
完成这个工具研发后,我结合之前在网上查到的Redis队列的一些案例,发现用别的方案可以更简单的去实现我要的效果,比如直接用Redis队列详解(springboot实战)里的方案,仅仅是因为不想在代码里写 while(true) 这种不是优雅的代码,再加上用Listener的方式长时间没对消息进行消费时,消息会丢失,因此才额外花费了这么多的功夫来打补丁。果然方向选错了,工作量会成倍的增加,希望看到本文的读者能引以为戒,不要自误啊。