任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK

news2025/1/11 23:52:10

任务调度框架

Java中如何实现定时任务?

比如:
1.每天早上6点定时执行
2.每月最后一个工作日,考勤统计
3.每个月25号信用卡还款
4.会员生日祝福
5.每隔3秒,自动提醒

10分钟的超时订单的自动取消,每隔30秒或1分钟查询一次订单,拿当前的时间上前推10分钟
定时任务,资源会有误差的存在,如果使用定时任务
定时任务,用于统计的时候最多。

自动统计考勤,一般0点之后开始统计,可以使用定时任务

nacos心跳

晚上要求和采购部门生成采购单,达到最低预警值的时候,去发给采购部门

我们可以通过任务调度框架实现上述的需求
任务调度框架,可以实现定时任务,实现间隔多少时间的重复执行,实现指定日期的重复执行

电商自动好评,间隔时间长的,误差几分钟,影响不大。

java中任务调度框架:
1、Spring Task spring自带的
2、Quartz 古老的框架
3、XXL-Job
4、第三方云平台-比如说:阿里云-SchedulerX等等
选择一个:Spring Task
2个注解+
包:task任务、job

使用步骤:
1、开关类,使用注解
@EnableScheduling // 开启任务调度
在这里插入图片描述
2、定义定时任务

@Scheduled(cron = "0/3 * * * ?")

CORN表达式:
秒 分 时 日 月 星期几 年 其中,只有年可以省略
定时任务,需要重复执行的方法

CORN表达式:
特殊字符串,主要用来描述时间的,用于任务调度等
https://cron.qqe2.com/

/ 间隔
- 是连续
, 枚举值
L 最后,星期、日中用
W 有效工作日
LW 某个月最后一个工作日
# 用于确定每个月第几个星期几,母亲节或父亲节
4#2 某个月的第二个星期三  4代表星期三,中文的时候有可能不影响

在这里插入图片描述

在这里插入图片描述

项目名:SpringTask01
Spring Web、Lombok

RabbitMq实现延迟:

死信+延迟消息处理

死信:RabbitMQ的队列中的消息,满足以下条件任意其一就会成为死信消息:
1.消息被拒绝
2.消息过期
3.队列满了
死信交换器:专门用来转发队列中的死信消息,将死信消息转发到指定的队列中

十分钟未支付,取消订单?
十分钟之后,消息会过期,过期后,通过死信交换器转发队列中的死信消息,将死信消息转发到指定的队列中,由消费者去处理。

我们可以通过死信+死信交换器实现延迟消息处理
RabbitMQ实现延迟消息处理有2种方式:
1、死信+死信交换器 代码实现(可控,更方便一些)
2、延迟消息插件

1、死信+死信交换器 代码实现(可控,更方便一些)
发送消息,到队列1,(产生延迟队列,产生死信)
在这里插入图片描述
一个消息,过一段时间,实现消费。

核心:
1.队列 是2个队列,第1个队列: 目的 产生死信(1.设置有效期2.不设置消费者),借助死信交换器,把产生的死信发到指定的队列中
第二个对了:目的 消费死信,这里获取的信息,时间就是延迟的,延迟的就是上面的有效期
2.1个交换器
死信交换器,整个系统一般就一个,可以用来转发各个功能产生的死信,是Direct类型的交换,通过RK进行消息匹配到对应的队列中。
RabbitMQ的消息的有效期有2种设置方式:
1.设置队列上的有效期,整个队列中所有消息都使用
2.可以在每个消息上设置有效期,这种适用于有多个不同有效期的消息

在这里插入图片描述
如果队列和消息都有有效期,谁短听谁的。

代码:
RabbitMQ02
实现:
1.pom

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--        RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

2.代码:
config-RabbitMQConfig

package com.yd.rabbitmq02.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author MaoXiqi
 * @organization: Lucky
 * @create 2023-10-16 11:35:54
 * @description RabbitMQConfigApi
 */
@Configuration
public class RabbitMQConfig {
    // 1.创建2个队列
    @Bean
    public Queue createQ1() {
        // 1.设置队列 内部消息有效期 设置死信交换器 设置RK
        HashMap<String, Object> params = new HashMap<>();
        // 设置队列中每个消息的有效期 单位 毫秒
        params.put("x-message-ttl", 3000);
        // 设置对应的死信交换器
        params.put("x-dead-letter-exchange", "dead-ex-yd");
        // 设置交换器匹配的路由名称
        params.put("x-dead-letter-routing-key", "test");
        return QueueBuilder.durable("dl-q01").withArguments(params).build();
    }
    @Bean
    public Queue createQ2() {
        return new Queue("dl-q02");
    }
    // 2.创建1个交换器(1.fanout 2,direct 3.topic 4.header)-死信交换器direct类型
    @Bean
    public DirectExchange createDe() {
        return new DirectExchange("dead-ex-yd");
    }
    // 3.创建1个绑定
    @Bean
    public Binding createBd1(DirectExchange de) {
        return BindingBuilder.bind(createQ2()).to(de).with("test");
    }
}

