Redis队列自研组件

news2024/11/14 13:48:13

     背景

        年初的时候设计实践过一个课题:SpringBoot+Redis实现不重复消费的队列,并用它开发了一个年夜饭下单和制作的服务。不知道大家还有没有印象。完成这个课题后,我兴致勃勃的把它运用到了项目里面,可谁曾想,运行不久后运维人员就找到了我,说我的功能有问题,而且问题很大。具体表现在用户下单之后,很久都没有收到订单完成的消息,后台服务里的日志也是残缺的,只查到了开始制作年夜饭的日志,没有年夜饭制作过程和完成的日志。

        为此,我花了大量的时间和精力去分析程序,定位问题,最终发现,是运维人员在系统上线时将生产服务和UAT服务的redis服务地址给配成了同一个,这就导致生产上的订单进入Redis队列后,被UAT服务给消费了,而UAT和生产又是不同的数据库,自然导致UAT上通过队列中的主键在数据库中找不到相关数据,从而消费失败的结果。而这些失败的信息全都记录在了UAT的服务器上,生产服务器中自然很难分析和定位到问题。

        要解决这个问题其实很简单,只需要把channelTopic改为从配置文件中获取,然后生产、UAT环境配置不同的字符串即可。

        但实际上真的只做这些就够了吗?如果下次再发生不可预料的问题,我还要花那么多的时间吭哧吭哧的去看日志,调程序,定位问题吗?答案是否定的!

      原服务整改

        结合生产环境产生的问题,痛定思痛,我决定对原来的服务进行一轮大整改,优化服务的可维护性,可测试性。在我的构想中,新的服务需要有以下功能:

        1、保证原有的“年夜饭”功能稳定正常的运行。

        2、可以查询哪些订单还未开始处理

        3、可以查询哪些订单已经处理,以及处理结果。

        4、可以清空N天以前的处理成功的订单。

        5、可以清空待处理的订单

        6、对于已经处理但处理失败的订单,可以一键重新处理

        7、待处理订单插队

        我的设想是,通过redisTemplate.opsForZSet()方法创建两个新队列:待办队列和已办队列,在下单时,插入一条数据到待办队列,在处理任务时,从待办队列中删除该数据,在处理完成后,插入一条数据到已办队列,这样,通过查询待办队列,已办队列,就可以知道哪些任务还在排队,哪些任务已经完成了。

        以下是我的程序整改过程,老粉可以对比上一篇博客看看两者的不同之处。

#以下是application.yml配置

server:
  port: 19200
  servlet:
    context-path: /leixi
  max-http-header-size: 102400
spring:
  redis:
    database: 9
    host: 127.0.0.1
    port: 6379
    password:
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
leixi:
  redis-queue-key: NEW_YEAR_DINNER_DEV
