2.rabbitMQ之交换机

news2025/1/10 23:56:52

1.交换机的作用

1.默认交换机会自动指定队列
2.之前一个信息必须被消费1次,现在的是一个消息可以被消费多次(发送到不同队列的前提下,正常情况下一个队列只能消费一次)
3.消息先发给交换机,然后交换机发给多个队列,可以达到多次消费的效果

如图mq3
在这里插入图片描述

2.交换机的类型

  1. 默认交换机 无名 ""指定
    1.直接类型 direct(一个routingKey绑定一个队列,一个交换机绑定一个队列)
    2.主题 topic(一个交换机绑定多个队列,主要是通过表达式来实现)
    3.标题 headers(不常用)
    4.扇出 fanout(一个信息被交换机全部队列接收,相当于QQ聊天)

3.临时队列 没有D的队列 一旦断开连接,队列会被自动删除

   //获得临时队列,features有 AD和 Excl
    String QName=channel.quequeDeclare().getQueue();

4.交换机和队列绑定binding

   //在界面, 添加queue然后添加exchange,然后在交换机 添加队列,
       rountingkey代表想要发给哪个队列,后面可以指定哪个可以接收特定的信息

5.fanout(相当于qq群的广播,一条消息被多台计算机 接收) ,队列名可以写为空
1. 2个消费者 声明交换机的名字和类型 主要代码如下

  channel.exchangeDeclare(name,"fanout");
     String QName=channel.queueDeclare().getQueue();
      //队列名,交换机名,routingKey
      channel.queueBind(QName,exName,"");
             -------完整代码-------
public class exchangeConsumer1 {
    public static final String EXCHANGE_NAME="log";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //得到临时队列
        String QName=channel.queueDeclare().getQueue();
        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueBind(QName,EXCHANGE_NAME,"");
        DeliverCallback deliverCallback=(tag, delivery)->{

            System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
        };
        CancelCallback nCallback=(tag)->{

            System.out.println("失败应答");
        };

        boolean IsAck=false;
        channel.basicConsume(QName,IsAck,deliverCallback,(tag)->{});



    }
}
   2.生产者 也要不用再次声明交换机,不用队列名了,用交换机名就可以接收消息
 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
           -------完整代码-------
public class exchangeProducer {
    public static final String EXCHANGE_NAME="log";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //交换机的名字和类型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
		//模拟生产者不停发消息
        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//            boolean flag=channel.waitForConfirms();
//            if(flag){
//                System.out.println("消息已经写入磁盘的确认");
//
//            }

        }

    }
}

6.直接交换机 direct(rountingKey相等就是fanout交换机,不相等就是direct)(路由)
//可以绑定队列
1.提供者修改交换机类型和routingKey

      channel.exchangeDeclare(EXCHANGE_NAME,"direct");
      channel.basicPublish(EXCHANGE_NAME,"wrong", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
2.消费者修改交换机类型和routingKey
 channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(QName,EXCHANGE_NAME,"error");

7.topic主题交换机,direct只能一个交换机绑定一个队列,这个可以路由多个队列(生产者不声明队列,只声明交换机,消费者声明队列和交换机)
1.routingKey的写法单词不能全部是字母(就成direct交换机了) 如aa.bb.mq *代替1个单词 #代替0个或多个单词

如 lazy.#可以匹配 lazy.ngs.me
*.*.rabbit 匹配 ngs.me.rabbit

2.注意 绑定是 #相当于fanout交换机   #和*都没有出现就是direct交换机
3.代码 交换机为topic

#消费者.queueBind()可以写多次

     -------代码------
       public class TopicProducer {
    public static final String EXCHANGE_NAME="log2";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        channel.confirmSelect();
        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//        channel.queueDeclare("Q1",false,false,false,null);
        Scanner scanner = new Scanner(System.in);



        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            //fanout交换机模式
//            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
            //direct交换机,这里发送的routingKey可以自行修改测试
			            channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//            boolean flag=channel.waitForConfirms();
//            if(flag){
//                System.out.println("消息已经写入磁盘的确认");
//
//            }

        }

    }
} 
       ----消费者-----
  public class TopicConsumer1 {
    public static final String EXCHANGE_NAME="log2";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //得到临时队列

        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName="Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        //!!!核心代码,就是发送到的rountingKey和队列绑定
        channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
        channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.aa");
        DeliverCallback deliverCallback=(tag, delivery)->{
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
        };
        CancelCallback nCallback=(tag)->{

            System.out.println("失败应答");
        };

        boolean IsAck=false;
        //消费信息
        channel.basicConsume("Q1",IsAck,deliverCallback,(tag)->{});



    }
}

