【RabbitMQ(day2)】默认(直连)交换机的应用

news2025/1/19 11:03:22

文章目录

  • 一、第一种模型(Hello World)
  • 二、第二种模型(work queue)
    • 自动确认机制的后果和公平分配
  • 三、阐述默认交换机

这篇博客是以下资料学后的总结:
不良人的RabbitMQ的教学视频
官方启动教程
RabbitMQ中文文档

一、第一种模型(Hello World)

在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序;
  • C:消费者:消息的接受者,会一直等待消息的到来。
  • queue:消息队列,图中红色部分。类似于一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  1. 开发生产者
		// 创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置连接Rabbitmq的主机
        connectionFactory.setHost("192.168.248.135");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        // 设置用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");

        // 获取连接对象
        Connection connection = connectionFactory.newConnection();

        // 获取连接中的通道对象
        Channel channel = connection.createChannel();

        // 通过通道绑定对应的消息队列
        // 参数1:队列的名称  如果队列不存在会自动创建
        // 参数2:用来定义队列特性是否需要持久化,true:持久化队列,false即不持久化
        // 参数3:exclusive 是否独占队列
        // 参数4:是否在消费完成后自动删除队列
        // 参数5:额外参数
       // 这个不加是没关系的,只是表示我的Rabbitmq中是有hello消息队列的,消费者产生的
channel.queueDeclare("hello",false,false,false,null);

        // 发布消息
        // 参数1:交换机名称;参数2:路由键名称;参数3:传递消息额外设置;参数4:消息的具体内容
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

        channel.close();
        connection.close();
  1. 消费者开始消费
 		// 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机和端口
        connectionFactory.setHost("192.168.248.135");
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        // 设置用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");

        // 创建连接对象
        Connection connection = connectionFactory.newConnection();

        // 创建通道
        Channel channel = connection.createChannel();
        // 通道绑定对象
        channel.queueDeclare("hello",false,false,false,null);

        // 消费消息
        // 参数1:消费哪个队列的消息  队列名称
        // 参数2:开始消息的自动确认机制
        // 参数3:消费消息时的回调接口
        String hello = channel.basicConsume("hello", true, new DefaultConsumer(channel){
            // 最后一个参数:消息队列中取出的消息
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("转化成对应的字符串: " + new String(body));
                System.out.println("============");
            }
        });
        /*channel.close();
        connection.close();*/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vR02ySmZ-1690641488995)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230727224814359.png)]

这里需要注意,由于这里是多线程下生产、消费消息,所以在消费时不应该提前关闭通道,不然无法监听到队列中的数据。

下面是证明,看看各线程的名称就知道了。

在这里插入图片描述

需注意:Junit5之前是不支持多线程的。

  1. 参数说明
channel.queueDeclare("hello",true,false,true,null);
"参数1":用来声明通道对应的队列;
"参数2":用来指定是否持久化队列
"参数3":用来指定是否独占队列,一般为false
"参数4":用来指定是否自动删除队列
"参数5":对队列的额外配置
    
	参数1:交换机名称;
    参数2:路由键名称;
    参数3:传递消息额外设置;
    参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"I love you~".getBytes());

二、第二种模型(work queue)

Work Queues,也被称为(Task Queues 任务模型)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-55cGZian-1690641488996)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230729113349745.png)]

角色:

  • Sender:生产者:任务的发布者
  • Consumer:消费者,领取任务并且完成任务

生产者代码

        // 获取连接对象
        Connection conn = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = conn.createChannel();

        // 通过通道声明队列
        channel.queueDeclare("work", true, false, false, null);

        for (int i = 0; i < 10; ++i) {
            // 生产消息
            channel.basicPublish("", "work", null, ("(" + i + ")Hello worke queue~").getBytes());
        }
        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(conn, channel);

消费者代码