在这里插入图片描述

2.1.创建两个队列
1、设置队列,内部消息有效期 设置死信交换器 设置RK 注意:参数名是固定的,值根据业务需求去改,值 单位是毫秒

2.2创建1个交换器(1.fanout 2.direct 3.tipic
4.header)-死信交换器direct类型

2.3.创建一个绑定

controller-DeadController

    @GetMapping("send")
    public String sendDead(String msg) {
        System.out.println("发送消息," + msg + ",发送时间:" + System.currentTimeMillis());
        template.convertAndSend("", "dl-q01", msg);
        return "ok";
    }

发送
在这里插入图片描述
监听消息-主要是为了消费:
listener-DeadListener

    @RabbitListener(queues = "dl-q02")
    public void handler(String m) {
        System.out.println("延迟消息,"+m+",接受时间:" +System.currentTimeMillis());
    }

在这里插入图片描述
yml:

spring:
  rabbitmq:
    host: 121.36.5.100
    port: 5672
    username: guest
    password: guest
server:
  port: 8082

测试:
在这里插入图片描述

RabbitMQ事务:

数据库中事务:保证数据一致性,特别是多个操作要么都成功,要么都失败
RabbitMQ事务:一次性发多条消息,需要开启事务,
RabbitMQ也有自己的事务,如果本次在这里插入图片描述

在这里插入图片描述

使用步骤:

源码:RabbitMQ02
1.创建配置类
2.使用基于事务发送
3.监听消息

1.创建配置类
config-RabbitMQTranConfig
1.准备一个队列
2.创建事务管理器

// 创建事务管理器
    @Bean
    public RabbitTransactionManager createTran(ConnectionFactory factory) {
        return new RabbitTransactionManager(factory);
    }

在这里插入图片描述

2.controller-TranController
开启事务
发送消息

    // 事务
   @Transactional // 需要开启SpringBoot的事务机制
   @GetMapping("sendmsg")
   public String sendMsg(String msg, int count) {
       // 开启 RabbitMQ的通道的事务
       template.setChannelTransacted(true);
       // 发送消息
       for (int i = 0; i < count; i++) {
           template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
           // 出错,看看 事务是否生效
           if (i==2) {
               System.out.println(1/0);
           }
       }
       return "ok";
   }

在这里插入图片描述

3.listener-DeadListener

    // 事务
    @RabbitListener(queues = "yd-tran-q01")
    public void handler2(String msg) {
        System.out.println("监听消息"+msg);
        // 处理业务逻辑 出错了
    }

在这里插入图片描述

在这里插入图片描述

手动ACK

RabbitMQ怎么防止消息丢失:
1.发送端没有发送过去
解决:
1.用事务
2.confirm消息确认机制 万能:转人工处理
2.MQ服务器丢失,MQ服务器蹦了,
解决:开启持久化
3.消费端消息丢失:
解决:自动应答,改成开始手动ACK

消息确认机制:默认是自动确认

消息的发送和接收是异步

RabbitMQ如何防止消息丢失:
1.
代码:
config-RabbitMqTranConfig

    //消费消息的⼿动应答
    @Bean
    public Queue createQ4() {
        return new Queue("yd-ack-q01");
    }

controller-DeadController

    // 事务
    @Transactional // 需要开启SpringBoot的事务机制
    @GetMapping("sendmsg")
    public String sendMsg(String msg, int count) {
        // 开启 RabbitMQ的通道的事务
        template.setChannelTransacted(true);
        // 发送消息
        for (int i = 0; i < count; i++) {
            template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
            // 出错,看看 事务是否生效
            if (i==2) {
                System.out.println(1/0);
            }
        }
        return "ok";
    }

listener-DeadListener
一般设置个上限,比如最多三次

    @RabbitListener(queues = "yd-ack-q01")
    public void handler3(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //消费者获取消息,默认采用的自动应答,就是获取就应答,这样MQ服务器就删除消息
        //还可以手动应答:结果:1.成功(MQ删除)2.失败(MQ消息)
        System.out.println("收到ACK消息,监听消息:" + msg);
        //拒绝消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝 3.是否把消息添加回队列中
        channel.basicNack(tag,false,true);

        //成功消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝
        //channel.basicAck(tag,true);
        // 处理业务逻辑 出错了
    }

在这里插入图片描述
在这里插入图片描述