//以下是Java程序代码:
/**
 * Redis配置
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Configuration
public class RedisConfig {

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;


    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX);
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) {
        return new MessageListenerAdapter(listener);
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
                                                        MessageListenerAdapter messageListenerAdapter,
                                                        ChannelTopic topic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(messageListenerAdapter, topic);
        return container;
    }
}


/**
 * 订单处理控制层
 * 
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@RestController
public class DinnerController {

    private int i = 0;

    @Autowired
    private DinnerService service;

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;

    @GetMapping("/orderDinner")
    public Object orderDinner() {
        OrderEntity entity = new OrderEntity();
        entity.setOrderCode("Order" + (++i));
        entity.setCustomerName("第"+i+"位客户");
        return service.orderNewYearEveDinner(entity);
    }

    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @GetMapping("/getPendingOrder")
    public Object getPendingOrder() {
       return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
    }

    @GetMapping("/cleanPendingOrder")
    public Object cleanPendingOrder() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        if(set.size() > 0) {
            redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY + Constant.PENDING_SUFFIX, set.toArray());
        }
        return "待处理订单已被清空!";

    }

    @GetMapping("/getHandledOrder")
    public Object getHandledOrder() {
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
    }

    @GetMapping("/cleanOldSucceedOrder")
    public Object cleanHandledOrder(@RequestParam("day") Integer day) {
        Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, Constant.getScoreByDate() - day);
        set.forEach(s -> {
            JSONObject obj = JSONObject.parseObject(s);
            if (obj.getString("result").equals("SUCCESS")) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
            }
        });
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, System.currentTimeMillis());
    }

    /**
     * 这里还有最后一个问题, 把已办里的错误的信息摘除出来,重新走请求。并且反馈哪些信息重新走了请求。
     */
    @GetMapping("/restartFailedOrder")
    public Object restartFailedOrder() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
        StringBuilder sb = new StringBuilder();
        set.forEach(s -> {
            JSONObject obj = JSONObject.parseObject(s);
            if (!obj.getString("result").equals("SUCCESS")) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
                OrderEntity entity = JSON.parseObject(obj.getString("msg"), OrderEntity.class);
                service.orderNewYearEveDinner(entity);
                sb.append(entity.getOrderCode()).append(",");
            }
        });
        return "以下订单号被重启: "+ sb;
    }


    @GetMapping("/cutInLineJob")
    public Object cutInLineJob(@RequestParam("orderCode")  String orderCode) {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        for (String s : set) {
            OrderEntity obj = JSONObject.parseObject(s, OrderEntity.class);
            if (obj.getOrderCode().equals(orderCode)) {
                CompletableFuture.runAsync(() -> {
                    service.doListenerWork(s);
                });
                return "订单 " + orderCode + " 插队成功!";
            }
        }
        return " 插队失败,该订单已经在制作了!";
    }
}

/**
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Data
public class OrderEntity implements Serializable {

    /**
     * 客户姓名
     */
    private String customerName;

    /**
     * 订单号
     */
    private String orderCode;

    /**
     * 菜单
     */
    List<String> menus;

    /**
     * 出餐状态
     */
    private String dinnerState;

    /**
     * 做饭开始时间
     */
    private String dinnerStartTime;

    /**
     * 做饭结束时间
     */
    private String dinnerEndTime;

    /**
     * 备注
     */
    private String remark;
}

/**
 * 监听类
 *  
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Component
public class DinnerListener implements MessageListener {

    @Autowired
    private DinnerService service;
    private final Object lock = new Object();

    @Override
    public void onMessage(Message message, byte[] pattern)  {
        synchronized (lock) {
            service.doListenerWork(message.toString());
        }
    }
}


/**
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Slf4j
@Service
public class DinnerService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;

    /**
     * 年夜饭下单
     *
     * @param req 订单信息
     * @return
     */
    public Object orderNewYearEveDinner(OrderEntity req) {
        // 存储订单信息
        saveOrder(req);
        // 异步开始做菜
        redisTemplate.delete(JSON.toJSONString(req));
        redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, JSON.toJSONString(req), Constant.getScoreByDate());
        redisTemplate.convertAndSend(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX, JSON.toJSONString(req));
        return "您已成功下单,订单号为"+ req.getOrderCode()+",后厨正在准备预制菜!";
    }
    /**
     * 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
     *
     * @param req 订单信息
     */
    public void doNewYearEveDinner(OrderEntity req) throws Exception {
        System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
        Thread.sleep(10000);
        // 这里写个方法模拟报错的场景
        int i = new Random().nextInt(6) + 1;
        if (i ==4) {
            throw new Exception("厨师跑了");
        }
        if (i ==5) {
            throw new Exception("食物跑了");
        }
        if (i ==6) {
            throw new Exception("厨房着火了");
        }
        System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
    }

    private void saveOrder(OrderEntity req) {
        //这里假设做的是订单入库操作
        System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
    }

    /**
     * 根据订单编号修改订单信息
     *
     * @param orderCode 订单编号
     * @param dinnerStatus
     * @param remark
     */
    public void updateOrder(String orderCode, String dinnerStatus, String remark) {
        // 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
        System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
    }


    public void doListenerWork(String message) {
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(message, "1", 1, TimeUnit.DAYS);
        // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
        if (!flag) {
            return;
        }
        redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, message);
        OrderEntity param = CastUtils.cast(JSON.parseObject(message, OrderEntity.class));
        JSONObject obj = new JSONObject();
        obj.put("msg", message);

        try {
            obj.put("server", InetAddress.getLocalHost().getHostAddress());
            this.doNewYearEveDinner(param);
            this.updateOrder(param.getOrderCode(), "SUCCESS", "成功");
            obj.put("result", "SUCCESS");
        }catch (Exception e) {
            e.printStackTrace();
            this.updateOrder(param.getOrderCode(), "FAIL", e.getMessage());
            obj.put("result", "FAIL");
            obj.put("desc", e.getMessage());
        }finally {
            obj.put("endTime", new Date());
            redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, obj.toJSONString(), Constant.getScoreByDate());
        }
    }


