Springboot集成RabbitMq+延时队列

news2024/11/23 1:06:10

1. 引入jar包

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置yml

2.1 配置生产者yml

 

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
    publisher-returns: true  #开启发送失败退回
    # simple:同步等待confirm结果,直到超时
    #correlated:异步回调,次你故意ConfirmCallback,MQ返回结果时会回调这个ComfirmCallback
    publisher-confirm-type: correlated

2.2 配置消费者yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        delayed-topic-input:
          destination: delayed-topic-demo #将消费者队列绑定到指定交换机
          group: group-1 
          #消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息
          consumer:
            delayed-exchange: true #开启延时,生产者和消费者端都需要开启这个配置

 3.生产者生产消息

3.1 direct 直连

把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中

3.1.1 直连队列消息发送

/***直接交换机 **/
    public static final String directExchange = "directExchangeOne";
    public static final String routingKey1 = "directKey1";
    public static final String routingKey2 = "directKey2";
    public static final String directQueue1 = "directQueueOne";
    public static final String directQueue2 = "directQueueTwo";


/**
     * 直接交换机 一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者
     * 通过指定路由键directRouting发送给交换机directExchange
     * 交互机directExchange通过指定的路由键把消息msg投递到对应的队列上面去
     * @param map
     */
    public void directToQueue(Map<String, String> map) {
        map.put("direct-路由key:",RabbitConstants.routingKey1);
        rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey1, map);
        map.put("direct-路由key:",RabbitConstants.routingKey2);
        rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey2, map);
    }

3.1.2 直连队列消息绑定

package rabbit.config;

import config.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置类 : 创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系
 * direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
 *
 *  */
@Configuration
public class DirectConfig {

    /**
     * Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列
     * @return
     */
    @Bean
    public DirectExchange directExchangeOne(){
        return new DirectExchange(RabbitConstants.directExchange);
    }

    @Bean
    public Queue directQueueOne(){
        return new Queue(RabbitConstants.directQueue1);
    }

    @Bean
    public Queue directQueueTwo(){
        return new Queue(RabbitConstants.directQueue2);
    }

    /**
     * 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息
     * 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
     * direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
     * @param directQueueOne
     * @param directExchangeOne
     * @return
     */
    @Bean
    public Binding directBindingOne(Queue directQueueOne, DirectExchange directExchangeOne){
        return BindingBuilder.bind(directQueueOne).to(directExchangeOne).with(RabbitConstants.routingKey1);
    }

    /**
     * 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息
     * 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
     * direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
     * @param directQueueTwo
     * @param directExchangeOne
     * @return
     */
    @Bean
    public Binding directBindingTwo(Queue directQueueTwo, DirectExchange directExchangeOne) {
        return BindingBuilder.bind(directQueueTwo).to(directExchangeOne).with(RabbitConstants.routingKey2);

    }

}

3.1.3 直连队列消息接收

@RabbitListener(queues = RabbitConstants.directQueue1)
    @RabbitHandler // 指定对消息的处理
    public void directClientOne(HashMap<String,String> mes){
        System.out.println("直连队列消息1:" + mes);
    }

    /** @RabbitListener(queues = {"directQueue1","directQueue2"}):这样就可以一次消费两条消息 **/
    @RabbitListener(queues = RabbitConstants.directQueue2)
    @RabbitHandler
    public void directClientTwo(HashMap<String,String> mes){
        System.out.println("直连队列消息2: " + mes);
    }

 3.1.4 结果:

3.2 fanout 扇形

把消息发送到所有与它绑定的Queue中,没有路由概念

3.2.1 扇形消息发送

@Autowired
    public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

/***
     * 扇形交换机
     * 这个交换机没有路由键概念,就算你绑了路由键也是无视的
     * 消息会发送到所有绑定的队列上。
     * @param fanoutMap1
     */
    public void fanoutToQueue(Map<String, String> fanoutMap1) {
        fanoutMap1.put("fanout-交换机:",RabbitConstants.fanoutExchange1);
        rabbitTemplate.convertAndSend(RabbitConstants.fanoutExchange1,null,fanoutMap1);
    }

3.2.2 扇形消息绑定

/**
 * 扇形交换机
 * Fanout:转发消息到所有绑定队列,没有路由key
 * */
@Configuration
public class FanoutConfig {