消费消息的手动应答:
RabbitMQ默认的消费者消息获取模式采用的是手动应答
但是这种有缺陷,可能会出现,消息获取了但是业务出了问题,导致MQ也自动删除了消息,最终导致业务没有执行
所以为了解决这种问题,可以开启手动应答模式,结合自己的业务执行情况,如果业务执行成功,那么就成功应答,如果失败了,就拒绝消息,同时把消息再加回队列,这样就可以再次消息再次处理(加个上限)
在这里插入图片描述
测试
在这里插入图片描述

RabbitMQ如何保证消息的幂等性:
幂等性就是重复消费。
解决:
1.生成一个全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有小费过。
2.
在这里插入图片描述

用户充值,重复消费,相当如充值了多次,是一定要杜绝的。

在这里插入图片描述

不要返回值的时候,可以用RabbitMQ替代OpenFegin,因为消费完就不回了。MQ默认是单向的
短信发信可以用MQ,这个业务要做,做起来可能会很耗时,中间要经过运营商,过程不可控

RabbitMQ应用场景
消息通信,发送消息和接收消息,是异步
1.实现服务通信
用在微服务中,实现2个服务的通信,这种不带返回值的,只是为了执行另一个服务的方法执行
2.解决耗时操作
比如:邮件、短信、第三方接口等,比较耗时
3.解耦
4.提升性能
5.重复代码封装
6.削峰填谷(订单先下到redis中,再通过MQ和延迟队列,慢慢的从redis搬到mysql中)

关键词:异步、解耦、延迟

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1099902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Gson反序列化原理

前言 序列化和反序列化是日常工作中经常使用的工具&#xff0c;一般用于对象的持久化保存或者对象的网络传输&#xff0c;一般有两种情况&#xff0c;一种是对象本身实现了Serializable接口&#xff0c;这种情况下可以利用jdk自带的功能或者Kryo等这种封装好的序列化反序列化工…

10.Linear Map transformation rules

线性映射 从一个基底到另一个基底 所遵循的转换规则。 假设&#xff1a; 由一个矩阵给出的线性映射在这&#xff0c;并且是在基底e上表示&#xff0c; 该线性映射将e1变成0.5个e1&#xff0c;将e2变成2个e2&#xff1b; 假设有个向量V&#xff0c;其分量是【1&#xff0c;1…

福昕阅读器打开pdf文档时显示的标题不是文件名

0 Preface/Foreword 1 现象 文件名为&#xff1a;Demo-20231017 打开效果&#xff1a;显示名字为 word template 2 解决方法 2.1 利用打印方式将word生产pdf 在word生产pdf文件时&#xff0c;使用打印方式生成pdf文档。 2.2 删除word文档设置的标题 文件---》信息---》标…

“赔率”和“概率”

赔率与概率不同 Odds ! Probability 许多人将“赔率”和“概率”这两个词混为一谈。它们都表示对可能性或机会的估计。我可以理解普通人这样做&#xff0c;但我经常看到数据科学家和统计学家也混淆了这些概念&#xff0c;这是一种遗憾&#xff0c;因为在数学上它们有不同的含…

LVGL_基础控件btnmatrix

LVGL_基础控件btnmatrix 1、创建按钮矩阵 /* 分配按钮以及按钮上的文字 */ // 这里注意不要使用在函数退出时会被销毁的局部变量 // 可以用 "\n" 进行换行&#xff0c;最后一个按钮之后的元素必须是 NULL 或 "" // 换行 "\n" 和 "NULL&quo…

Z-Fighting问题解决(二) - Reverse-z

Z值相关技术概论理清 开始之前先简单理清科普一下涉及的跟三维图形学相关的深度Z相关的概念&#xff1a; Z-Buffer&#xff1a;上个世纪八十年代的图形学飞跃节点算法之一&#xff1b; Z缓冲是一种解决深度排序问题的方法&#xff0c;主要用于确定哪些物体在其他物体的前面。…

【Oracle】分析函数partition by,解决了使用group by后select语句中只能是分组的字段或者是一个聚合函数的问题

首先我们看一下group by的用法&#xff0c;比如根据省份分组。 select province, sum(persons) from t_person group by province;使用了group by后&#xff0c;select语句中只能是分组的字段&#xff08;比如上面的province&#xff09;或者是一个聚合函数&#xff08;比如co…

【特纳斯电子】基于单片机的火灾监测报警系统-仿真设计

视频及资料链接&#xff1a;基于单片机的火灾监测报警系统-仿真设计 - 电子校园网 (mcude.com) 编号&#xff1a; T0152203M-FZ 设计简介&#xff1a; 本设计是基于单片机的火灾监测报警系统&#xff0c;主要实现以下功能&#xff1a; 1.通过OLED显示温度、烟雾、是否有火…

jenkins pipeline使用