/**
 * 静态工具类
 * 
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
public class Constant {

    // 工作队列后缀
    public static final String WORKING_QUEUE_SUFFIX = "_QUEUE";

    //待处理工作队列后缀
    public static final String PENDING_SUFFIX = "_PENDING";

    // 已处理工作后缀
    public static final String HANDLED_SUFFIX = "_HANDLED";

    //一天的毫秒数
    private static final Integer ONE_DAY_MINI = 86400000;

    /**
     * 根据当前日期计算队列的分数
     *
     * @return
     */
    public static Integer getScoreByDate() {
        return (int)System.currentTimeMillis()/ONE_DAY_MINI;
    }
}

      接口测试

        1、年夜饭下单

        

        2、查询待处理订单

        

        3、查询已处理订单

        

        4、清空已处理且成功的订单

        

        5、清空待处理订单

        

        6、一键重启处理失败的订单

        

        7、订单插队

        

      组件化封装

        完成了以上测试,基本上我想要的功能都已经实现了。但是仔细想了下,上述的功能里除了第一个下单接口是跟业务相关的,剩下的所有接口都是业务无关的。如果我们公司主营业务变了,从年夜饭变成中秋做月饼,端午包棕子,本服务中的大部分代码都可以在调整之后复用。那么,为什么我不整理出一个与业务无关的Redis队列工具出来呢,这样可以极大的提升代码的可复用性。后面有新的业务时,直接引入这个工具包,完善业务部分即可。

        以下是我在反思之后,对代码的整改(只包含有整改或新增的代码)

/**
 * 消息承载类
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Data
public class RedisQueueMsg<T> implements Serializable {

    /**
     * 消息Id
     */
    private String id;

    /**
     * 服务名
     */
    private String serverName;

    /**
     * 数据体
     */
    private T data;
}

package com.leixi.queue.pojo;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * 任务处理结果封装类
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Data
public class RedisResultVo implements Serializable {

    private String status;

    private Object data;

    private String desc;

    private Date startTime;

    private Date endTime;

    private String server;

    public RedisResultVo() {
        this.startTime = new Date();
    }

    public RedisResultVo(Object data) {
        this.data = data;
        this.startTime = new Date();
    }
}


/**
 * 抽象的业务处理服务
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
public abstract class QueueBusiBasicService {

    /**
     * 处理任务的方法
     *
     * @param obj 业务类
     */
    public abstract void handle(Object obj);

    /**
     * 处理失败的回调方法
     *
     * @param obj 业务类
     * @param e
     */
    public abstract void callBack(Object obj, Exception e);
}


