微服务项目收获和总结---第5天(定时发布)

news2024/9/20 16:46:13

延迟任务

目录

  • 延迟任务
  • 技术对比: 
  •  Redis实现定时任务:​编辑
  • 新增任务:
  • 取消任务:
  • 拉取任务:
  • Zset定时刷新数据到List中:
  • 分布式锁实现定时任务只刷新一次:

技术对比: 

 

 Redis实现定时任务:

 问题1:为什么任务需要存储在数据库中?

问题2:为什么使用两种Redis的数据结构(List和Zset)?

 问题3:添加Zset数据时,为什么需要预加载?

        

新增任务:

添加任务到数据库进行持久化----->添加任务到Redis队列中------>如果任务的执行时间小于等于当前时间,则添加到当前任务List队列中------>如果任务的执行时间大于当前时间并且小于预设时间,就把其放到未来队列Zset中

@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {

    /**
     * 添加延迟任务
     *
     * @param task
     * @return
     */
    @Override
    public long addTask(Task task) {
        //1.添加任务到数据库中
        boolean success = addTaskToDb(task);
        if (success) {
            //2.添加任务到redis
            addTaskToCache(task);
        }
        return task.getTaskId();
    }
 

    @Autowired
    private CacheService cacheService;
    /**
     * 把任务添加到redis中
     *
     * @param task
     */
    private void addTaskToCache(Task task) {
        String key = task.getTaskType() + "_" + task.getPriority();
        //获取5分钟之后的时间  毫秒值
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        long nextScheduleTime = calendar.getTimeInMillis();
 
        //2.1 如果任务的执行时间小于等于当前时间,存入list
        if (task.getExecuteTime() <= System.currentTimeMillis()) {
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
        } else if (task.getExecuteTime() <= nextScheduleTime) {
            //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
            cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
        }
    }
 
    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
 
    /**
     * 添加任务到数据库中
     *
     * @param task
     * @return
     */
    private boolean addTaskToDb(Task task) {
        boolean flag = false;
        try {
            //保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);
 
            //设置taskID
            task.setTaskId(taskinfo.getTaskId());
 
            //保存任务日志数据
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);
 
            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
}

取消任务:

把数据库的任务删除----->更新任务的日志------>删除redis队列中的数据

/**
     * 取消任务
     * @param taskId
     * @return
     */
@Override
public boolean cancelTask(long taskId) {
    boolean flag = false;
    //删除任务,更新日志
    Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
    //删除redis的数据
    if(task != null){
        removeTaskFromCache(task);
        flag = true;
    }
    return false;
}
 
/**
     * 删除redis中的任务数据
     * @param task
     */
private void removeTaskFromCache(Task task) {
    String key = task.getTaskType()+"_"+task.getPriority();
    if(task.getExecuteTime()<=System.currentTimeMillis()){
        cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
    }else {
        cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
    }
}
 
/**
     * 删除任务,更新任务日志状态
     * @param taskId
     * @param status
     * @return
     */
private Task updateDb(long taskId, int status) {
    Task task = null;
    try {
        //删除任务
        taskinfoMapper.deleteById(taskId);
        //更新日志
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(status);
        taskinfoLogsMapper.updateById(taskinfoLogs);
 
        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs,task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    }catch (Exception e){
        log.error("task cancel exception taskid={}",taskId);
    }
    return task;
}

 

拉取任务:

 /**
     * 按照类型和优先级拉取任务
     * @return
     */
@Override
public Task poll(int type,int priority) {
    Task task = null;
    try {
        String key = type+"_"+priority;
        //从List中取出任务
        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.isNotBlank(task_json)){
            task = JSON.parseObject(task_json, Task.class);
            //更新数据库信息,删除任务并且更新日志
            updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
        }
    }catch (Exception e){
        e.printStackTrace();
        log.error("poll task exception");
    }
 
    return task;
}

Zset定时刷新数据到List中+分布式锁实现定时任务只刷新一次:

如何获取Zset中所有的key。方案一:keys()获取,不推荐。方案二:scan()获取。

通过定时任务,使用redis管道,把Zset中的数据往List中传输 

 
/**
 * 未来数据定时刷新
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
    //获取锁
    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if(StringUtils.isNotBlank(token)){
        log.info("未来数据定时刷新---定时任务");
 
        //获取所有未来数据的集合key
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : futureKeys) {//future_100_50
 
            //获取当前数据的key  topic
            String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
 
            //按照key和分值查询符合条件的数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
 
            //同步数据
            if(!tasks.isEmpty()){
                cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                log.info("成功的将"+futureKey+"刷新到了"+topicKey);
            }
        }
    }
}

数据库同步到Redis中:

 清除缓存----->查询出小于未来5分钟的所有任务------>新增任务到Redis中

 
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);
 
    //查看小于未来5分钟的所有任务
    List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
    if(allTasks != null && allTasks.size() > 0){
        for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo,task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
        }
    }
}
 
private void clearCache(){
    // 删除缓存中未来数据集合和当前消费者队列的所有key
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}

其他微服务通过Fegin远程调用Schedule服务 :
@FeignClient("leadnews-schedule")
public interface IScheduleClient {
 
    /**
     * 添加任务
     * @param task   任务对象
     * @return       任务id
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult  addTask(@RequestBody Task task);
 
    /**
     * 取消任务
     * @param taskId        任务id
     * @return              取消结果
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
 
    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority")  int priority);
}

文章发布和审核:

 发布:
 
@Autowired
private WmNewsTaskService wmNewsTaskService;
 
/**
     * 发布修改文章或保存为草稿
     * @param dto
     * @return
     */