1、jenkins全局配置 1.1、maven配置 1.2、jdk配置 1.3、git配置 2、构建环境配置 2.1、安装时间插件 Date Parameter 2.2、Git Parameter 插件安装 3、pipeline如下 pipeline {agent anyenvironment {image_name "192.168.122.150/ken-test/price-service:${date}&…

jmeter监控服务器的资源使用

一. 下载安装包SeverAgent-x.x.x.zip并安装到被监控服务器 下载地址&#xff1a;https://github.com/undera/perfmon-agent 下载完解压后执行运行&#xff0c;windows运行startAgent.bat, linux运行startAgent.sh 二. 在jmeter上添加插件jpgc-PerfMon Metrics Collector监听器…

电商API接口的发展,电商API接口主要应用场景:

随着互联网技术的不断进步和电商行业的迅猛发展&#xff0c;电商API接口在商品交易、物流配送、客户服务等方面发挥着越来越重要的作用。本文将深入探讨电商API接口的技术原理、应用场景、开发方法以及优缺点。 一、技术原理 电商API接口是基于HTTP、TCP、IP等网络协议实现的…

代码随想录二刷 Day41

509. 斐波那契数 这个题简单入门&#xff0c;注意下N小于等于1的情况就可以 class Solution { public:int fib(int n) {if (n < 1) return n; //这句不写的话test能过但是另外的过不了vector<int> result(n 1); //定义存放dp结果的数组&#xff0c;还要定义大小r…

基于复合优化加速算法研究实际问题

import optimtool as oo from optimtool.base import np, sp, pltpip install optimtool>2.5.0加载hybird.nesterov.accer方法 import optimtool.hybrid as oh nes_acc oh.nesterov.accer初始化输入数据 f ( x ) ∑ i 1 n ( ( n − ∑ j 1 n cos ⁡ x j ) i ( 1 − co…

jenkins出错与恢复

如果你的jenkins出现了如下图所示问题&#xff08;比如不能下载插件&#xff0c;无法保存任务等&#xff09;&#xff0c;这个时候就需要重新安装了。 一、卸载干净jenknis 要彻底卸载 Jenkins&#xff0c;您可以按照以下步骤进行操作&#xff1a; 1、停止 Jenkins 服务&…

学信息系统项目管理师第4版系列29_信息系统治理

1. IT治理 1.1. 描述组织采用有效的机制对信息技术和数据资源开发利用&#xff0c;平衡信息化发展和数字化转型过程中的风险&#xff0c;确保实现组织的战略目标的过程 1.2. 驱动因素 1.2.1. 信息孤岛 1.2.2. 信息资源整合目标空泛 1.3. 高质量IT治理因素 1.3.1. 良好的I…

flask基础开发知识学习

之前做了一些LLM的demo&#xff0c;接口用flask写的&#xff0c;但是涉及到后端的一些业务就感觉逻辑写的很乱&#xff0c;代码变成屎山&#xff0c;于是借助官方文档和GPT迅速补了一些知识&#xff0c;总结一下一个很小的模板 于是决定边学边重构之前的代码… 文章目录 代码结…

吃鸡达人必备神器,提升战斗力享受顶级游戏干货!

大家好&#xff01;今天我为大家介绍一款专为吃鸡玩家打造的神器——吃鸡盒子。无论您是新手还是老玩家&#xff0c;吃鸡盒子都能帮助您提升游戏的战斗力&#xff0c;分享顶级游戏作战干货&#xff0c;并且还能方便吃鸡作图、查询库存和保护账号安全。 让我们先来说说提升战斗力…

使用Premiere、PhotoShop和Audition做视频特效

今天接到一个做视频的任务&#xff0c;给一个精忠报国的视频&#xff0c;要求&#xff1a;   ①去掉人声&#xff0c;就是将唱歌的人声去掉&#xff0c;只留下伴奏&#xff1b;   ②截图视频中的横幅&#xff0c;做一个展开的效果&#xff0c;类似卷纸慢慢展开&#xff1b;…

Golang学习:基础知识篇(二)—— 数组及切片

Golang学习&#xff1a;基础知识篇&#xff08;二&#xff09;—— 数组及切片 前言什么是Golang&#xff1f;Go语言的基础语法数组声明数组初始化数组访问数组知识点补充 切片定义切片切片初始化len() 和 cap() 函数空(nil)切片切片截取append() 和 copy() 函数知识点补充 前言…

【版本控制】Git(学习笔记)

一、Git工作流程图 clone&#xff08;克隆&#xff09;: 从远程仓库中克隆代码到本地仓库checkout &#xff08;检出&#xff09;&#xff1a;从本地仓库中检出一个仓库分支然后进行修订add&#xff08;添加&#xff09;: 在提交前先将代码提交到暂存区commit&#xff08;提交&…