/**
 * Redis队列的服务层
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Slf4j
@Service
public class QueueCommonService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private Map<String, QueueBusiBasicService> serviceMap;

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;


    /**
     * 插入消息到队列
     *
     * @param obj 业务对象
     * @param serverName  服务名
     * @return
     */
    public RedisQueueMsg sendMessage(Object obj, String serverName) {
        RedisQueueMsg msg = new RedisQueueMsg();
        msg.setId(IdUtil.fastSimpleUUID());
        msg.setServerName(serverName);
        msg.setData(obj);
        redisTemplate.delete(JSON.toJSONString(msg));
        redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, JSON.toJSONString(msg), Constant.getScoreByDate());
        redisTemplate.convertAndSend(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX, JSON.toJSONString(msg));
        return msg;
    }

    /**
     * 处理队列中的工作
     *
     * @param message
     */
    public void handle(String message) {
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(message, "1", 1, TimeUnit.DAYS);
        // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
        if (!flag) {
            return;
        }
        redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, message);
        RedisQueueMsg param = CastUtils.cast(JSON.parseObject(message, RedisQueueMsg.class));
        RedisResultVo result = new RedisResultVo(param);
        try {
            result.setServer(InetAddress.getLocalHost().getHostAddress());
            serviceMap.get(param.getServerName()).handle(param.getData());
            result.setStatus(Constant.SUCCESS);
        }catch (Exception e) {
            e.printStackTrace();
            serviceMap.get(param.getServerName()).callBack(param.getData(), e);
            result.setStatus(Constant.FAIL);
            result.setDesc(e.getMessage());
        }finally {
            result.setEndTime(new Date());
            redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, JSON.toJSONString(result), Constant.getScoreByDate());
        }
    }

    /**
     * 查询待处理任务
     *
     * @return
     */
    public Object getPendingTask() {
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
    }

    /**
     * 清理待处理任务
     *
     * @return
     */
    public Object cleanPendingTask() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        if(set.size() > 0) {
            redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY + Constant.PENDING_SUFFIX, set.toArray());
        }
        return "待处理任务已被清空!";
    }

    /**
     * 查询已处理任务
     *
     * @return
     */
    public Object getHandledTask() {
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
    }

    /**
     * 清理某天前的处理任务
     *
     * @param day 天数
     * @return
     */
    public Object cleanHandledTask(Integer day) {
        Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, Constant.getScoreByDate() - day);
        set.forEach(s -> {
            RedisResultVo obj = JSONObject.parseObject(s, RedisResultVo.class);
            if (obj.getStatus().equals(Constant.SUCCESS)) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
            }
        });
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, System.currentTimeMillis());
    }

    /**
     * 重新处理已处理任务
     */
    public String restartFailedTask() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
        StringBuilder sb = new StringBuilder();
        set.forEach(s -> {
            RedisResultVo obj = JSONObject.parseObject(s, RedisResultVo.class);
            if (!obj.getStatus().equals(Constant.SUCCESS)) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
                RedisQueueMsg msg = JSON.parseObject(JSON.toJSONString(obj.getData()), RedisQueueMsg.class); ;
                sendMessage(msg.getData(), msg.getServerName());
                sb.append(msg.getId()).append(",");
            }
        });
        return "以下任务被重启: "+ sb;
    }

    /**
     * 任务插队
     *
     * @param msgId 要插队的消息ID
     */
    public RedisQueueMsg cutInLineTask(String msgId) {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        for (String s : set) {
            RedisQueueMsg msg = JSONObject.parseObject(s, RedisQueueMsg.class);
            if (msg.getId().equals(msgId)) {
                CompletableFuture.runAsync(() -> {
                    this.handle(s);
                });
                return msg;
            }
        }
        throw new RuntimeException("未找到相关任务,该项任务已经在执行了!");
    }
}

/**
 * 业务服务类,继承抽象类务类,实现业务逻辑
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Service(Constant.DINNER_SERVER)
public class QueueDinnerService extends QueueBusiBasicService {

    @Autowired
    private QueueCommonService queueCommonService;

    /**
     * 年夜饭下单
     *
     * @param entity 订单信息
     * @return
     */
    public Object orderNewYearEveDinner(OrderEntity entity) {
        // 存储订单信息
        saveOrder(entity);
        queueCommonService.sendMessage(entity, Constant.DINNER_SERVER);
        // 异步开始做菜
        return "您已成功下单,订单号为"+ entity.getOrderCode()+",后厨正在准备预制菜!";
    }

    /**
     * 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
     *
     * @param req 订单信息
     */
    private void doNewYearEveDinner(OrderEntity req) throws Exception {
        System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
        Thread.sleep(10000);
        int i = new Random().nextInt(6) + 1;
        if (i ==4) {
            throw new Exception("厨师跑了");
        }
        if (i ==5) {
            throw new Exception("食物跑了");
        }
        if (i ==6) {
            throw new Exception("厨房着火了");
        }
        System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
    }

    /**
     * 保存订单信息
     *
     * @param req
     */
    private void saveOrder(OrderEntity req) {
        //这里假设做的是订单入库操作
        System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
    }

    /**
     * 根据订单编号修改订单信息
     *
     * @param orderCode 订单编号
     * @param dinnerStatus
     * @param remark
     */
    private void updateOrder(String orderCode, String dinnerStatus, String remark) {
        // 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
        System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
    }


    /**
     * 处理订单
     *
     * @param obj 业务类
     */
    @Override
    @SneakyThrows
    public void handle(Object obj) {
        OrderEntity entity = JSON.parseObject(JSON.toJSONString(obj), OrderEntity.class);
        doNewYearEveDinner(entity);
        updateOrder(entity.getOrderCode(), Constant.SUCCESS, "出餐成功");
    }

    @Override
    public void callBack(Object obj, Exception e) {
        OrderEntity entity = JSON.parseObject(JSON.toJSONString(obj), OrderEntity.class);
        System.out.println("更新订单 "+ entity.getOrderCode() +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为FAIL, 原因为 " +e.getMessage());
    }
}

