RabbitMQ 发布订阅模式,routing路由模式,topic模式

news2025/1/22 18:02:34

发布订阅模式

一个消息可以由多个消费者消费同一个消息

 消费者1和2同时消费了该消息

举例

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("03-pubsub1", "fanout");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("03-pubsub1","", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

 消费者1和2同时消费了该消息,比如说消息是发短信,发邮件,  那么1和发短息  2可以发邮件

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("03-pubsub1", "fanout");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, "03-pubsub1", "");
    channel.basicConsume(queue, false,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

routing路由模式

就是说哪些让谁干

哪些让谁干区分出来

也可以让所有消费者都消费

选择性的让某个消费者消费,或者都消费

 生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key  routing模式需要路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("04-routing1","info", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

 消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "04-routing1", "info");
    channel.queueBind(queue, "04-routing1", "error");
    channel.queueBind(queue, "04-routing1", "waring");
    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个

    channel.queueBind(queue, "04-routing1", "trace");


    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 2  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

上面的只有消费者1消费了消息 

可以根据channel.queueBind(queue, "04-routing1", "trace"); 绑定消息  也可以让1和2都消费,

 

topic模式和Routing模式高度相识,用通配符的形式指定让谁消费,或者都消费

 生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key  routing模式需要路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("05-topic1","employee.save", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "05-topic1", "employee.*");

    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "05-topic1", "user.*");

    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 2  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

结果就是消费者1消费了消息

所有工作模式总结

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/479734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习与深度学习——通过knn算法分类鸢尾花数据集iris求出错误率并进行可视化

什么是knn算法? KNN算法是一种基于实例的机器学习算法,其全称为K-最近邻算法(K-Nearest Neighbors Algorithm)。它是一种简单但非常有效的分类和回归算法。 该算法的基本思想是:对于一个新的输入样本,通过…

Preftest测试

Preftest测试 Perftest 是一组基于uverbs编写的测试程序,是RDMA性能相关的micro-benchmark。可用于软硬件调优以及功能测试。 Perfest支持的测试 源码位置 linux-rdma/perftest: Infiniband Verbs Performance Tests (github.com) 安装 直接安装preftest sudo …

Flink作业任务的9种状态简单介绍

​ 当创建一个Flink任务后,该任务可能会经历多种状态。目前Flink给任务共定义了9种状态,包括:Created,Running,Finished,Cancelling,Canceled,Restarting,Failing&#x…

类加载器和双亲委派模型面试总结

类的生命周期和类加载的过程 在了解类加载器之前,我们先来了解一下一个类的生命周期和类加载的过程。 一个类完整的生命周期包括 加载、验证、准备、解析、初始化、使用和卸载,一共7个阶段。 类加载过程包括,加载、连接和初始化&#xff0…

腾讯云镜YunJing——Agent定时任务脚本分析

缘起 如果你有台腾讯云主机,会发现默认有个叫 YunJing 的进程。 把它kill掉后,发现一段时间又出现了 这是为什么捏? 分析定时任务配置文件 通过crontab定时任务目录, 会发现有个叫yunjing的配置文件。 */30 * * * * root /usr/local/qc…

PCIe物理层详细总结-PCIE专题知识(一)

目录 一、简介二、PCIe物理层结构及功能2.1 PCIe端对端连接方式2.2 PCIe组成2.2.1 逻辑层(Logic)1 发送逻辑2 接收逻辑 2.2.2 电气层(Electrical)1 物理层-电气(Physical Layer Electrical)2 数据传送----差分方式 2.2.3 PLP介绍 三、其他相关链接1、PCI总线及发展历程总结 一、…

ChatGPT 和 Elasticsearch:OpenAI 遇见私有数据(一)

作者:Jeff Vestal 结合 Elasticsearch 的搜索相关性和 OpenAI 的 ChatGPT 的问答功能来查询你的数据。 在此博客中,你将了解如何使用 Elasticsearch 将 ChatGPT 连接到专有数据存储,并为你的数据构建问答功能。 ChatGPT 和 Elasticsearch&…

【react从入门到精通】React JSX详解

文章目录 前言React技能树什么是JSXJSX的基本语法规则1.JSX必须有一个顶层元素2.JSX标签必须有一个结束标记,或者是自闭合的3.JSX中可以使用JavaScript表达式4.JSX中的样式和HTML中的样式有所不同5.JSX中的class属性必须写成className6.JSX中的style属性必须使用对象…

JavaWeb-HTML常用标签了解(一)

这里写目录标题 注释标签标题标签段落标签换行标签格式化标签图片标签超链接标签外部链接与内部链接 注释标签 <!-- 有注释 -->无注释ctrl / 快捷键可以快速进行注释/取消注释. 注意 注释不能传达负能量!!! 标题标签 有六个, 从 h1 - h6. 数字越大, 则字体越小. <h…

