消息队列——rabbitmq的不同工作模式

news2024/11/23 22:01:46

目录

Work queues 工作队列模式

 Pub/Sub 订阅模式

Routing路由模式

Topics通配符模式 

 工作模式总结


Work queues 工作队列模式

C1和C2属于竞争关系,一个消息只有一个消费者可以取到。

 代码部分只需要用两个消费者进程监听同一个队里即可。

两个消费者呈现竞争关系。

用一个生产者推送10条消息

        for(int i=0;i<10;i++)
        {
            String body=i+"hello rabbitmq!!!";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }

两个监听的消费者接收情况如下。 

 

 Pub/Sub 订阅模式

一个生产者发送消息后有两个消费者可以收到消息。

生产者把消息发给交换机,交换机再把消息通过Routes路由分发给不同的队列。

//发送消息
public class producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost(""); //设置ip地址。默认为127.0.0.1
        factory.setPort(5672);              //端口 默认值5672
        factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/
        factory.setUsername("yhy");        //用户名,默认值guest
        factory.setPassword("");     //密码,默认值guest
        //3.创建连接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();

        /*
        * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        * 参数:
        * 1.exchange  : 交换价名称
        * 2.type      : 交换机类型 ,有四种
        *               DIRECT("direct"),  定向
                        FANOUT("fanout"),   扇形(广播),发送消息到每一个与之绑定队列
                        TOPIC("topic"),     通配符的方式
                        HEADERS("headers"); 参数匹配
        *3.durable  :是否持久化
        * 4.autoDelete:是否自动删除
        * 5.internal: 内部使用。一般false
        * 6.arguments:参数
        * */
        //5.创建交换机
        String exchangeName="test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6.创建队列
        String queue1Name="test_fanout_queue1";
        String queue2Name="test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        /*
        * queueBind(String queue, String exchange, String routingKey)
        * 参数:
        * queue:队列名
        * exchange:交换机名称
        * routingKey:路由键,绑定规则
        *   如果交换机类型为fanout,routingKey设置为""
        * */
        //7.绑定队列和交换机
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body="日志信息:调用了findAll方法";
        //8.发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

 运行之后两个队列里面就会多一条消息

两个消费者的代码大同小异,只是绑定的队列名不同,这里只给其中一个