Runnable myRunnable = new Runnable() {
            @Override
            public void run() {
                Connection conn = RabbitMQUtils.getConnection();
                try {
                    final int[] cnt = new int[1];
                    cnt[0] = 0;
                    Channel channel = conn.createChannel();
                    channel.queueDeclare("work", true, false, false, null);
                    System.out.println("当前线程:" + Thread.currentThread().getName());
                    channel.basicConsume("work", true, new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
                            System.out.println("消费者-" + consumerTag + ":" + new String(body));
                            System.out.println("================================================");
                            cnt[0]++;
                        }
                    });
                    System.out.println(Thread.currentThread().getName() + ":" + cnt[0]);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        Thread work1 = new Thread(myRunnable, "work-001");
        Thread work2 = new Thread(myRunnable, "work-002");
        Thread work3 = new Thread(myRunnable, "work-003");
        work1.start();
        work2.start();
        work3.start();

测试结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CgpB8a9p-1690641488997)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230729125901348.png)]

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j1M02g6j-1690641488998)(C:\Users\myz03\AppData\Roaming\Typora\typora-user-images\image-20230729190556081.png)]

下面是官方给的

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

自动确认机制的后果和公平分配

channel.basicConsume("work", true, new DefaultConsumer(channel)
// 这里的第二个参数是指是否默认提交

上面 work queue 实现会出现两个问题:

问题一:这里将 autoAcked 参数值设置为了 true,即消费者收到消息队列调度的消息后不管有没有消费成功都立即返回 ACK 确认,消息队列只顾着轮询分配去了。这个时候的话会引发一个问题:当消费者突然宕机了,那还没处理的消息就不会被处理,即消失了。比如一个消费者被分配到了五个消息,但是只处理了三个就嘎了,那剩下的俩个就处理不了了。

问题二:在两个worker的情况下,当所有奇数消息都很重,偶数消息都很轻时,一个worker将一直很忙,而另一个几乎不做任何工作。但是RabbitMQ对此一无所知,仍然会均匀地分发消息。我们应该遵循能者多劳,充分利用资源,但轮询方式总是这么的不合我们的胃口。

解决方案

  • 首先得将自动提交设置为 false,手动提交就好了;

  • 每一次给空闲发消费者一个消息,即设置 prefetchCount = 1,这样的话不会让能者出现不工作,懒者一堆事没做的情况。当消费者死亡(即通道关闭、连接被关闭、或者TCP连接丢失等情况)还没有发送ACK,那有其他消费者在线的话,消息队列会将消息迅速交付给另一个消费者,从而确保消息没有丢失。

    具体解决方案的伪代码如下:

// 配置每一次只能执行一个小希
channel.basicQos(1);
// 关闭手动提交
channel.basicConsume("work", false, new DefaultConsumer(channel) {
       @Override
       public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
        System.out.println("消费者-" + consumerTag + ":" + new String(body));

         // 参数1:确认队列中哪些具体消息 参数2:是否开启多个消息同时确认
                     channel.basicAck(envelope.getDeliveryTag(), false);
                        }
});

三、阐述默认交换机

可以在 RabbitMQ中文文档-默认交换机 去了解更多AMQP协议的一些内容。

默认交换机的本质是直连交换机,当你添加一个队列的时候,这个队列第一反应就是绑定默认交换机,而绑定(binding)的路由键名称和队列名称是一致的。

上面两种模型(RabbitMQ官方教程阐述的)Hello World模型和Work Queues模型,在官方教程中没有指出使用了交换机,但是本质都是绑定了默认交换机的,也就是直连交换机,它也是支持多消费者的负载均衡的。

首先必须知道的是:使用默认交换机时,队列是在消费者端创建的(可以说是用户本身吧),而不是生产者去创建的。当生产者发送一条消息到 RabbitMQ 时,RabbitMQ 会根据消息的路由键(在使用默认交换机的情况下,路由键即为队列名称)来查找是否已经存在该队列,如果队列不存在,则会丢弃该消息。

basicPublish 方法的第二个参数为路由键名称和 basicConsume 方法的第一个参数为队列名称也是可以看出来的。