读SQL进阶教程笔记14_SQL编程要点

1. 消灭NULL 1.1. NULL惹人讨厌的原因 1.1.1. 进行SQL编码时&#xff0c;必须考虑违反人类直觉的三值逻辑 1.1.2. 指定IS NULL、IS NOT NULL的时候&#xff0c;不会用到索引&#xff0c;SQL语句执行起来性能低下 1.1.2.1. 1 NULL NULL2- NULL NULL3 &#xff0a; NULL …

BrainStat:用于全脑统计和多模态特征关联的工具箱

BrainStat工具箱在茗创科技往期推文【点此阅读→资源分享 | 利用机器学习进行高级MRI分析】中作过简单介绍。近日&#xff0c;NeuroImage杂志发布了题为BrainStat: a toolbox for brain-wide statistics and multimodal feature associations的预印版文章。这篇文章详细阐述了B…

测试开发备战秋招面试5-牛客刷题之链表篇

趁着5.1假期把牛客网的必刷的TOP101过一遍&#xff0c;额&#xff0c;题目量有点大&#xff0c;争取5天给刷完吧&#xff0c;哈哈&#xff0c;加油啦。再用雷布斯的那句话来激励自己吧&#xff1a;努力了那么多年,回头一望,几乎全是漫长的挫折和煎熬。对于大多数人的一生来说,顺…

学习Python需要注意什么?分享一下如何提升写代码的质量

作为程序员&#xff0c;每天都会面对各种各样的问题和挑战。需求的变更、代码的维护和修复、测试的问题&#xff0c;以及线上出现的各种异常等等&#xff0c;这些都需要我们不断地投入精力去解决。但是&#xff0c;我们不能只关注在解决问题上&#xff0c;还需要关注代码质量。…

IDE - Android Studio/Xcode历史版本下载

文章目录 前言Android Studio1. 历史版本下载2. 文件完整性校验 Xcode1. 历史版本下载2. 网络环境模拟工具2.1 下载2.2 安装2.3 卸载 最后 前言 最近升级开发工具老是遇到各种兼容性问题导致需要降回老版本&#xff0c;Xcode历史版本下载方便倒还好&#xff0c;Android Studio…

使用pands.rolling方法实现移动窗口的聚合计算

一个问题举例 假设有一个5天的收益数据&#xff0c;需要每3天求出一次平均值来达成某个需求&#xff1a; daterevenue2023-05-01102023-05-02202023-05-03302023-05-04402023-05-0550 1号、2号和3号的数据求一次平均值&#xff0c;2号、3号和4号的数据求一次平均值&#xff…

Ucore lab4

实验目的 了解内核线程创建/执行的管理过程了解内核线程的切换和基本调度过程 实验内容 练习一&#xff1a;分配并初始化一个进程控制块 1.内核线程及管理 内核线程是一种特殊的进程&#xff0c;内核线程与用户进程的区别有两个&#xff1a;内核线程只运行在内核态&#x…

内网渗透(六十一)之Kerberosating攻击

Kerberosating攻击 Kerberosating攻击发生在Kerberos协议的TGS_REP阶段,KDC的TGS服务返回一个由服务Hash 加密的ST给客户端。由于该ST是用服务Hash进行加密的,因此客户端在拿到该ST后可以用于本地离线爆破。如果攻击者的密码字典足够强大,则很有可能爆破出SPN链接用户的明文…

JAVA入坑之GUI编程

一、相关概述 GUI编程是指通过图形化的方式来实现计算机程序的编写&#xff0c;它可以让用户通过鼠标、键盘等设备来操作计算机&#xff0c;而不是通过命令行来输入指令。在Java中&#xff0c;GUI编程主要使用的是Swing和AWT两种技术 二、AWT 2.1介绍 AWT是Java提供的用来建立…

【构造】CF851div2 C. Matching Numbers

Problem - C - Codeforces 题意&#xff1a; 有1~2*n的一个排列&#xff0c;进行数与数之间两两匹配&#xff0c;问如何组合可以使n个 数对 aibi排列起来是一个连续序列&#xff0c;如果无解输出No 思路&#xff1a; 构造题&#xff0c;考虑将构造的条件特殊化 手推样例可知…

【数据生成】——Semantic Image Synthesis via Diffusion Models语义分割数据集生成论文浅读

语义分割&#xff0c;数据生成 摘要 Denoising Diffusion Probabilistic Models (DDPMs) 在各种图像生成任务中取得了显著的成功&#xff0c;相比之下&#xff0c;生成对抗网络 (GANs) 的表现不尽如人意。最近的语义图像合成工作主要遵循事实上的基于 GAN 的方法&#xff0c;…