@Override
public ResponseResult submitNews(WmNewsDto dto) {
 
    //0.条件判断
    if(dto == null || dto.getContent() == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
 
    //1.保存或修改文章
 
    WmNews wmNews = new WmNews();
    //属性拷贝 属性名词和类型相同才能拷贝
    BeanUtils.copyProperties(dto,wmNews);
    //封面图片  list---> string
    if(dto.getImages() != null && dto.getImages().size() > 0){
        //[1dddfsd.jpg,sdlfjldk.jpg]-->   1dddfsd.jpg,sdlfjldk.jpg
        String imageStr = StringUtils.join(dto.getImages(), ",");
        wmNews.setImages(imageStr);
    }
    //如果当前封面类型为自动 -1
    if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
        wmNews.setType(null);
    }
 
    saveOrUpdateWmNews(wmNews);
 
    //2.判断是否为草稿  如果为草稿结束当前方法
    if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
 
    //3.不是草稿,保存文章内容图片与素材的关系
    //获取到文章内容中的图片信息
    List<String> materials =  ectractUrlInfo(dto.getContent());
    saveRelativeInfoForContent(materials,wmNews.getId());
 
    //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
    saveRelativeInfoForCover(dto,wmNews,materials);
 
    //审核文章
    //        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
 
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
 
}
 消费:
@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
 
/**
     * 消费延迟队列数据
     */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {
 
    log.info("文章审核---消费任务执行---begin---");
 
    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200) && responseResult.getData() != null){
        String json_str = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(json_str, Task.class);
        byte[] parameters = task.getParameters();
        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
        System.out.println(wmNews.getId()+"-----------");
        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    }
    log.info("文章审核---消费任务执行---end---");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1702776.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Windows] GIF动画、动图制作神器 ScreenToGif(免费)

ScreenToGif 是开源免费的 Gif 动画录制工具&#xff0c;小巧原生单文件&#xff0c;功能很实用。它有录制屏幕、录制摄像头、录制画板、图像编辑器等功能&#xff0c;可以将屏幕任何区域及操作过程录制成 GIF 格式的动态图像。保存前还可对 GIF 图像编辑优化&#xff0c;支持自…

七年之痒!一个 PHP 程序员职业生涯的自述

大家好&#xff0c;我是码农先森。 今年刚好是我毕业的第七个年头&#xff0c;在婚姻感情当中都有一种「七年之痒」的说法&#xff0c;这次我把这个词「七年之痒」用一次在我的职业生涯复盘上。七年前我从告别校园&#xff0c;踏入互联网编程行业&#xff0c;七年后我依旧在编…

多线程笔记

1. run() VS start() run()方法&#xff1a; run()方法是java.lang.Runnable接口中定义的一个方法。当一个类实现了Runnable接口&#xff0c;并创建了一个线程对象时&#xff0c;你需要覆盖run()方法来定义线程要执行的任务。run()方法定义了线程的主体逻辑&#xff0c;当线程…

变异系数法

前言 变异系数法是一种根据统计学方法计算系统各指标变化程度的客观赋权法&#xff0c; 变异系数法在金融行业主要应用于风险评估、资产配置和绩效评价。 变异系数法是通过计算数据中包含的信息来确定各指标的权重。该方法认为&#xff0c;变化差异较大的指标应该被赋予较大的…

消息回复及时,客户不流失!这个微信自动回复设置快快码住!

你是不是也遇到过由于回复不及时&#xff0c;导致客户流失的情况发生&#xff1f;或是好友申请太多&#xff0c;来不及通过&#xff1f; 别担心&#xff0c;试试个微管理系统&#xff0c;让你实现自动回复&#xff0c;提高回复效率&#xff01; 1、自动通过好友 当有新的好友…

Qt QScript 之 C++/JavaScript相互调用

文章目录 Qt Script什么是ECMAScriptQt 中JavaScriptclass 详解Basic UsageQObject对脚本引擎可用使用信号槽connect 三种模式访问属性, 子对象使c++对象可用于用Qt Script编写的脚本C++ 类成员函数可用于脚本C++ 类属性可用于脚本对脚本中的c++对象信号的反应函数对象和本机函…

Day37 代码随想录打卡|二叉树篇---对称二叉树

题目&#xff1a; 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 方法&#xff1a;本体可以用递归和迭代两种方法&#xff0c;但我更喜欢迭代的方式&#xff0c;因此使用迭代的方式做一下。首先我们分析一下不对称的情况。因为对称的情况很简单&#xff0c;即两…

新购入的读码器该如何测试呢?