总的来说就是生产者不需要关心队列的创建,这是消费者需要声明指定的,默认交换机会绑定声明的消息队列的,所以生产者该发发,创建的任务就不用管了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/809095.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

php 生成连续递增的Excel列索引 可以控制多少列

今天遇到需要生成对应的下拉&#xff0c;下拉的类 需要PHP 输出一个数组 如 A、B、C、D 到Z 列后 Excel 的列就变成 AA 、AB、 AC 依次类推 查询得知 Excel 最大列数 16384 最大行数 1048576 下面演示3000列或行 <?php$idx [idx > 0];for ($i …

WIZnet W6100-EVB-Pico 静态IP配置教程(二)

W6100是全球第一款支持IPv4/IPv6双核的新一代全硬件以太网TCP/IP协议栈控制器。W6100在WIZnet核心专利技术——全硬件TCP/IP协议栈IPv4的基础上增加了IPv6&#xff0c;解决了嵌入式以太网的接入问题&#xff0c;简单易用&#xff0c;安全稳定&#xff0c;是物联网设备的首选解决…

某文化馆三维建模模型-glb格式-三维漫游-室内导航测试

资源描述 某文化馆某个楼层的三维建模模型&#xff0c;glb格式&#xff0c;适用于three.js开发&#xff0c;可用来做一些三维室内漫游测试和室内导航测试 资源下载地址

Java框架学习(三)spring5高级49讲

文章目录 1、BeanFactory与ApplicationContext2、BeanFactory与ApplicationContext的容器实现BeanFactory的容器实现后处理器排序 ApplicationContext的容器实现 3、Bean的生命周期Bean后处理器 4、常见的Bean后处理器5、常见BeanFactory后处理器6、Aware和InitializingBean接口…

移动零——力扣283

题目描述 双指针 class Solution{ public:void moveZeroes(vector<int>& nums){int n nums.size(), left0, right0;while(right<n){if(nums[right]){swap(nums[right], nums[left]);left;}right;}} };

Golang之路---02 基础语法——常量 (包括特殊常量iota)

常量 //显式类型定义const a string "test" //隐式类型定义const b 20 //多个常量定义 const(c "test2"d 2.3e 27)iota iota是Golang语言的常量计数器&#xff0c;只能在常量表达式中使用 iota在const关键字出现时将被重置为0&#xff0c;const中每新…

Flowable-任务-接受任务

定义 接收任务是一种简单任务&#xff0c;它会等待对应消息的到达。当流程执行到达接收任务时&#xff0c;流程状态会持 久化到数据库中&#xff0c;这意味着该流程将一直处于等待状态&#xff0c;直到引擎接收到一个特定的消息为止&#xff0c;该消息 将触发离开接收任务继续…

Echarts 文字太长用省略号代替