8.死信队列(不能被消费的信息放到死信队列,防止消息丢失) 管理界面队列有DLK代表开启了死信

1.来源
—1.消息TTL过期(信息指定时间会过期)
—2.队列达到最大长度(队列满了)
—3.信息被拒绝(basic.reject或basic.nack不应答) requeue=false(信息不放回队列,丢失?)

------2.实现(绑定普通队列和死信队列) 普通和死信交换机都为 dirrect
如图mq4
请添加图片描述

   //普通队列要声明的时候加入arg才会转发到死信队列 !!!注意设置的普通队列的args,不是死信的
    -----消费者-----
  Map<String,Object> args=new HashMap();
  args.put("x-dead-letter-exchange","DEAD_EXCHANGE_NAME");
  //设置死信 routingKey
args.put("x-dead-letter-routing-key","lisi");
     channe.queueDeclare(...,args);
//过期时间这里可以设置,也可producer声明
//声明队列
--------生产者--------  
//设置ttl time to live过期时间
   channel.basicPublish(...,prop,...);             

9.死信队列之队列达到最大长度

   //消费者设置最大正常队列的长度

10.死信队列之信息被拒绝

 //消费者
     channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);//不放回队列
//一定要开启手动应答

11.延迟队列.是死信队列的过期时间(企业上班案例)
//整合springboot 选2.3.1,复制依赖,复制配置
//整合后不用自己声明队列和交换机,由专门的配置类写
1.配置文件类


  @Configuration //配置类上面写
     //声明交换机
     @Bean("xExchange")
     public DirectExchange xExchange(){
return new DirectExchange("name");
    }
    //声明队列
    @Bean("queueA")
     public Queue xExchange(){
             //指定死信 xxx参数
return QueueBuilder.durable(Queue_A).withArguments(xxx).build();
    }
   //死信队列
    @Bean("queueA")
     public Queue xExchange(){
             //没有参数
return QueueBuilder.durable(DEAD_QUEUE).build();
    }
   //绑定,名字必须要有语义化 semantic
      @Bean
     public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
             //,队列和交换机绑定,并指定routingKey
return BindingBuilder.bind(queueA).to(xExchange).with("XB");
    }    
    2.写controller接收消息
@Autowired
 private RabbitTemplate rabbitTemplate;
log.info("xxxx{},xxx{}",new Date().toString(),msg);
//发送消息来自spring公司的工具,转发到交换机和发送
  rabbitTemplate.convertAndSend("X","routingKey","xxmsg")
   3.死信消费者接收消息 需要监听器
    @Slf4j
     @Component
           xx class
                    @RabbitListener(queues="QD") 
                //注意!!!msg是spring的类,Channel是mq的,刚开始导错包
           public void xx(Message msg,Channel channel){
sout(new String(msg.getBody()));
       }

12.延迟优化 不在队列声明写延迟时间,而在生产者的声明,就不用一直更新队列代码
图mq5

请添加图片描述
----1.增加一个不设置时间的队列

//controller当生产者
rabbitTemplate.convertAndSend("exchange_x", "XB", "消息来自 ttl 为 xS 的队列: "+message, correlationData ->{
        correlationData.getMessageProperties().setExpiration(ttl);
        return correlationData;
    });

2.死信做延迟的缺陷,因为是排队的,所以发送多条消息不同延迟时间,按第一条的时间延迟(mq只会检查第一条消息是否过期)(导致先发送时间长的数据一直等待,其他后发送的数据在等待完成用同一时间送达)

3.解决方法(使用插件) 将我们的插件 复制到 mq的plugin的文件夹下(到exchange页面会多一个x-delayed-messeage选项)由交换机延迟了

//文件夹的路径
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
//插件放到文件夹后,enable开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 //只能配置类写自定义交换机
 @Bean
 public CustomExchange delayedExchange(){

 
  }   
 //绑定交换机和队列
//生产者设置时间,注意之前是 setExpiration现在是setDelay交换机延迟  
    //代码如下
    -----配置类-------
