RabbitMQ之TTL机制

news2025/1/9 17:12:39

        在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 

该如何实现?

  • 定期轮询(数据库等)
    • 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
    • 优点:设计实现简单
    • 缺点:需要对数据库进行大量的IO操作,效率低下。
  • Timer
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
            Timer timer = new Timer();
            TimerTask timerTask = new TimerTask() {
                @Override
                public void run() {
                    System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
                    timer.cancel();
                }
            };
            System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
            // 10秒后执行timerTask
            timer.schedule(timerTask, 10 * 1000);
    • 缺点
      • Timers没有持久化机制
      • Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
      • Timers 不能利用线程池,一个timer一个线程
      • Timers没有真正的管理计划
  • ScheduledExecutorService
            SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
            // 线程工厂
            ThreadFactory factory = Executors.defaultThreadFactory();
            // 使用线程池
            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
            System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
            service.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("用户未付款,交易取消:" + format.format(new Date()));
                }// 等待10s 单位秒
            }, 10, TimeUnit.SECONDS);
    • 优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。
    • 在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。
  • RabbitMQ:使用TTL
  • Quartz
  • Redis Zset
  • JCronTab
  • SchedulerX
  • 。。。

TTL,Time to Live 的简称,即过期时间。

RabbitMQ 可以对消息和队列两个维度来设置TTL。

   任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。

目前有两种方法可以设置消息的TTL。

  1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息自身进行单独设置,每条消息的TTL 可以不同。

        如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的

1、原生API案例

package com.lagou.rabbitmq.demo;

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.80.121:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<>();
        // 消息队列中消息的过期时间
        arguments.put("x-message-ttl", 10 * 1000);
        // 如果消息队列中没有消费者,则10s后消息过期,队列将会被自动删除
        arguments.put("x-expires", 60 * 1000);

        channel.queueDeclare("queue.ttl.waiting",
                true,
                false,
                false,
                arguments);

        channel.exchangeDeclare("ex.ttl.waiting",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                false,
                null);

        channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("utf-8")
                .deliveryMode(2) // 设置消息持久化,2代表设置消息持久化
                .build();

        channel.basicPublish("ex.ttl.waiting", "key.ttl.waiting", properties, "等待的订单号".getBytes(StandardCharsets.UTF_8));

        channel.close();
        connection.close();
    }
}

此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

  • 如果不设置TTL,则表示此消息不会过期;
  • 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

        注意理解message-ttlx-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

2、springboot案例

(1)pom.xml添加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

(2)application.properties添加rabbitmq连接信息

spring.application.name=ttl
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

(3)主入口类

package com.lagou.rabbitmq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo07.class, args);
    }
}

(4)RabbitConfig类

package com.lagou.rabbitmq.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
        // 对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    }

    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", false, false, false);
        return queue;
    }

    @Bean
    public Exchange exchangeTTLWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
        return exchange;
    }

    /**
     * 该交换器使用的时候,需要给每个消息设置有效期
     *
     * @return
     */
    @Bean
    public Exchange exchangeWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
        return exchange;
    }

    @Bean
    public Binding bindingTTLWaiting() {
        return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl - waiting").noargs();
    }

    @Bean
    public Binding bindingWaiting() {
        return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
    }
}

(5)PayController类

package com.lagou.rabbitmq.demo.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

@RestController
public class PayController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/pay/queuettl")
    public String sendMessage() {
        rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttlwaiting", "发送了TTL-WAITING-MESSAGE");
        return "queue-ttl-ok";
    }

    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");
        Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/50711.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【自然语言处理(NLP)】聊天机器人模块实现

【自然语言处理&#xff08;NLP&#xff09;】聊天机器人模块实现 作者简介&#xff1a;在校大学生一枚&#xff0c;华为云享专家&#xff0c;阿里云专家博主&#xff0c;腾云先锋&#xff08;TDP&#xff09;成员&#xff0c;云曦智划项目总负责人&#xff0c;全国高等学校计算…

《异常检测——从经典算法到深度学习》20 HotSpot:多维特征 Additive KPI 的异常定位

《异常检测——从经典算法到深度学习》 0 概论 1 基于隔离森林的异常检测算法 2 基于LOF的异常检测算法 3 基于One-Class SVM的异常检测算法 4 基于高斯概率密度异常检测算法 5 Opprentice——异常检测经典算法最终篇 6 基于重构概率的 VAE 异常检测 7 基于条件VAE异常…

[附源码]计算机毕业设计JAVA鲜花销售管理系统

[附源码]计算机毕业设计JAVA鲜花销售管理系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybati…

【学习笔记02】node的模块化和内置模块

一、nodejs的模块化 &#x1f602; nodejs 所有的东西都是模块 &#xff08;一&#xff09;node模块的分类 1. 自定义模块&#xff1a;自己写的模块 2. 内置模块&#xff1a;nodejs提供的模块 3. 第三方模块 别人写好的东西, 上传到某一个位置(npm), 我们去 npm 去下载到我们本…

SpringBoot+MyBatis和MyBatisPlus+vue+ElementUl实现批量删除 我只能说太简单了

目录准备工作MySQL数据库表Result返回结果1、SpringBootMyBatisPlusvueElementUl实现批量删除1.1、演示GIF动态图1.2、实体类1.3、Dao接口1.4、Service接口1.5、ServiceImpl接口实现层1.6、Controller控制层1.7、Vue前端2、SpringBootMyBatisvueElementUl实现批量删除2.1、演示…

关于数据权限的设计

在项目实际开发中我们不光要控制一个用户能访问哪些资源&#xff0c;还需要控制用户只能访问资源中的某部分数据。 控制一个用户能访问哪些资源我们有很成熟的权限管理模型即RBAC&#xff0c;但是控制用户只能访问某部分资源&#xff08;即我们常说的数据权限&#xff09;使用R…