@Component
public class RedisQueueListener implements MessageListener {
    @Autowired
    private QueueCommonService service;

    private final Object lock = new Object();

    @Override
    public void onMessage(Message message, byte[] pattern)  {
        synchronized (lock) {
            service.handle(message.toString());
        }
    }
}


/**
 * 业务控制层
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@RestController
@RequestMapping("/dinner")
public class DinnerController {

    @Autowired
    private QueueDinnerService dinnerService;

    private int i = 0;


    @GetMapping("/orderDinner")
    public Object orderDinner() {
        OrderEntity entity = new OrderEntity();
        entity.setOrderCode("Order" + (++i));
        entity.setCustomerName("第"+i+"位客户");
        return dinnerService.orderNewYearEveDinner(entity);
    }

}



/**
 * Redis工具控制器
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@RestController
@RequestMapping("/redisQueue")
public class RedisQueueController {
    @Autowired
    private QueueCommonService service;

    @GetMapping("/getPendingTask")
    public Object getPendingTask() {
       return service.getPendingTask();
    }

    @GetMapping("/cleanPendingTask")
    public Object cleanPendingTask() {
        return service.cleanPendingTask();
    }

    @GetMapping("/getHandledTask")
    public Object getHandledTask() {
        return service.getHandledTask();
    }

    @GetMapping("/cleanOldSucceedTask")
    public Object cleanHandledTask(@RequestParam("day") Integer day) {
        return service.cleanHandledTask(day);
    }

    /**
     * 这里还有最后一个问题, 把已办里的错误的信息摘除出来,重新走请求。并且反馈哪些信息重新走了请求。
     */
    @GetMapping("/restartFailedTask")
    public Object restartFailedTask() {
        service.restartFailedTask();
        return "重启失败的服务成功";
    }

    @GetMapping("/cutInLineTask")
    public Object cutInLineTask(@RequestParam("msgId")  String msgId) {
        RedisQueueMsg msg = service.cutInLineTask(msgId);
        return "任务 "+ JSON.toJSONString(msg) + "插队成功!";
    }

}

        组件化的调整就是把属于Redis队列的操作与业务类操作完全分开,这样,以后有别的业务需要引入组件处理时,只需要写个业务服务继承QueueBusiBasicService即可,最大限度的复用了队列的这套机制和代码。

        注意,本组件有它特定的适用场景:处理任务的频度不高,每次处理任务用时较长,而且任务有一定的小概率失败,失败之后重新处理不会影响最终处理结果。

        完成这个工具研发后,我结合之前在网上查到的Redis队列的一些案例,发现用别的方案可以更简单的去实现我要的效果,比如直接用Redis队列详解(springboot实战)里的方案,仅仅是因为不想在代码里写 while(true) 这种不是优雅的代码,再加上用Listener的方式长时间没对消息进行消费时,消息会丢失,因此才额外花费了这么多的功夫来打补丁。果然方向选错了,工作量会成倍的增加,希望看到本文的读者能引以为戒,不要自误啊。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1858245.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1992-2019年全球实际国内生产总值数据集