    /**
     * 不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange1(){
        return new FanoutExchange(RabbitConstants.fanoutExchange1);
    }

    @Bean
    public Queue fanoutQueue1(){
        return new Queue(RabbitConstants.fanoutQueue1);
    }

    @Bean
    public Queue fanoutQueue2(){
        return new Queue(RabbitConstants.fanoutQueue2);
    }

    /** 扇形交换机没有路由key */
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange1){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
    }

    /** 扇形交换机没有路由key */
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);

    }

}

3.2.3 扇形消息接收

/** 扇形交换机 */
    public static final String fanoutExchange1 = "fanout_exchange1";
    public static final String fanoutQueue1 = "fanout_queue1";
    public static final String fanoutQueue2 = "fanout_queue2";


@RabbitListener(queues = RabbitConstants.fanoutQueue1)
    @RabbitHandler
    public void fanoutQueue1(HashMap<String,String> fanoutMes){
        System.out.println("扇形队列消息1: " + fanoutMes);
    }

    @RabbitListener(queues = RabbitConstants.fanoutQueue2)
    @RabbitHandler
    public void fanoutQueue2(HashMap<String,String> fanoutMes){
        System.out.println("扇形队列消息2: " + fanoutMes);
    }

3.2.4 扇形--结果

3.3  topic 主题

将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中--多了匹配的概念

3.3.1 主题队列消息发送

@Autowired
    public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }


/***主题交换机:模糊匹配队列
     * *:星号表示任意一个字符
     * 	#:表示任意一个或者多个字符
     */
    // topic 的 routingKey
    public static final String topicA = "helloTopic.world";
    public static final String topicB = "helloTopic.#";
    public static final String topicAll = "#";

    public static final String topicExchange = "topic_exchange";
    /** 绑定 topicA = "helloTopic.world"*/
    public static final String topicQueue1 = "topic_queue1";
    /** 绑定 topicB="helloTopic.#"*/
    public static final String topicQueue2 = "topic_queue2";
    /** 绑定 #,匹配所有 */
    public static final String topicQueue3 = "topic_queue3";


/**
     * 主题交换机:模糊匹配队列
     * topic.# 可匹配topic topic.add topic.add.add
     * topic.* 可匹配topic.add  topic.delete
     * @param map
     */
    public void topicToQueue(Map<String, String> map) {
        // 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
        map.put("Topic-路由key:",RabbitConstants.topicA);
        rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicA, map);

        map.put("Topic-路由key:",RabbitConstants.topicB);
        rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicB, map);

        map.put("Topic-路由key:",RabbitConstants.topicAll);
        rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicAll, map);

    }

3.3.2 主题队列消息绑定

/***
 * 按规则转发消息
 */
@Configuration
public class TopicConfig {

    /**
     * Topic Exchange 转发消息主要是根据通配符
     * 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中
     * @return
     */
    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange(RabbitConstants.topicExchange);
    }

    @Bean
    public Queue topicQueue1(){
        return new Queue(RabbitConstants.topicQueue1);
    }

    @Bean
    public Queue topicQueue2(){
        return new Queue(RabbitConstants.topicQueue2);
    }

    @Bean
    public Queue topicQueue3(){
        return new Queue(RabbitConstants.topicQueue3);
    }

    /**
     * 消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,
     * Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中
     * @param topicQueue1
     * @param topicExchange1
     * @return
     */
    @Bean
    public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange1){
        return BindingBuilder.bind(topicQueue1).to(topicExchange1).with(RabbitConstants.topicA);
    }

    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange1){
        return BindingBuilder.bind(topicQueue2).to(topicExchange1).with(RabbitConstants.topicB);
    }

@Bean
    public Binding topicBinding3(Queue topicQueue3, TopicExchange topicExchange1){
        return BindingBuilder.bind(topicQueue3).to(topicExchange1).with(RabbitConstants.topicAll);
    }

}

3.3.3 主题队列消息接收

@RabbitListener(queues = RabbitConstants.topicQueue1)
    @RabbitHandler
    public void topicQueue1(HashMap<String,String> topicMes){
        System.out.println("主题消息队列1: " + topicMes);
    }

    @RabbitListener(queues = RabbitConstants.topicQueue2)
    @RabbitHandler
    public void topicQueue2(HashMap<String,String> topicMes){
        System.out.println("主题消息队列2: " + topicMes);
    }

    @RabbitListener(queues = RabbitConstants.topicQueue3)
    @RabbitHandler
    public void topicQueue3(HashMap<String,String> topicMes){
        System.out.println("主题消息队列匹配所有: " + topicMes);
    }

3.3.4 主题--结果

3.4 Delayed 延时(需要延时插件,参考我另一篇插件安装)

3.4.1 延时队列消息发送

 /** 延迟队列 */
    public static final String DELAYED_EXCHANGE_NAME = "myDelayedExchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";