云计算-JavaAPI与Hadoop的互联的实现

云计算-JavaAPI与Hadoop的互联的实现 文章目录云计算-JavaAPI与Hadoop的互联的实现一、环境准备二、HDFS 基本的命令操作三、HDFS客户端操作IntelliJ IDEA 环境准备通过API操作HDFS主函数程序进行连接测试1. 初始化hdfs连接获得FileSystem对象1. HDFS获取文件系统2. HDFS创建文…

Redis集群方案备忘录

文章目录哨兵模式官方Redis ClusterJedis&#xff08;客户端分片&#xff09;Codis&#xff08;代理分片&#xff09;哨兵模式 优点 哨兵模式是基于主从模式的&#xff0c;解决可主从模式中master故障不可以自动切换故障的问题。缺点 &#xff08;1&#xff09;是一种中心化的…

Express 6 指南 - 路由 6.3 路线路径 Route paths

Express Express 中文网 本文仅用于学习记录&#xff0c;不存在任何商业用途&#xff0c;如侵删 文章目录Express6 指南 - 路由6.3 路线路径 Route paths6 指南 - 路由 6.3 路线路径 Route paths 【这翻译得…生怕国人看懂】 路由路径与请求方法相结合&#xff0c;定义了可以…

大数据培训课程之序列化案例实操

序列化案例实操 1. 需求 统计每一个手机号耗费的总上行流量、下行流量、总流量 &#xff08;1&#xff09;输入数据 &#xff08;2&#xff09;输入数据格式&#xff1a; 7 13560436666 120.196.100.99 1116 954 200 id…

编辑器实现思路

复杂项目 业务的复杂性: 交互的复杂性数据结构和状态的复杂性,例如级联选择器需要遍历树结构,还有一些需要链表、栈、队列等多项目依赖,工程的复杂性性能优化流程的复杂性 git flowlint 工具单元测试commit信息Code ReviewCI/CD开发一个编辑器 例如低代码的编辑器 编辑器…

如何批量旋转图片?学会这三种方法就能轻松实现

对于喜爱拍照的小伙伴来说&#xff0c;你们的手机或者相机应该有很多图片素材吧。那么在整理这些图片到电脑的时候&#xff0c;你们的图片会不会出现方向不一致的情况呢&#xff1f;有的是倒着的&#xff0c;有的是左旋了90。想要将这些图片都调整为同一个方向&#xff0c;靠手…

Delete `␍` 最简单最有效的解决方法和解释(VScode)

一、原因 VScode 出现 Delete ␍ 的原因&#xff0c;大部分都是因为安装了 Prettier 插件指定了文件的结尾换行符与系统不一致导致的&#xff0c;就是下面这个插件 由于历史原因&#xff0c;windows 和 linux 两个系统的文本文件的换行符不一致&#xff1b;Windows在换行的时候…

空域图像增强-图像灰度变换

1.图像灰度变换。自选一张图片&#xff0c;完成以下图像处理&#xff1a;①显示图像的灰度直方图&#xff1b;②直方图均衡化&#xff0c;对比变化前后的图像和灰度直方图&#xff1b;③对图像进行线性灰度变换&#xff0c;对某部分灰度值进行扩展&#xff0c;压缩其它灰度值区…

汽车空调器前缸盖数控加工工艺的制订及夹具设计

目  录 摘  要 &#xff11; Abstract 2 1 引言 3 2 零件的分析 4 2.1 零件的作用 4 2.1.1空调压缩机的功用和要求 4 2.1.2 汽车空调压缩机的一般结构 4 2.1.3 斜盘式压缩机的结构特点 4 2.1.4 斜盘式压缩机的优点 5 2.2 零件的工艺分析 5 3 数控机床的加工性能分析 10 3.1…

[附源码]计算机毕业设计JAVA校园跑腿系统

[附源码]计算机毕业设计JAVA校园跑腿系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis M…

2023年软考备考,系统分析师知识点速记,速看

2023上半年软考系统分析师知识点速记分享给大家&#xff0c;快来一起打卡学习吧&#xff01; 1、企业集成分类&#xff1a;按组织范围分 2、企业集成分类&#xff1a;按集成点分 3、企业战略与信息化战略集成方法 业务与IT整合&#xff08;BITA&#xff09;&#xff1a;重心是…

计算机组成原理习题课第三章-4(唐朔飞)

计算机组成原理习题课第三章-4&#xff08;唐朔飞&#xff09; ✨欢迎关注&#x1f5b1;点赞&#x1f380;收藏⭐留言✒ &#x1f52e;本文由京与旧铺原创&#xff0c;csdn首发&#xff01; &#x1f618;系列专栏&#xff1a;java学习 &#x1f4bb;首发时间&#xff1a;&…

计算机网络——OSI参考模型

计算机网络的分层结构 OSI参考模型 世界上第一个提出网络结构的公司IBM公司&#xff0c;其他的公司有美国国防部提出的TCP/IP 为了支持不同的网络结构的互联互通&#xff0c;国际标准化组织于1984年提出的开放的系统互联&#xff08;OSI&#xff09;参考模型。 OSI参考模型在…

光栅莫尔信号四倍频细分电路模块的设计与仿真研究

笔者电子信息专业硕士毕业&#xff0c;获得过多次电子设计大赛、大学生智能车、数学建模国奖&#xff0c;现就职于南京某半导体芯片公司&#xff0c;从事硬件研发&#xff0c;电路设计研究。对于学电子的小伙伴&#xff0c;深知入门的不易&#xff0c;特开次博客交流分享经验&a…