国内生产总值(GDP)&#xff0c;可以有效评价居民的经济状况和生活水平&#xff0c;在各项研究中都非常常用&#xff01;然而目前在全球不少国家中&#xff0c;对GDP统计存在着统计方法不当、蓄意操纵的行为&#xff0c;导致官方GDP统计数据难以真正反映出真实的经济发展水平。 …

推荐三款必备软件,个个五星好评,你一定不要错过

WiseCare365 WiseCare365是一款由WiseCleaner推出的综合性Windows系统优化和加速工具。它集成了多种功能&#xff0c;旨在帮助用户清理、优化和维护电脑系统&#xff0c;提升电脑性能和安全性。 WiseCare365的主要功能包括&#xff1a; 系统清理&#xff1a;它可以清理各种缓存…

【多模态】BEIT: BERT Pre-Training of Image Transformers

论文&#xff1a;BEIT: BERT Pre-Training of Image Transformers 链接&#xff1a;https://arxiv.org/pdf/2301.00184 Introduction BEIT&#xff08;Bidirectional Encoder representation from Image Transformers&#xff09;Motivation: 启发于BERT的自编码方式&#xf…

【网络协议】精讲TCP通信原理!图解超赞超详细!!!

亲爱的用户&#xff0c;打开微信&#xff0c;搜索公众号&#xff1a;“风云说通信”&#xff0c;即可免费阅读该文章~~ 目录 1. 建立连接 2. 数据传输 3. 断开连接 4. 抓包分析 前言 TCP 把连接作为最基本的对象&#xff0c;每一条 TCP 连接都有两个端点&#xff0c;这种端…

仓库管理系统03--设计登录页面

1、添加登录窗体 2、整体布局UI 1&#xff09;设计三个白底的边框&#xff0c;其中2个旋转角度&#xff0c;形成以下效果 3、设计登录控件 <Window x:Class"West.StoreMgr.Windows.LoginWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presenta…

【ETAS CP AUTOSAR基础软件】DCM模块详解(诊断)

文章包含了AUTOSAR基础软件&#xff08;BSW&#xff09;中DCM模块相关的内容详解。本文从ISO标准&#xff0c;AUTOSAR规范解析&#xff0c;ISOLAR-AB配置以及模块相关代码分析四个维度来帮读者清晰的认识和了解DCM这一基础软件模块。文中涉及的ISOLAR-AB配置以及模块相关代码都…

JetBrains WebStorm 2024 mac/win版:效率至上,编码无忧

JetBrains WebStorm 2024 是一款专为前端开发者和全栈工程师打造的高 级集成开发环境(IDE)。它提供了对JavaScript、TypeScript、HTML5、CSS等技术的全面支持&#xff0c;帮助开发者更高效地进行前端开发。 WebStorm 2023 mac/win版获取 WebStorm 2024 拥有强大的智能代码补全…

SpringBoot+Vue集成富文本编辑器

1.引入 我们常常在各种网页软件中编写文档的时候&#xff0c;常常会有富文本编辑器&#xff0c;就比如csdn写博客的这个页面&#xff0c;包含了富文本编辑器&#xff0c;那么怎么实现呢&#xff1f;下面来详细的介绍&#xff01; 2.安装wangeditor插件 在Vue工程中&#xff0c;…

锐捷统一上网行为管理与审计系统 static_convert.php 前台RCE漏洞复现

0x01 产品简介 锐捷统一上网行为管理与审计RG-UAC系列是星网锐捷网络有限公司自主研发的上网行为管理与审计产品,具备的上网行为日志审计功能,能够全面、准确、细致的审计并记录多种上网行为日志,包括网页、搜索、外发文件、邮件、论坛、IM等等,并对日志数据进行统计分析,…

开发基于Java语言的SaaS(Software-as-a-Service,软件即服务)模式的HIS系统详解 HIS系统源码 支持二开

开发基于Java语言的SaaS&#xff08;Software-as-a-Service&#xff0c;软件即服务&#xff09;模式的HIS系统详解 HIS系统源码 支持二开 开发基于Java语言的SaaS&#xff08;Software-as-a-Service&#xff0c;软件即服务&#xff09;模式的HIS&#xff08;Hospital Informat…