/**
     * 死信延迟队列
     * @param message
     */
    public void sendDelayedMessage(String message) {
        System.out.println("Send time 开始: " + LocalDateTime.now());
        rabbitTemplate.convertAndSend(RabbitConstants.DELAYED_EXCHANGE_NAME,
                RabbitConstants.DELAYED_ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(10000); // 设置消息的延长时间延,单位毫秒
                    return messagePostProcessor;
                });

        System.out.println("Send time 结束: " + LocalDateTime.now() );
    }

3.4.2 延时队列消息绑定

public class DelayedConfig {

    /** 定义一个延迟交换机 **/
    @Bean
    public CustomExchange delayedExchange() {
        /*Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");*/
        return new CustomExchange(RabbitConstants.DELAYED_EXCHANGE_NAME,
                "x-delayed-message", // 消息类型  x-delayed-message
                true, // 是否持久化
                false); // 是否自动删除
    }

    /** 延时队列 **/
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(RabbitConstants.DELAYED_QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    /** 绑定队列到这个延迟交换机 */
    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitConstants.DELAYED_ROUTING_KEY).noargs();
    }
}

3.4.3 延时队列消息接收

@RabbitListener(queues = RabbitConstants.DELAYED_QUEUE_NAME)
    public void receiveDelayedMessage(String message,  Channel channel) {
        System.out.println("Received delayed message: " + message);
        log.info("当前时间:{},接收时长信息给延迟队列:{}", LocalTime.now(),message);
        System.out.println("Received time: " + LocalDateTime.now() + "  Received: " + message);
    //    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }

3.4.4 延时--结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1588756.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言.指针(5)

指针&#xff08;5&#xff09; 1.sizeof和strlen的对比1.1sizeof1.2strlen1.3sizeof和strlen的对比 2.数组和指针笔试题解析2.1一维数组2.2字符数组2.3二维数组 3.指针运算笔试题解析3.1 题目13.2 题目23.3 题目33.4 题目43.5 题目53.6 题目63.7 题目7 1.sizeof和strlen的对比…

SNRO 编号范围对象管控,唯一ID

事务代码:SNRO 代码引用: DATA: MAXTID TYPE I,NEWNO TYPE CHAR8. CALL FUNCTION NUMBER_RANGE_ENQUEUE EXPORTING OBJECT ZQC57 EXCEPTIONS FOREIGN_LOCK 1 OBJECT_NOT_FOUND 2 SYSTEM_FAILURE 3 OTHERS …

【论文阅读——Profit Allocation for Federated Learning】

1.摘要 由于更为严格的数据管理法规&#xff0c;如《通用数据保护条例》&#xff08;GDPR&#xff09;&#xff0c;传统的机器学习服务生产模式正在转向联邦学习这一范式。联邦学习允许多个数据提供者在其本地保留数据的同时&#xff0c;协作训练一个共享模型。推动联邦学习实…

开发日志2024-04-12

开发日志2024/04/12 1、分店月业绩和年业绩都需要添加为真实数据 **开发思路&#xff1a;**分店下所属的技师的业绩总和 代码实现&#xff1a; 前端 无 后端 //TODO 将技师多对应的积分累加到他所属的分店的月/年累计业绩销量中//TODO 查询技师所对应的分店地址String f…

智算时代的基础设施如何实现可继承可演进?浪潮云海发布 InCloud OS V8 新一代架构平台

从 2023 年开始持续火爆的 AIGC 正在加速落地应用&#xff0c;为全行业带来生产生活效率的变革与升级。面对数字化转型与智能化转型&#xff0c;对于技术团队来说&#xff0c;既要根据业务与 AI 应用去部署以云为基础的 AI 算力&#xff0c;又要与已有数据和系统&#xff08;甚…

网络流量分析与控制

⚠申明&#xff1a; 未经许可&#xff0c;禁止以任何形式转载&#xff0c;若要引用&#xff0c;请标注链接地址。 全文共计5477字&#xff0c;阅读大概需要3分钟 &#x1f308;更多学习内容&#xff0c; 欢迎&#x1f44f;关注&#x1f440;【文末】我的个人微信公众号&#xf…

架构设计参考项目系列主题:新零售SaaS架构:客户管理系统架构设计

什么是客户管理系统? 客户管理系统,也称为CRM(Customer Relationship Management),主要目标是建立、发展和维护好客户关系。 CRM系统围绕客户全生命周期的管理,吸引和留存客户,实现缩短销售周期、降低销售成本、增加销售收入的目的,从而提高企业的盈利能力和竞争力。 …

YOLOV5 分类:利用yolov5进行图像分类

1、前言 之前介绍了yolov5的目标检测示例,这次将介绍yolov5的分类展示 目标检测:YOLOv5 项目:训练代码和参数详细介绍(train)_yolov5训练代码的详解-CSDN博客 yolov5和其他网络的性能对比 yolov5分类的代码部分在这 2、数据集准备 yolov5分类的数据集就是常规的摆放方式…

PyCharm远程链接AutoDL

AutoDL使用方法&#xff1a; Step1&#xff1a;确认您安装的PyCharm是社区版还是专业版&#xff0c;只有专业版才支持远程开发功能。 Step2&#xff1a;开机实例 复制自己实例的SSH指令&#xff0c;比如&#xff1a;ssh -p 38076 rootregion-1.autodl.com 在ssh -p 38076 roo…

AWS CloudFront + Route53 + EC2 + Certificate Manager

CloudFront Route53 EC2 Certificate Manager 教程 先理解它是怎么运转的 用户请求Route53解析到CloudFront&#xff0c;CloudFront解析EC2也就是资源。 了解了运作&#xff0c;接下来就一步步实现 首先处理CloudFront解析资源EC2 EC2是服务器&#xff0c;并不是资源&…

SQLite从出生到现在(发布历史记录)(二十二)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;从 SQLite 3.5.9 迁移到 3.6.0&#xff08;二十一&#xff09; 下一篇&#xff1a;SQLite—系列文章目录 引言&#xff1a; SQLite拥有别人无法比拟的装机量&#xff0c;究竟什么成就了SQLite呢&#xff0c;本…

uni-app调用苹果登录,并获取用户信息

效果 模块配置 dev中的配置 需要开启登录的权限&#xff0c;然后重新下载配置文件&#xff0c;发布打包基座&#xff0c;再运行程序 代码 <button click"appleLogin">苹果登录</button>function appleLogin() {uni.login({provider: apple,success: …

预印本仓库ArXiv——防止论文录用前被别人剽窃

文章目录 一、什么是预印本二、什么是ArXiv2.1 ArXiv的领域2.2 如何使用 一、什么是预印本 预印本&#xff08;Preprint&#xff09;是指科研工作者的研究成果还未在正式出版物上发表&#xff0c;而出于和同行交流目的自愿先在学术会议上或通过互联网发布的科研论文、科技报告…

使用了代理IP怎么还会被封?代理IP到底有没有效果?

代理IP作为一种网络工具&#xff0c;被广泛应用于各种场景&#xff0c;例如网络爬虫、海外购物、规避地区限制等。然而&#xff0c;很多用户在使用代理IP的过程中却发现自己的账号被封禁&#xff0c;这让他们不禁产生疑问&#xff1a;使用了代理IP怎么还会被封&#xff1f;代理…

前端开发攻略---简化响应式设计:利用 SCSS 优雅管理媒体查询

1、演示 2、未优化前的代码 .header {width: 100px;height: 100px;background-color: red; } media (min-width: 320px) and (max-width: 480px) {.header {width: 10px;} } media (min-width: 320px) and (max-width: 480px) {.header {height: 20px;} } media (min-width: 48…

Pygame教程10:在背景图片上,添加一个雪花特效

------------★Pygame系列教程★------------ Pygame经典游戏&#xff1a;贪吃蛇 Pygame教程01&#xff1a;初识pygame游戏模块 Pygame教程02&#xff1a;图片的加载缩放旋转显示操作 Pygame教程03&#xff1a;文本显示字体加载transform方法 Pygame教程04&#xff1a;dra…

【银行测试】性能瓶颈出现崩溃怎么办?支付类测试关注点整理...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、银行系统线上因…

基于springboot实现桂林旅游景点导游平台管理系统【项目源码+论文说明】

基于springboot实现桂林旅游景点导游平台管理系统演示 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了桂林旅游景点导游平台的开发全过程。通过分析桂林旅游景点导游平台管理的不足&#xff0c;创建了一个计算…

速率再次翻倍!现有SSD显卡又要被强制换代了

去年 AMD Ryzen 7000 和 Intel 第 13 代 CPU 发布的同时&#xff0c;消费级的 PCI-E 5.0 平台出现在了大众视野。 这个用了快 20 年的接口虽然外形上似乎没变过、新老平台通吃&#xff0c;但其实内在已更新了好多个版本了。 和 3.0 换 4.0 一样&#xff0c;5.0、6.0 换代只是时…