public class consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost(""); //设置ip地址。默认为127.0.0.1
        factory.setPort(5672);              //端口 默认值5672
        factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/
        factory.setUsername("yhy");        //用户名,默认值guest
        factory.setPassword("");     //密码,默认值guest
        //3.创建连接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name="test_fanout_queue1";
        String queue2Name="test_fanout_queue2";
        /*
        * basicConsume(String queue, boolean autoAck, Consumer callback)
        * 参数:
        *   1.队列名称
        *   2.autoAck:是否自动确认
        *   3.callback:回调对象
        * */
        //6.接收消息
        Consumer consumer=new DefaultConsumer(channel){
            /*
            * 回调方法,当收到消息后,会自动执行该方法
            * 1.consumerTag:标识
            * 2.envelope :获取一些信息,交换机,路由key...
            * 3.properties: 配置信息
            * 4.body: 数据
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台......");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);

        //不需要关闭资源
    }
}

 控制台输出有

Routing路由模式

对于特定级别的信息会发送到别的队列,如上图的error,在发送消息时也会有一个routing,只要和后面的队列对应上就可以发送到对应队列。 

 生产者代码:

//发送消息
public class producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost(""); //设置ip地址。默认为127.0.0.1
        factory.setPort(5672);              //端口 默认值5672
        factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/
        factory.setUsername("yhy");        //用户名,默认值guest
        factory.setPassword("");     //密码,默认值guest
        //3.创建连接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();

        /*
        * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        * 参数:
        * 1.exchange  : 交换价名称
        * 2.type      : 交换机类型 ,有四种
        *               DIRECT("direct"),  定向
                        FANOUT("fanout"),   扇形(广播),发送消息到每一个与之绑定队列
                        TOPIC("topic"),     通配符的方式
                        HEADERS("headers"); 参数匹配
        *3.durable  :是否持久化
        * 4.autoDelete:是否自动删除
        * 5.internal: 内部使用。一般false
        * 6.arguments:参数
        * */
        //5.创建交换机
        String exchangeName="test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6.创建队列
        String queue1Name="test_direct_queue1";
        String queue2Name="test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        /*
        * queueBind(String queue, String exchange, String routingKey)
        * 参数:
        * queue:队列名
        * exchange:交换机名称
        * routingKey:路由键,绑定规则
        *   如果交换机类型为fanout,routingKey设置为""
        * */
        //7.绑定队列和交换机
        //队列1绑定error

        channel.queueBind(queue1Name,exchangeName,"error");

        //队列2绑定error,info,warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String body="日志信息:调用了findAll方法,级别:info,error,warning";
        //8.发送消息
        channel.basicPublish(exchangeName,"error",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

消费者代码(两个消费者就绑定队列名不一样):

public class consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost(""); //设置ip地址。默认为127.0.0.1
        factory.setPort(5672);              //端口 默认值5672
        factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/
        factory.setUsername("yhy");        //用户名,默认值guest
        factory.setPassword("");     //密码,默认值guest
        //3.创建连接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name="test_direct_queue1";
        String queue2Name="test_direct_queue2";
        /*
        * basicConsume(String queue, boolean autoAck, Consumer callback)
        * 参数:
        *   1.队列名称
        *   2.autoAck:是否自动确认
        *   3.callback:回调对象
        * */
        //6.接收消息
        Consumer consumer=new DefaultConsumer(channel){
            /*
            * 回调方法,当收到消息后,会自动执行该方法
            * 1.consumerTag:标识
            * 2.envelope :获取一些信息,交换机,路由key...
            * 3.properties: 配置信息
            * 4.body: 数据
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存储到数据库");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);

        //不需要关闭资源
    }
}

Topics通配符模式 

发送消息时设定的routingkey会和后面的routingkey进行匹配。

生产者代码:

//发送消息
public class producer_Topic {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost(""); //设置ip地址。默认为127.0.0.1
        factory.setPort(5672);              //端口 默认值5672
        factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/
        factory.setUsername("yhy");        //用户名,默认值guest
        factory.setPassword("");     //密码,默认值guest
        //3.创建连接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();

        /*
        * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        * 参数:
        * 1.exchange  : 交换价名称
        * 2.type      : 交换机类型 ,有四种
        *               DIRECT("direct"),  定向
                        FANOUT("fanout"),   扇形(广播),发送消息到每一个与之绑定队列
                        TOPIC("topic"),     通配符的方式
                        HEADERS("headers"); 参数匹配
        *3.durable  :是否持久化
        * 4.autoDelete:是否自动删除
        * 5.internal: 内部使用。一般false
        * 6.arguments:参数
        * */
        //5.创建交换机
        String exchangeName="test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6.创建队列
        String queue1Name="test_topic_queue1";
        String queue2Name="test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        /*
        * queueBind(String queue, String exchange, String routingKey)
        * 参数:
        * queue:队列名
        * exchange:交换机名称
        * routingKey:路由键,绑定规则
        *   如果交换机类型为fanout,routingKey设置为""
        * */
        //7.绑定队列和交换机
        // routing key 系统的名称.日志的级别。
        //需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");

        String body="日志信息:调用了findAll方法";
        //8.发送消息
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

 消费者代码

public class consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost(""); //设置ip地址。默认为127.0.0.1
        factory.setPort(5672);              //端口 默认值5672
        factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/
        factory.setUsername("yhy");        //用户名,默认值guest
        factory.setPassword("");     //密码,默认值guest
        //3.创建连接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name="test_topic_queue1";
        String queue2Name="test_topic_queue2";
        /*
        * basicConsume(String queue, boolean autoAck, Consumer callback)
        * 参数:
        *   1.队列名称
        *   2.autoAck:是否自动确认
        *   3.callback:回调对象
        * */
        //6.接收消息
        Consumer consumer=new DefaultConsumer(channel){
            /*
            * 回调方法,当收到消息后,会自动执行该方法
            * 1.consumerTag:标识
            * 2.envelope :获取一些信息,交换机,路由key...
            * 3.properties: 配置信息
            * 4.body: 数据
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存储到数据库");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);

        //不需要关闭资源
    }
}

 工作模式总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/761222.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis进阶底层原理-主从复制

Redis的主从节点都会记录对方的信息&#xff0c;核心还包括ReplicationID 和 offset &#xff0c; ReplicationID &#xff1a; 主从节点实例的ID &#xff0c;redis内部就是通过这个id去识别主从节点。offset&#xff1a;数据同步偏移量&#xff0c;也就是从节点每次从主节点同…

3.6 Bootstrap 导航元素

文章目录 Bootstrap 导航元素表格导航或标签胶囊式的导航菜单基本的胶囊式导航菜单垂直的胶囊式导航菜单 两端对齐的导航禁用链接下拉菜单带有下拉菜单的标签带有下拉菜单的胶囊标签页与胶囊式标签页 Bootstrap 导航元素 本文将讲解 Bootstrap 提供的用于定义导航元素的一些选项…

使用thrift编写C++服务器、客户端

在上一节《Linux 下编译 thrift》中&#xff0c;我们成功编译出了thrift的库文件&#xff0c;本节我们来编写thrift的C服务器&#xff0c;客户端。 官网 https://thrift.apache.org/tutorial/cpp.html 有thrift的C例子。在我们之前下载下来的thrift 源码根目录的tutorial/cpp目…

MySQL高级管理

目录 一、指定主键的一种方式 1.1高级操作 1.2数据表高级操作,克隆表 1.2.1 克隆表名 1.2.2备份表内容 1.3复制表 1.4删除指令 方法一&#xff1a; 方法二&#xff1a; 删除速度 二、创建临时表 三、MySQL中6种常见的约束 3.1创建主表 3.2创建从表 3.3为主表test01添加…

[Docker异常篇]解决Linux[文件异常]导致开机Docker服务无法启动

文章目录 一&#xff1a;场景复现二&#xff1a;解决思路2.1&#xff1a; 对比其他节点docker配置2.2&#xff1a;试着修改为正常节点配置2.2&#xff1a;根据上面异常显示&#xff0c;不一定是配置不对&#xff0c;可能是文件系统有损坏 三&#xff1a;解决 -> 执行命令 mo…

【机器学习算法】奇异值分解(SVD)

文章目录 奇异值分解(SVD)1.理论部分1.1特征分解(ED)1.2 奇异值分解(SVD)求解U和V求解Σ 2.应用部分2.1图像压缩2.2图像数据集成分分析2.3 数据降维(PCA的一种解法) Reference 奇异值分解(SVD) 奇异值分解(Singular Value Decomposition) 是矩阵低秩分解的一种方法&#xff0c;…

太猛了!Web安全漏洞批量扫描框架

关注【Hack分享吧】公众号&#xff0c;回复关键字【230528】获取下载链接 工具介绍 一个应用于web安全领域的漏洞批量扫描框架&#xff0c;可被应用于但不限于如下场景&#xff1a; 0Day/1Day全网概念验证(在没有测试环境(各种商业、闭源软件)或懒得搭建测试环境的情况下&…

D. Binary String Sorting

Problem - 1809D - Codeforces 思路&#xff1a;最后得到的结果就是前面是一串0后面是一串1&#xff0c;那么我们可以枚举分界点&#xff0c;如果枚举到i&#xff0c;那么就将1~i变为0&#xff0c;将i1变为1,我们发现如果一个1在1~i中&#xff0c;如果他是第i-1个&#xff0c;那…

Redis进阶底层原理-Cluster集群底层

Redis实现高可用的方案有很多中&#xff0c;先了解下高可用和分区的概念&#xff1a; 高可用是指系统在面对硬件故障、网络故障、软件错误等意外问题时&#xff0c;仍能给客户端提供正常的服务&#xff0c;尽量的减少服务的阻塞、终端现象。在高可用的方案中一般会采用冗余备份…

《洛谷深浅》第五章---数组与数据批量存储

文章目录 前言一、小鱼比可爱二、小鱼的数字游戏三、冰雹猜想四、校门外的树五、旗鼓相当的对手六、旗鼓相当的对手总结 前言 本节主要学习一维数组 和 多维数组 后边的知识我觉得 可以试着了解并不要求你掌握这么难的题目 因为ACM更多都是思维题目 所以这里把重要的题目掌握就…

【多线程系列-03】深入理解java中线程的生命周期,任务调度

多线程系列整体栏目 内容链接地址【一】深入理解进程、线程和CPU之间的关系https://blog.csdn.net/zhenghuishengq/article/details/131714191【二】java创建线程的方式到底有几种&#xff1f;(详解)https://blog.csdn.net/zhenghuishengq/article/details/127968166【三】深入…

基于树莓派实现的IO-Link 项目

IO-Link 协议 &#xff08;IEC 61131-9&#xff09; 是从传感器或执行器到 IO-Link 主站的串行半双工点对点连接。目前IO-Link 的硬应已经越来越普及。国外产品以巴鲁夫为代表。如何开发IO-link 产品&#xff1f;可以参考国外的一些开源项目。 国外有人开发了开发一个IO-Link主…

soundfile torchaudio 读取音频文件

soundfile 和 torchaudio 读取音频文件后的数据格式不同&#xff0c;前者是numpy&#xff0c;后者是tensor。前者读取后可以直接用于一些python的基础函数输入&#xff0c;后者用于pytorch的一些函数的应用。两者互换用途时候需要进行格式转换。 import soundfile as sf impor…

智能指针使用及详细解析

文章目录 智能指针概念为什么使用智能指针智能指针使用智能指针的常用函数get() 获取智能指针托管的指针地址.reset() 重置智能指针托管的内存地址&#xff0c;如果地址不一致&#xff0c;原来的会被析构掉 auto_ptrunique_ptrshared_ptr**shared_ptr的原理**引用计数的使用构造…

Gradle 构建工具 #5 又冲突了!如何理解依赖冲突与版本决议?

⭐️ 本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 和 [BaguTree Pro] 知识星球提问。 Gradle 作为官方主推的构建系统&#xff0c;目前已经深度应用于 Android 的多个技术体系中&#xff0c;例如组件化开发、产物构建、单元测试等…

STM32(HAL库)驱动SHT30温湿度传感器通过串口进行打印

目录 1、简介 2、CubeMX初始化配置 2.1 基础配置 2.1.1 SYS配置 2.1.2 RCC配置 2.2 软件IIC引脚配置 2.3 串口外设配置 2.4 项目生成 3、KEIL端程序整合 3.1 串口重映射 3.2 SHT30驱动添加 3.3 主函数代 3.4 效果展示 1、简介 本文通过STM32F103C8T6单片机通过HAL库…

Spring Batch之读数据库——JdbcCursorItemReader之自定义RowMapper(三十七)

一、自定义RowMapper 详情参考我的另一篇博客&#xff1a; Spring Batch之读数据库——JdbcCursorItemReader&#xff08;三十五&#xff09;_人……杰的博客-CSDN博客 二、项目实例 1.项目框架 2.代码实现 BatchMain.java: package com.xj.demo28;import org.springfram…

代码随想录第27天 | 455.分发饼干 ● 376. 摆动序列 ● 53. 最大子序和

455.分发饼干 /*** param {number[]} g* param {number[]} s* return {number}*/ var findContentChildren function(g, s) {let a0let b0let count0g.sort((a,b)>a-b)s.sort((a,b)>a-b)while(a<g.length&&b<s.length){if(s[b]>g[a]){countba}else{b}…

STM32(HAL库)软件IIC驱动OLED

目录 1、简介 2、CubeMX初始化配置 2.1 基础配置 2.1.1 SYS配置 2.1.2 RCC配置 2.2 软件IIC引脚配置 2.3 项目生成 3、KEIL端程序整合 3.1 OLED驱动添加 3.3 主函数代 3.4 效果展示 1、简介 本文通过STM32F103C8T6单片机&#xff08;HAL库&#xff09;通过软件IIC方式…

java linux服务器环境搭建

安装 jdk 下载jdk: wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24http%3A%2F%2Fwww.oracle.com%2F; oraclelicenseaccept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/8u141-b15/336fa29ff2bb4ef291e347e091f7f…