深入探索Jetpack数据绑定(DataBinding)

Jetpack的数据绑定&#xff08;DataBinding&#xff09;库为我们提供了一个强大而灵活的工具&#xff0c;用于将UI组件与数据源绑定在一起。本文将深入探讨数据绑定的高级用法&#xff0c;包括双向绑定、自定义Binding Adapter、使用LiveData和ViewModel&#xff0c;以及如何处…

爬虫笔记15——爬取网页数据并使用redis数据库set类型去重存入,以爬取芒果踢V为例

下载redis数据库 首先需要下载redis数据库&#xff0c;可以直接去Redis官网下载。或者可以看这里下载过程。 pycharm项目文件下载redis库 > pip install redis 然后在程序中连接redis服务&#xff1a; from redis import RedisredisObj Redis(host127.0.0.1, port6379)…

动手学深度学习(Pytorch版)代码实践 -卷积神经网络-27含并行连结的网络GoogLeNet

27含并行连结的网络GoogLeNet import torch from torch import nn from torch.nn import functional as F import liliPytorch as lp import matplotlib.pyplot as pltclass Inception(nn.Module):# c1--c4是每条路径的输出通道数def __init__(self, in_channels, c1, c2, c3, …

Linux高并发服务器开发(一)GCC和Make

文章目录 1 工作流程2 静态库和动态库连接2.1 静态连接2.2 动态链接 3 静态库制作和使用4 动态库的制作和使用5 GDB 调试器6 Makefile 1 工作流程 第一步预处理&#xff0c;生成.i 第二部生成汇编文件.s 第三部生成目标代码.o 第四部生成可执行文件 2 静态库和动态库连接 2.1…

充电桩---特斯拉NACS接口介绍

一、NACS接口发展 NACS是由特斯拉内部开发的&#xff0c;作为交流和直流充电的专有充电解决方案。2022年11月11日&#xff0c;特斯拉在官网上开放了自家的充电接口设计&#xff0c;并将特斯拉充电接口更名为NACS&#xff08;North American Charging Standard&#xff09;&…

Python列表比较:判断两个列表是否相等的多种方法

&#x1f4d6; 正文 1 通过排序的方式实现判断 list_a [a, b, c, d] list_b [c, d, a, b]if sorted(list_a) sorted(list_b):print(list_a与list_b的元素相等) else:print(list_a与list_b的元素不相等)通过排序&#xff0c;让两组列表中元素一直后进行判断&#xff0c;得到…

LONGHEADS:无需训练的多头注意力长文本处理框架

大模型&#xff08;LLMs&#xff09;在处理海量文本数据时展现出了前所未有的能力。然而这些模型在面对超出其训练时所见序列长度的长文本时存在两个主要问题&#xff1a;一是模型对于超出预训练长度的文本难以有效泛化&#xff0c;二是注意力机制的二次方时间复杂度导致计算成…

Marin说PCB之如何在CST仿真软件中添加三端子的电容模型?--02

小编我在上期文章的结尾给大家留下一个小问题就是&#xff1a;在三端子电容创建模型中间的部分我有说了一句就是&#xff1a;&#xff08;其中有一个creat reference pin 设置我们也默认不勾选&#xff09;&#xff0c;这个勾选不勾选有啥区别呢&#xff1f;这期文章就来给大家…

『 Linux 』 进程间通信 - 匿名管道 (万字)

文章目录 什么是管道匿名管道的直接原理pipe( )系统调用接口匿名管道代码示例匿名管道的特征总结 什么是管道 管道(Pipe) 是一种基本的进程间通信(IPC)机制,允许一个进程与另一个进程之间进行数据传输; 管道工作方式类似于生活中的水管因此命名为管道,数据从一端流入另一段流出…

学习笔记——动态路由——RIP(距离矢量协议)

一、距离矢量协议 1、距离矢量协议 矢量行为&#xff1a;协议收到一个路由之后&#xff0c;查看是否可以加入到本地的路由表中&#xff0c;如果可以加入&#xff0c;则可以传递&#xff0c;如果不可以加入&#xff0c;则无法传递。 距离矢量路由协议 RIP基于距离矢量算法(又…