@Configuration
public class TtlQueueConfig{
    private  static String Exchange_X= "exchange_x";
    private  static String Exchange_Y_DEAD= "exchange_y_dead";
    private  static String QUEUE_A= "queue_a";
    private  static String QUEUE_B= "queue_b";
    private  static String QUEUE_D= "queue_dead";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(Exchange_X);
    }
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Exchange_Y_DEAD);
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型 !!!
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                args);
    }


    @Bean("queueA")
    public Queue queueA(){
//        Map<String, Object> args = new HashMap<>();
//        //声明当前队列绑定的死信交换机
//        args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
//        //声明当前队列的死信路由 key
//        args.put("x-dead-letter-routing-key", "YD");
//        //声明队列的 TTL
//        args.put("x-message-ttl", 10000);
//        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
        return QueueBuilder.durable(QUEUE_A).build();
    }
    @Bean("queueB")
    public Queue queueB(){
//        Map<String, Object> args = new HashMap<>();
//        //声明当前队列绑定的死信交换机
//        args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
//        //声明当前队列的死信路由 key
//        args.put("x-dead-letter-routing-key", "YD");
//        //声明队列的 TTL
//        args.put("x-message-ttl", 40000);
//        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
        return QueueBuilder.durable(QUEUE_B).build();
    }
    @Bean("queueDead")
    public Queue queueDead(){
        return QueueBuilder.durable(QUEUE_D).build();
    }

    @Bean("xBindingQueueA")
    public Binding xBindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean("xBindingQueueB")
    public Binding xBindingQueueB(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean("yBindingQueueDead")
    public Binding yBindingQueueDead(@Qualifier("queueDead") Queue queueDead,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueDead).to(yExchange).with("YD");
    }



    @Bean
    public Binding bindingDelayedQueue(@Qualifier("queueDead") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }





} 
    ---------死信消费者-------
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues="queue_dead")
    public void deadMeg(Message msg, Channel channel){
        String s = new String(msg.getBody());

        log.info("时间,{}消息{}",new Date(),s);
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/480425.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【模块系列】DY-SV17F语音播放模块

前言 本文针对官方给的应用手册进行补充和加上个人理解。在官方的资料中已经介绍的很详细了&#xff0c;我就节选部分出来&#xff0c;基本认识模块就行了吧。本来还行自己介绍呢&#xff0c;没想到官方写这么详细了&#xff0c;也不知道介绍啥了&#xff0c;现在单纯的写为个人…

网络安全:钟馗之眼ZOOMEYE搜索引擎使用

网络安全&#xff1a;钟馗之眼ZOOMEYE搜索引擎 地址&#xff1a; 首页 - 网络空间测绘,网络安全,漏洞分析,动态测绘,钟馗之眼,时空测绘,赛博测绘 - ZoomEye("钟馗之眼")网络空间搜索引擎 zoomeye是针对互联网空间的搜索引擎&#xff0c;收录了互联网空间中的设备、…

103-Linux_I/O复用方法之epoll

I/O复用方法之epoll 一.epoll介绍二.epoll相关的函数1.epoll_create2.epoll_ctl3.epoll_wait 三.LT和ET模式1.LT模式2.ET模式 四.epoll实现TCP服务器1.代码(1)服务器端(2)客户端代码 2.运行结果截图 一.epoll介绍 epoll 是 Linux 特有的 I/O 复用函数。它在实现和使用上与 sel…

小程序swiper控件的使用

swiper实现左右滑动,以及tab点击,并且给swiper绑定下拉刷新事件 <view class"swiper-tab"><view class"start swiper-tab-list {{currentTab0 ? on : }}" data-current"0" catchtap"swichNav">私教课</view><vi…

sed进阶之创建sed实用工具

shell脚本编程系列 加倍行间距 sed $!G data2.txt保留空间的默认值是一个空行&#xff0c;通过G命令可以将保留空间内的内容附加到模式空间内容之后&#xff0c;但是最后一行不需要附加&#xff0c;所以通过排除命令!进行排除 对可能含有空行的文件加倍行间距 sed /^$/d;$!G …

c++11上篇

c11 1.C11简介2.列表初始化2.1 &#xff5b;&#xff5d;初始化2.2 std::initializer_list 3.变量类型推导3.1 auto3.2 decltype3.3 nullptr 4.范围for循环5.final与override6.智能指针7.新增加容器---静态数组array、forward_list以及unordered系列8.默认成员函数控制9.右值引…

C++——类和对象[中]

0.关注博主有更多知识 C知识合集 目录 1.类的默认成员函数 2.构造函数和析构函数基础 3.构造函数进阶 4.析构函数进阶 5.拷贝构造函数 6.运算符重载 7.日期类 7.1输入&输出&友元函数 8.赋值运算符重载 9.const成员函数 9.1日期类完整代码 10.取地址重载 …

pandas简介

pandas的两个主要数据结构是&#xff1a;Series&#xff08;一维数据&#xff09;、DataFrame&#xff08;二维数据&#xff09;。 Series Series是一种类似于NumPy中一维数组的对象&#xff0c;它由一组任意类型的数据以及一组与之相关的数据标签组成。 import pandas as pd…

( 数组和矩阵) 240. 搜索二维矩阵 II ——【Leetcode每日一题】

❓240. 搜索二维矩阵 II 难度&#xff1a;中等 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,4,7,1…

排序(快速排序 归并排序)

目录 一、快速排序 思路 动画演示 模板 注意点 二、归并排序 思路 动画演示 模板 注意点 三、习题 1.第k个数 2.数组中的逆序对* 一、快速排序 时间复杂度&#xff1a; 平均情况O(nlog2n) 最坏情况O(n^2) 思路 1. 确定分界点x (可取为q[l]、q[r]或 q[(l r) / 2])…

数据结构---堆的实现

文章目录 前言一、什么是堆&#xff1f;二、堆的实现 1.堆的结构 2.接口实现总结 前言 堆(Heap)是计算机科学中一类特殊的数据结构&#xff0c;是最高效的优先级队列。堆通常是一个可以被看做一棵完全二叉树的数组对象。 一、什么是堆&#xff1f; 现实中我们通常把堆(一种二叉…

如何借助快解析实现Tomcat的外网访问

Tomcat深受Java爱好者喜爱&#xff0c;是一个免费开源的轻量级Web应用服务器&#xff0c;是开发调试JSP程序的首选。在项目开发中&#xff0c;常遇到需要远程调试或外网演示的情况&#xff0c;在没有公网IP、路由器不做映射的情况下&#xff0c;如何将Tomcat发布到外网&#xf…

推荐5款免费好用的chatGPT平台

1 ShellGPT 这是一款出色的客户端&#xff0c;无需APIkey和科学上网即可访问chatGPT3.5以及绘画AI。项目的github地址如下&#xff1a;https://github.com/akl7777777/free-chatgpt-client-pub/&#xff0c;可在主页下载windows、linux和macOS的安装包&#xff0c;安装后即可使…

【C++】线程库互斥量库原子性操作库

文章目录 线程库thread类的介绍线程对象的构造方式thread提供的成员函数获取线程的id的方式线程函数参数join与detach 互斥量库&#xff08;mutex&#xff09;mutex的种类lock_guard和unique_lock 原子性操作库&#xff08;atomic&#xff09;条件变量库&#xff08;condition_…

Docker中应用OpenDDS

Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。 容器是完全使用沙箱机制,相互之间不会有任何接口(类似 iPhone 的 app)。几乎没有性能开销,可以很容易地在机器和数据…

【网络】网络基础协议概念IPMAC地址

文章目录 网络基础网络的发展历程网络在哪里的问题网络协议栈各部分所处位置&#xff1a;网络协议栈各层的作用网络协议栈分层的目的 网络协议的概念 网络协议协议分层的好处理解各层之间直接通信OSI七层模型TCP/IP五层&#xff08;或四层&#xff09;模型 网络传输基本流程同局…

一个简单的servlet应用

第一个 Servlet 程序 1. 创建项目 使用 IDEA 创建一个 Maven 项目. 1.1、File -> New Project Name:javaservlet2 Location:选择要存放的路径 Language:Java Build system:Maven 点击Create按钮 1.2、Pom.xml引入依赖 依赖包来源&#xff1a; <dependencies> …

kafka 学习,笔记

前置条件&#xff0c;需要安装Java 1 去官网下载Kafka安装包 2 将安装解压缩到C盘根目录 3 在cmd命令行窗口进入kafka是根目录 cd c:\kafka_2.12-3.4.0 4 启动zookeeper服务 卡夫卡的运行需要zookeeper的支持&#xff0c;一般来说我们需要安装zookeeper&#xff0c;但是卡夫卡…

C语言程序设计研究生考试大纲

适用于全部C语言程序设计自命题院校 1.单选&#xff08;30分&#xff09;。 2.判断&#xff08;15分&#xff09;。 3.程序阅读与分析&#xff08;45&#xff09;。 4.编程题&#xff08;60分&#xff09;。 考试总分&#xff1a;150分 考试时间&#xff1a;3小时 考试内容 一…

浏览器点击下载太 LOW,如何提高下载操作的逼格?

文章目录 Part.I IntroductionChap.I 预备知识Chap.II URL Part.II 下载的方式Chap.I PythonChap.II WgetChap.III Curl Reference Part.I Introduction 用浏览器下载东西需要一个一个点击&#xff0c;当需要批量下载的时候&#xff0c;这样操作不免有些繁琐。本文整理了常用的…