xAxis: [{type: category,data: [materialUserEchartsDate.value[0] ? materialUserEchartsDate.value[0].name : ,materialUserEchartsDate.value[1] ? materialUserEchartsDate.value[1].name : ,materialUserEchartsDate.value[2] ? materialUserEchartsDate.value[2].na…

JAVASE---数据类型与变量

1. 字面常量 常量即程序运行期间&#xff0c;固定不变的量称为常量&#xff0c;比如&#xff1a;一个礼拜七天&#xff0c;一年12个月等。 public class Demo{ public static void main(String[] args){ System.Out.println("hello world!"); System.Out.println(…

HDFS中数据迁移的使用场景和考量因素

HDFS中数据迁移的使用场景和考量因素 数据迁移使用场景数据迁移要素考量HDFS分布式拷贝工具-DistCpdistcp的优势性能命令 数据迁移使用场景 冷热集群数据同步、分类存储集群数据整体搬迁 当公司业务迅速的发展&#xff0c;导致的当前的服务器数量资源出现临时紧张的时候&#…

2,继承、内联函数、虚继承、友元、构造析构函数、初始化列表

继承 2.1结构体成员权限2.1.1访问权限2.1.2类与结构体 2.2类的成员函数2.2.1类内规则2.2.2类成员内联函数inline 2.3类的继承2.3.1类的继承与成员函数2.3.2类的多继承2.3.2.1类的多继承&#xff1a;菱形问题提出 2.3.3类的虚继承&#xff08;关键字virtual&#xff09; 2.4友元…

Zookeeper入门介绍

Zookeeper在我本次系统的学习之前是已经开始使用了&#xff0c;但是并不理解Zookeeper到底是什么&#xff0c;有什么作用&#xff0c;你或许跟我有一样的疑惑&#xff0c;本专栏将会解决这些疑惑。 目录 Zookeeper介绍&#xff1a; zookeeper特点&#xff1a; 数据结构&#x…

Python 进阶(四):日期和时间(time、datetime、calendar 模块)

❤️ 博客主页&#xff1a;水滴技术 &#x1f338; 订阅专栏&#xff1a;Python 入门核心技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; 文章目录 1. time模块1.1 获取当前时间1.2 时间休眠1.3 格式化时间 2. datetime模块2.1 获取当前…

【Docker】Docker应用部署之Docekr容器安装Nginx

目录 一、搜索镜像 二、拉取镜像 三、创建容器 四、测试使用 一、搜索镜像 docker search nginx 二、拉取镜像 docker pull nginx # 不加冒号版本号 默认拉取最新版 三、创建容器 首先我们需要在宿主机创建数据卷目录 mkdir nginx # 创建目录 cd nginx # 进入目录 mkd…

【SAP Abap】记录一次SAP长文本内容通过Web页面完整显示的应用

【SAP Abap】记录一次SAP长文本内容通过Web页面完整显示的应用 1、业务背景2、实现效果3、开发代码3.1、拼接html3.2、显示html3.3、ALV导出Excel 1、业务背景 业务在销售订单中&#xff0c;通过长文本描述&#xff0c;记录了一些生产备注信息&#xff0c;如生产标准、客户要求…

【Java面试丨企业场景】常见技术场景

一、单点登录怎么实现的 1. 介绍 单点登录&#xff08;Single Sign On&#xff0c;SSO&#xff09;&#xff1a;只需要登录一次&#xff0c;就可以访问所有信任的应用系统 2. 解决方案 JWT解决单点登录问题 用户访问应用系统&#xff0c;会在网关判断Token是否有效如果Tok…

位运算 剑指offer15 二进制中1的个数 搜索算法:55-II 平衡二叉树 数值的整数次方 39数组中出现次数超过一半的数字

可能会引起死循环的解法&#xff1a; 看最右边一位是不是1&#xff0c;然后将输入的整数右移一位&#xff0c;再判断最右边一位&#xff08;即倒数第二位&#xff09;是否为1&#xff0c;接着再右移&#xff0c;知道整数移动到0为止 这个解法&#xff0c;把整数右移一位和把整数…

mfc140u.dll丢失怎样修复?这三个方法的可以修复

最近遇到了mfc140u.dll丢失的问题&#xff0c;让我感到非常困扰。在使用某个软件时&#xff0c;突然弹出了一个错误提示&#xff0c;说是mfc140u.dll文件不存在&#xff0c;导致该软件无法正常运行。一开始我并不知道这个文件是什么&#xff0c;也不知道为什么会丢失。于是我开…

7.29训练总结

CodeForces - 1609E 这种使得整个串不包含子串’abc’的题目&#xff0c;发现可以用线段树维护 #include<bits/stdc.h> using namespace std; const int maxn1e55; #define lson now<<1 #define rson now<<1|1 struct seg {int a,b,c;int ab,bc,abc; }tr[m…

使用开源免费AI绘图工具神器-Stable Diffusion懒人整合包

使用开源免费AI绘图工具神器-Stable Diffusion懒人整合包 Stable Diffusion 是什么 Stable Diffusion (简称 SD) 是一款开源免费的以文生图的 AI 扩散模型&#xff0c;它和付费的 Midjourney 被人称为当下最好用的 AI 绘画工具。你在网上看到的绝大多数优秀 AI 图片作品&…