物联网技术的飞速发展&#xff0c;条码二维码作为一种高效、便捷的数据传输方式&#xff0c;已经广泛应用于仓储、物流配送、零售与结算、MES系统等生活和工业领域。新购的条码二维码读码器&#xff0c;在使用前要了解它的使用方法和性能&#xff0c;以确保其性能稳定、读取准确…

【练手项目】基于STM32的智能空调系统

项目设计说明&#xff1a; 所用到的知识点&#xff1a; GPIO、串口通信、 定时器、ADC采样、 LCD显示屏、 DHT11的通信协议。 功能概述&#xff1a; LCD显示屏&#xff1a;开机显示开启界面&#xff0c;设备自检成功后显示温湿度&#xff0c; 风机开关情况 &#xff0c;制冷片…

python数据处理与分析入门-Pandas数据可视化例子

相关内容 Matplotlib可视化练习 Pandas 数据可视化总结 柱状图 reviews[points].value_counts().sort_index().plot.bar()散点图 reviews[reviews[price] < 100].sample(100).plot.scatter(xprice, ypoints)蜂窝图 reviews[reviews[price] < 100].plot.hexbin(xprice…

Day08:CSS 高级

目标&#xff1a;掌握定位的作用及特点&#xff1b;掌握 CSS 高级技巧 一、定位 作用&#xff1a;灵活的改变盒子在网页中的位置 实现&#xff1a; 1.定位模式&#xff1a;position 2.边偏移&#xff1a;设置盒子的位置 leftrighttopbottom 水平方向偏移&#xff1a;left、…

图论(四)—最短路问题(Dijkstra)

一、最短路 概念&#xff1a;从某个点 A 到另一个点B的最短距离&#xff08;或路径&#xff09;。从点 A 到 B 可能有多条路线&#xff0c;多种距离&#xff0c;求其中最短的距离和相应路径。 最短路径分类&#xff1a; 单源最短路&#xff1a;图中的一个点到其余各点的最短路径…

成功案例(IF=7.4)| 代谢组+16s联合分析助力房颤代谢重构的潜在机制研究

研究背景 心房颤动&#xff08;AF&#xff09;是临床上最常见的持续性心律失常&#xff0c;具有显著的发病率和死亡率。高龄是房颤发病率、患病率和进展最显著的危险因素。与年龄在50-59岁之间的参与者相比&#xff0c;80-89岁之间的参与者患房颤的风险增加了9.33倍。目前尚不…

【第4章】SpringBoot整合Lombok

文章目录 前言一、准备1. 安装插件2. 引入库 二、使用1.实体类2.测试类3. 输出 总结 前言 Project Lombok是一个java库&#xff0c;它可以自动插入编辑器和构建工具&#xff0c;为您的java程序锦上添花。 再也不要写另一个getter或equals方法了&#xff0c;只要有一个注释&…

国内AI大模型的下半场-「百模大战免费篇」上线,让我们直接梦回十年前

| 我们正在经历第九次「烧钱」大战。 这个故事&#xff0c;大概要从2024年5月6号&#xff0c;一个叫DeepSeek-V2的模型开始说起。 那一天&#xff0c;DeepSeek宣布开源他们的第二代MoE大模型——DeepSeek-V2。根据披露的信息显示&#xff0c;该模型在性能上比肩GPT-4 Turbo。刚…

加仓硬核科技最好的时刻来了!

5月27日消息&#xff0c;港股三大指数V型转涨&#xff0c;恒指涨1.17%&#xff0c;科指涨1.72%&#xff0c;国指涨1.25%。板块方面&#xff0c;大型科技股走势分化&#xff0c;美团、阿里、腾讯小幅上涨&#xff0c;网易跌1.97%&#xff0c;京东跌0.67%&#xff0c;联想股价创历…

算术运算符

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 算术运算符是处理四则运算的符号&#xff0c;在数字的处理中应用得最多。常用的算术运算符如表4所示。 表4 常用的算术运算符 运 算 符 说 明…

Java:IO

首 java.io中有百万计的类&#xff0c;如何找到自己需要的部分&#xff1f; 流 IO涉及到一个“流”stream的概念&#xff0c;可以简单理解成数据从一个源头到一个目的地。明白数据从哪来&#xff0c;要到哪里去&#xff0c;数据流中是字节还是字符之后&#xff0c;才能找到自…

家政预约小程序06服务展示

目录 1 首页展示2 团购详情总结 在家政小程序中&#xff0c;最重要的信息就是各项服务的内容。顾客通过服务的信息&#xff0c;了解家政公司可以提供什么样的服务以及相关的收费。本篇我们介绍一下服务展示功能如何开发。 1 首页展示 在首页我们已经开发了活动展示、服务分类展…

来自Java的“菱形继承“,你听说过吗?

一、菱形继承的概念 菱形继承又叫做钻石继承&#xff0c;指的是不同的类同时继承自相同的父类&#xff0c;存在一个子类同时继承这些不同的类&#xff0c;即我们常说的“多继承”问题。 例如&#xff1a;B类和C类分别继承A类&#xff0c;而D类同时继承B类和C类。 如此图所示 二…