一、MQ概念
1.1、异步通讯和同步通讯
1.2、同步调用和异步调用
1.2.1、同步调用
1.2.2、异步调用
1.3、消息队列的作用
1.3.1、流量削峰/限流
1.3.2、 应用解耦
1.3.3、异步处理
1.4、消息队列的两种模式
1.4.1、点对点模式
1.4.2、发布/订阅模式
二、RabbitMQ
2.1、简介
2.2、特征
2.3、AMQP标准及模型简介
三、Rabbitmq架构图及核心概念
3.1、名词解释
3.2、框架图流程
Producer发送消息流程
Consumer接收消息流程
四、RabbitMQ单节点部署
相关资源包的准备
4.1、安装工作
4.1.1、安装socat
4.1.2、安装erlang
4.1.3、安装RabbitMQ
4.1.4、测试时候安装成功
编辑 4.2、管理界面及管理账号
4.2.1、安装管理插件
4.2.2、测试管理界面
4.2.3、查看默认用户
4.2.4、添加用户设置角色及权限
4.2.5、拓展(v_host的创建)
五、RabbitMQ的工作模式
5.1、简单模式
5.1.1、生产者(Producer)
5.1.2、消费者(Consumer)
测试
5.2、工作队列模式 (working queue)、
5.2.1、生产者
5.2.2、消费者
测试
5.3、扇出模式/发布订阅模式(fanout)
内置交换机四种类型
5.3.1、生产者
5.3.2、消费者
测试
5.4、路由模式/直接路由
5.4.1、生产者
5.4.2、消费者
测试
5.5、话题模式
5.5.1、生产者
5.5.2、消费者
测试
六、RabbitMQ集群搭建
6.1、克隆三台虚拟机
6.2、修改hosts文件
6.4、启动RabbitMQ 服务
6.5、查看集群状态
6.6、创建用户
6.7、查看管理面板
测试
七、镜像集群配置
7.1、命令配置(不推荐)
7.2、 管理界面配置
测试
八、 Haproxy+RabbitMQ高可用
8.1、环境配置及Haproxy的安装
8.2、创建配置文件
8.3、配置启动文件
8.4、启动Haproxy测试
一、MQ概念
消息队列(MQ)是一种通信机制,用于在不同系统或应用程序之间传递消息。它通过将消息存储在队列中来实现异步通信,使得发送者和接收者可以在不同时间操作
其中RabbitMQ也是企业开发中应用非常广泛的“高性能的异步通讯组件”
常见的MQ框架有:Kafka RabbitMQ RocketMQ等等,后续Kafka和RocketMQ也会出的
1.1、异步通讯和同步通讯
通俗来说的话:
同步通信好比A给B打电话,A和B电话打通了,也就是A和B之间建立了同步通讯,A说的话这时候B可以立马听到,A做的什么事B也可以看到,这就是一种实时的通信。也就是说同一时刻,你只能和一个人通话,这时候C和D也想和你打电话,但是打不通了。但是异步就不一样了,可以同时和好几个人发,并发操作
异步通讯就好比A给B发一个微信信息,B这个时候忙,或者在做别的事,没有回复A,所以A这个时候没有收到B的回复,过了一段时间B看见了,回复你了,你才可以看见消息,也就是说异步的这种通讯,不可能实时得到结果,所以时效性有点差
然后将这个场景映射到具体的业务代码中就是:
之前我们微服务之间业务的调用就是使用的Feign调用的,那openFeign调用就是一种同步调用,也就是说,我发送一个请求需要实时的等待你返回结果,那这个等待过程就是阻塞的,企业中的登录不是简单的,其实是登录完之后还需要很多事要做,比如记录登录信息,发送短信等等。。。等这些一系列操作完成才算是登录完成,而这个过程,用户需要等很长时间,这样的话性能就比较低,在高并发情况下可能就应对不了了,所以就使用异步通讯方式(MQ)
用户登录成功之后,先不去做后面的时候(记录登录信息,发送短信什么的等等),而是先发一条消息到MQ中,也就是把登录信息发送到MQ中,告诉MQ用户已经登录成功了,这个时候登录业务就结束了,也个时候,登录这个业务就会变得很短,并发能力就会大大提高,然后后面的其他操作就回去监听MQ的消息,收到消息之后,就去做各自的事
下面就是专业术语了
同步通讯指的是发送方在发送请求后,需要等待接收方处理完请求并返回响应,然后才能继续执行其他操作。这种模式确保了请求和响应之间的顺序性和一致性。同步通讯的特点包括:
- 等待时间:发送方在等待接收方处理请求的过程中,可能会处于阻塞状态。
- 实时性:通讯过程实时进行,通常用于需要立即得到响应的场景。
- 例子:HTTP 请求/响应、函数调用等。
优点:
- 确保请求和响应的顺序。
- 实现简单,逻辑清晰。
缺点:
- 发送方需要等待接收方完成处理,可能会造成延迟。
- 不适合高延迟或长时间处理的操作。
异步通讯则指发送方发送请求后,不必等待接收方处理完请求再继续执行其他操作。发送方通常会立即收到一个确认消息(如任务已接收),然后继续处理其他任务。接收方会在完成处理后,将结果返回给发送方。异步通讯的特点包括:
- 非阻塞:发送方可以继续执行其他任务,不会因等待响应而被阻塞。
- 解耦:发送方和接收方可以独立地处理请求和响应,适合于长时间或不确定的处理时间。
- 例子:消息队列(如 RabbitMQ)、回调函数、事件驱动编程等。
优点:
- 提高系统的并发能力和效率。
- 避免长时间等待导致的阻塞,适合处理长时间或不确定性任务。
缺点:
- 实现较复杂,涉及消息的存储、状态管理等。
- 可能需要额外的机制来确保消息的顺序和一致性。
应用场景
- 同步通讯适用于需要快速响应且处理时间较短的场景,如实时数据请求和响应。
- 异步通讯适用于处理长时间操作、需要高吞吐量或需要解耦的场景,如任务调度和日志处理。
1.2、同步调用和异步调用
1.2.1、同步调用
在同步调用中,调用者发起请求后会等待直到操作完成并返回结果。这意味着调用者在等待过程中会被阻塞,直到获得结果或超时。这种方式简单,但可能导致性能瓶颈,特别是在网络延迟或长时间操作时
同步调用的优势是什么?
时效性强,等待到结果后才返回
同步调用的问题是什么?扩展性差
性能下降
级联失败问题
拿支付做一个案例:用户去做支付时,会调用支付服务,这个服务要干三件事,第一件事就是去扣减用户的余额,只有当余额充足的时候,才会去干接下来的事;余额不充足的时候,就没必要往下做了,所以这个调用我觉得采用同步好像还可以,因为我要扣余额,我是不是要立刻知道你的结果,你告诉我是成功了还是失败了,如果是失败了,就没必要往下走了,如果说我这里采用异步我去扣钱,扣完不管三七二十一,我去把支付状态改为成功,这显然是不合理的,所以这个调用用同步是没有问题的,我等待你的执行结果,执行完了,我再执行接下来的操作,执行更新支付状态,更新订单状态等待,但是这样做会存在一系列的问题
第一个问题就是业务耦合的问题,那什么是耦合呐,就是说:比如咱这个支付服务,你来支付最重要的操作就是扣减余额和更新支付状态,但是后续的操作比如更新业务订单状态就是就是多余的额外的操作,和支付服务本身是没有什么关系的,但是你的状态要修改所以我不得不调用你;但是调就调呗,不就是加一行代码就调用了呗,一看还行,但是可以思考一下,业务是不是会变化的呀,但是中途产品经理说不行,这个支付服务应该加一个逻辑,当用户登录成功了,你应该发一个短信通知一下,这个时候你的支付服务就要加一个逻辑,给用户发送短信通知,支付业务就要改代码,就要加东西,调用短信服务,发送短信;当又有新的需求来了,就又要在支付业务中修改逻辑,修改代码;;只要需求在变更,你的支付业务的代码也在一直变,这显然是不行的,违反原则的,,,这就是代码的耦合导致的,导致拓展性很差,我们都知道java代码是开闭原则的,《面向修改关闭,面向拓展开放》,你现在就一直在修改就是有问题的,这是第一点!!!!!第二个问题就是性能上也有问题,这种基于openfeign调用的都是同步的,当你嗲用扣减余额的时候,你的支付功能只能等待,啥也干不了,直到用户服务执行完,然后执行下一步,又要等,然后在下一步,继续等,你在等待的过程中CPU也在等待,线程也被占用着。。比如每次执行都是耗时50ms,最终的话可能耗时好几百毫秒,甚至更多,用户一点这个操作,用户卡半天,不就影响用户体验了吗,这就是第二个问题,性能太差,甚至有时候微服务项目还不如单体项目,性能会下降
第三个问题就是级联失败,同步调用之间会有影响,当其中一个执行完没问题才会继续往下执行,比如现在交易服务出现了故障,抛出异常,那么这整个业务就会全部失败,💴都扣完了,又给人家还回去,显然是不合适的,要是这个问题一直没有解决,就会影响拖累支付服务,导致支付服务资源耗尽,也出现故障,出现级联失败
但是问题这么多为什么还要使用同步调用,其实有一些业务不得不使用,必须使用同步调用,就是说我下面的操作要依赖上面的结果(也就是扣减余额是否成功,才可以执行下面的操作,但是下面的操作之间没有关系,没有必要同步了,后面的业务慢慢做就行了,跟用户服务没有一点关系,所以没有必要同步和等待了)
1.2.2、异步调用
在异步调用中,调用者发起请求后立即返回,不会等待操作完成。操作在后台进行,调用者可以继续执行其他任务。这种方式更灵活,可以提高系统的并发性能,但实现起来可能会更复杂
采用异步调用可以解决上述的问题
支付服务不再同步调用业务关联度低的服务,而是发送消息通知到Broker
之前我们都是同步调用的,现在就没有必要的,使用异步就行,首先第一步,扣余额一定是同步的,更新支付状态自己服务更新就行属于支付服务里本身的调用,没必要异步,直接执行就行,所以前两个同步就行了,但是从更新订单状态就没必要同步了,这都属于跨服务调用了,需要等待,,,,所以这个时候就需要发一个消息到消息代理,到这里支付服务就完全结束了,不用往下走了,下面的就是自己服务的事了,比如交易服务监听消息当收到消息之后就自己去更新消息,这个时候产品经经理再来增加需求,比如发送短信,增加积分,这些服务本来就有的,这个时候就让短信服务和积分服务自己去监听消息,而支付服务已经结束了,和我没关系了,不需要再发送消息了,支付服务本身的代码也不需要修改了,你小子提再多需要我也不该代码,我只发送消息!!!然后那两个接收到消息后就自己去执行短信发送和积分增加等操作,拓展性就提高了,性能也变好了,当我支付服务完成,你后续的订单呀,短信呀,什么的耗时再长也和我支付服务没有一点关系,出现故障也和我支付没关系,异常就隔离开了。。。。。。也能保证最终数据一致性(这个先不讲了)
1.3、消息队列的作用
1.3.1、流量削峰/限流
举个例子来说:如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单是绰绰有余的,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作,系统是处理不了的,只能限制订单超过一万后不允许用户下单。但是使用消息队列做缓冲,我们就可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到成功的操作,但是比不能下单的体验要好。
我们再来一个场景,现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理1000个请求。然后多出来的1000个请求,可能就会把我们整个系统给搞崩了,所以我们可以写到消息队列中:
系统B和系统C根据自己能够处理的请求数去消息队列中拿数据,这样即便每秒有8000个请求,只用把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩
1.3.2、 应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来维修。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,订单用户感受不到物流系统的故障,提升系统的可用性。
再来一个场景:现在我有一个系统A,系统A可以产生一个
userId
然后,现在有系统B和系统C都需要这个userId去做相关的操作
过了几天,系统B的负责人告诉系统A的负责人,现在系统B的
某某
这个接口不再使用了,让系统A别去调它了,系统A的负责人说"行行行,你小子,那我就不调用你了。",于是就把调用系统B接口的代码给删掉了
又过了几天,系统D的负责人接了个需求,也需要系统A的userId,于是就去找系统A的负责人说:“A老哥,我需要用你的userId,你到时候调一下我的接口呗”,系统A说:“没问题,都是小事啦,这就搞!!!”
接下来的日子
又过了几天,系统E的负责人过来了,告诉系统A,需要userId。
又过了几天,系统B的负责人过来了,告诉系统A,还是重新调那个接口吧。
又过了几天,系统F的负责人过来了,告诉系统A,需要userId。
于是系统A的负责人,每天都被这给骚扰着,改来改去,改来改去…….
还有另外一个问题,调用系统C的时候,如果系统C挂了,系统A还得想办法处理。如果调用系统D时,由于网络延迟,请求超时了,那系统A是反馈
失败呐
还是重试呐??最后,系统A的负责人,觉得隔一段时间就改来改去,没意思,于是就跑路了。
然后,公司招来一个大佬,大佬经过几天熟悉,上来就说:将系统A的userId写到消息队列中,这样系统A就不用经常改动了。为什么呢?
系统A将userId写到消息队列中,系统C和系统D从消息队列中拿数据。这样有什么好处?
系统A只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统A一点都不关心。
即便现在系统D不想要userId这个数据了,系统B又突然想要userId这个数据了,都跟系统A无关,系统A一点代码都不用改。想要自己去拿,别来找我
这样一来,系统A与系统B、C、D都解耦了
系统D拿userId不再经过系统A,而是从消息队列里边拿。系统D即便挂了或者请求超时,都跟系统A无关,只跟消息队列有关。
1.3.3、异步处理
和上面异步调用基本一样
有些服务间调用时异步的,例如A调用B,B需要花很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅,但是使用消息总先,可以很方便的解决这个问题:A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用给B提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息
1.4、消息队列的两种模式
1.4.1、点对点模式
也就是一对一,消费者主动拉取数据,消息收到后消息清除
生产者生产消息发送到Queue中,然后消费者从Queue中取出并且消费消息,消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
1.4.2、发布/订阅模式
也就是一对多,消费者消费数据之后不会清除消息
生产者(发布)将消息发布到消息队列中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到消息队列中的消息会被所有订阅者消费
二、RabbitMQ
2.1、简介
RabbitMQ 是一个流行的开源消息代理(Message Broker),用于在不同的应用程序或系统之间异步传递消息。它基于 AMQP(Advanced Message Queuing Protocol)协议,提供了高可靠性、高可用性和灵活的消息传递机制
2.2、特征
异步消息:
支持多种消息传递协议、消息队列(queue1-8)、传递确认、灵活的队列路由* #、多种交换类型(direct topic fanout)。
支持多种开发语言:
例如:Java、.NET、PHP、Python、JavaScript、Ruby、Go等
支持分布式:
部署为集群以获得高可用性和吞吐量;
支持插件:
RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。
可视化管理界面:
具有用于管理和监控RabbitMQ 的 UI
2.3、AMQP标准及模型简介
官网:amqp-concepts
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
模型简介:
消息发布到Exchange(交换机),这通常被比作邮局或邮箱。然后,交换使用称为 绑定的规则将消息副本分发(路由)到队列。然后代理要么将消息传递(推送)给订阅队列的消费者,要么消费者按需从队列中获取/拉取消息
三、Rabbitmq架构图及核心概念
3.1、名词解释
Producer(生产者):生成消息并将其发送到RaabitMQ交换机(Exchange)的应用程序和或服务
Connection(连接):代表到 RabbitMQ 服务器的物理连接。它是客户端应用程序与 RabbitMQ 服务器之间的通信桥梁,通常基于 TCP/IP 协议。一个连接可以包含多个频道。连接的建立和关闭通常会消耗相对较多的资源,因此应该尽量减少连接的数量
Channel(通信通道):建立在一个连接之上的逻辑通信通道。一个连接可以包含多个频道,每个频道负责处理一组消息传递操作(如发布消息、声明队列、绑定交换机等)。频道是 RabbitMQ 的主要操作单元,提供了对消息和队列的访问,频道的创建和关闭比连接要轻量得多,建议在单个连接内复用多个频道以提高效率
Broker:消息中间件的核心组件,负责接收、存储和转发消息。在 RabbitMQ 中,Broker 就是指 RabbitMQ 服务器本身。它处理消息的路由、队列管理和传递过程,确保消息从生产者正确地传递到消费者。简而言之,Broker 是整个消息传递系统的中心处理单元Virtual Host(虚拟主机):RabbitMQ 实例内的逻辑隔离区域,用于将不同的应用程序或环境的数据分开管理。每个虚拟主机拥有独立的队列、交换机和绑定
Exchange(交换机):接收生产者发送的消息并根据其路由规则将消息转发到一个或多个队列。交换机有不同类型(如 Direct、Fanout和Topic),每种类型具有不同的路由策略
Binding(绑定):定义交换机与队列之间的关系,确定消息如何从交换机路由到队列。绑定包括路由键和条件,用于控制消息的传递
Queue(队列):存储消息的缓冲区,消费者从队列中读取消息进行处理。队列保持消息直到消费者处理完成或消息过期
Consumer(消费者):从 RabbitMQ 队列中接收并处理消息的应用程序或服务
3.2、框架图流程
Producer发送消息流程
(1) 生产者连接到RabbitMQ Broker , 建立一个连接( Connection) ,开启一个信道(Channel)
(2) 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
(3) 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
(4) 生产者通过路由键将交换器和队列绑定起来
(5) 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
(6) 相应的交换器根据接收到的路由键查找相匹配的队列
(7) 如果找到,则将从生产者发送过来的消息存入相应的队列中
(8) 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
(9) 关闭信道
(10) 关闭连接
Consumer接收消息流程
(1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel)
(2) 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
以及做一些准备工作(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息
(4) 消费者确认( ack) 接收到的消息
( 5) RabbitMQ 从队列中删除相应己经被确认的消息
( 6) 关闭信道
( 7) 关闭连接
四、RabbitMQ单节点部署
相关资源包的准备
比如我们都把资源安装在/software这个文件夹下
erlang:erlang-23.2.1-1.el7.x86_64.rpm
rabbitmq:rabbitmq-server-3.8.30-1.el7.noarch.rpm
socat:使用yum安装即可,关于yum可用源,去我上一个作品看就行
socat是一个多功能的网络工具,名字来由是Socket CAT,socat支持多协议,用于协议处理,端口转发,rabbitmq依赖于socat,因此在安装rabbitmq前要安装
4.1、安装工作
4.1.1、安装socat
yum install socat -y
4.1.2、安装erlang
rpm -ivh /software/erlang-23.2.1-1.el7.x86_64.rpm
4.1.3、安装RabbitMQ
rpm -ivh /software/rabbitmq-server-3.8.30-1.el7.noarch.rpm
4.1.4、测试时候安装成功
以此执行下面的命令
service rabbitmq-server status
service rabbitmq-server start
service rabbitmq-server status
service rabbitmq-server stop
service rabbitmq-server status
4.2、管理界面及管理账号
4.2.1、安装管理插件
先关闭服务,查看服务状态
rabbitmq-plugins enable rabbitmq_management
4.2.2、测试管理界面
启动RabbitMQ服务
浏览器输入:虚拟机ip:15672,打开管理界面
4.2.3、查看默认用户
rabbitmqctl list_users
发现guest用户无法登录
4.2.4、添加用户设置角色及权限
用户名:ysy
密码:123456
rabbitmqctl add_user ysy 123456
给ysy一个管理员的角色
rabbitmqctl set_user_tags ysy administrator
给ysy设置权限
rabbitmqctl set_permissions -p "/" ysy ".*" ".*" ".*"
登录测试 ,发现登录成功
4.2.5、拓展(v_host的创建)
可以使用新创建的vhost可以执行上面创建用户步骤,创建新的用户并赋予角色和权限
以此执行以下命令
rabbitmqctl add_vhost /admin/
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/admin/" admin ".*" ".*" ".*"
使用admin账户进行登录测试,使用admin登录可以选择作用的个v_host,和ysy登录不一样的
五、RabbitMQ的工作模式
5.1、简单模式
官网:简单模式
生产者应用将消息发送到 RabbitMQ 服务器中的某个交换机;交换机按照预定义的路由规则,将消息发送到一个或多个队列中。简单模式通常使用默认的交换机(direct exchange)或直接指定的交换机;消费者应用从队列中取出消息并处理。
添加依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
5.1.1、生产者(Producer)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 简单工作模式 生产者
*/
public class Producer {
//定义消息队列 常量 不能变化的
public final static String QUEUE_NAME="queue2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列 各个参数的含义
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消息
String sendMsg = "What can i say ? Mamba Out!!!!";
//执行简单发布的方法
//各个参数的含义
//String exchange, 交换机 ""是使用默认交换机
// String routingKey, 路由关键字 当前简单示例中,使用队列名称 后面再讲解改用其他的
// AMQP.BasicProperties props, 基础属性
// byte[] body 发送消息的内容
//channel发送消息
channel.basicPublish("",QUEUE_NAME,null,sendMsg.getBytes("utf-8"));
System.out.println("消息发送成功!!!!");
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
5.1.2、消费者(Consumer)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 简单工作模式 消费者
*/
public class Consumer {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(Producer.QUEUE_NAME, false, false, false, null);
//调用简单消费方法
channel.basicConsume(Producer.QUEUE_NAME,
(String s, Delivery delivery) -> {
System.out.println("交付信息的唯一识别:" + s);
//delivery.getBody() 获取到中间中的消息的字节数组
//组装消息
String receiveMsg = new String(delivery.getBody(), "utf-8");
//打印接受消息
System.out.println("接受的消息为:" + receiveMsg);
//获取交付环境对象
Envelope envelope = delivery.getEnvelope();
//打印其他信息
long deliveryTag = envelope.getDeliveryTag();
System.out.println("消息的唯一序列号:" + deliveryTag);
System.out.println("交换机名称为:" + envelope.getExchange());
System.out.println("路由键名称为:" + envelope.getRoutingKey());
}, (String var1) -> {
System.out.println("消息被取消时的回调!!!!!");
});
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
测试
先生产者发送消息,后消费者消费消息
5.2、工作队列模式 (working queue)、
官网:工作队列模式
工作队列(又名:任务队列)背后的主要思想是避免 立即执行资源密集型任务,并且必须等待 它完成。相反,我们安排任务稍后完成。我们将任务封装为消息,并将其发送到队列。正在运行的工作进程 在后台会弹出任务并最终执行 工作。当您运行多个工作线程时,任务将在它们之间共享
生产者将任务消息发送到一个队列中;队列将接收到的任务消息存储起来,等待消费者处理;消费者从队列中获取任务并处理。当一个消费者处理完成后,它会继续从队列中获取下一个任务
5.2.1、生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 工作队列(working queue)模式 生产者
*/
public class Producer {
//定义消息队列 常量 不能变化的
public final static String QUEUE_NAME="queue1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 1; i <= 10; i++) {
//定义消息
String sendMsg = "What can i say ? Mamba Out!!!!"+i;
//String exchange, 交换机 ""是使用默认交换机
// String routingKey, 路由关键字 当前简单示例中,使用队列名称 后面再讲解改用其他的
// AMQP.BasicProperties props, 基础属性
// byte[] body 发送消息的内容
//channel发送消息
channel.basicPublish("",QUEUE_NAME,null,sendMsg.getBytes("utf-8"));
}
System.out.println("消息发送成功!!!!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
5.2.2、消费者
创建三个消费者,代码都一样
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 工作队列(working queue)模式 消费者A
*/
public class ConsumerA {
public static void main(String[] args) {
/* int i = new Consumer().hashCode();
System.out.println(i);*/
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(Producer.QUEUE_NAME,false,false,false,null);
channel.basicConsume(Producer.QUEUE_NAME,
(String s, Delivery delivery)->{
System.out.println("交付信息的唯一识别:"+s);
//delivery.getBody() 获取到中间中的消息的字节数组
//组装消息
String receiveMsg = new String(delivery.getBody(),"utf-8");
//打印接受消息
System.out.println("接受的消息为:"+receiveMsg);
//获取交付环境对象
Envelope envelope = delivery.getEnvelope();
//打印其他信息
long deliveryTag = envelope.getDeliveryTag();
System.out.println("消息的唯一序列号:"+deliveryTag);
System.out.println("交换机名称为:"+envelope.getExchange());
System.out.println("路由键名称为:"+envelope.getRoutingKey());
},(String var1)->{
System.out.println("消息被取消时的回调!!!!!");
});
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
测试
先开启三个消费者让他们等待消息,这样做确保他们三个是同时开启的,然后在执行生产者生产消息,这样以生产完消费者A,B,C就可以根据轮询A,B,C就都可以从队列中拿到数据
执行生产者类,生产消息
去那三个消费者的控制台看看
通过看可以发现是按照ABC的顺序一次从队列中拿取消息
5.3、扇出模式/发布订阅模式(fanout)
官网:发布/订阅模式
Exchange 交换机 一方面它接收来自生产者的消息,另一方面它将它们推送到队列中。 主要用来绑定队列,接受和分配消息,不可以存储消息
生产者将消息发送到交换机;交换机将消息广播到所有绑定到它的队列;所有绑定到交换机的队列都会接收到消息;消费者从队列中获取消息并进行处理
交换机四种类型之一:FANOUT("fanout")
内置交换机四种类型
DIRECT("direct"):消息通过完全匹配的路由键路由到绑定的队列。例如,消息的路由键与队列的绑定键完全一致时,消息会被投递到该队列
FANOUT("fanout"):消息广播到所有绑定到该交换机的队列,忽略路由键的值。所有与交换机绑定的队列都会接收到消息
TOPIC("topic"):根据路由键的模式匹配规则路由消息。可以使用通配符模式进行更复杂的匹配。例如,
*.important.*
可以匹配logs.important.error
这样的路由键HEADERS("headers"):根据消息的头部属性进行路由,而不是基于路由键。可以通过设置消息头的键值对来进行更复杂的路由匹配
5.3.1、生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 扇出(fanout)模式(订阅发布模式) 生产者
*/
public class Producer {
//定义消息队列 常量 不能变化的
public final static String QUEUE_NAME_3="queue3";
public final static String QUEUE_NAME_4="queue4";
public final static String QUEUE_NAME_5="queue5";
private final static String EXCHANGE_NAME="exchange_fanout";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,false,false,null);
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(QUEUE_NAME_3,false,false,false,null);
channel.queueDeclare(QUEUE_NAME_4,false,false,false,null);
channel.queueDeclare(QUEUE_NAME_5,false,false,false,null);
//把交换机和队列进行绑定
channel.queueBind(QUEUE_NAME_3,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME_4,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME_5,EXCHANGE_NAME,"");
for (int i = 1; i <= 10; i++) {
//定义消息
String sendMsg = "What can i say ? Mamba Out!!!!"+i;
//String exchange, 交换机 ""是使用默认交换机
// String routingKey, 路由关键字
// AMQP.BasicProperties props, 基础属性
// byte[] body 发送消息的内容
//channel发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,sendMsg.getBytes("utf-8"));
}
System.out.println("消息发送成功!!!!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
5.3.2、消费者
消费者ABC依次绑定队列345即可
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 扇出(fanout)模式(订阅发布模式) 消费者A
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
//剩下的消费者依次绑定4,5队列
channel.queueDeclare(Producer.QUEUE_NAME_3,false,false,false,null);
channel.basicConsume(Producer.QUEUE_NAME_3,
(String s, Delivery delivery)->{
System.out.println("交付信息的唯一识别:"+s);
//delivery.getBody() 获取到中间中的消息的字节数组
//组装消息
String receiveMsg = new String(delivery.getBody(),"utf-8");
//打印接受消息
System.out.println("接受的消息为:"+receiveMsg);
//获取交付环境对象
Envelope envelope = delivery.getEnvelope();
//打印其他信息
long deliveryTag = envelope.getDeliveryTag();
System.out.println("消息的唯一序列号:"+deliveryTag);
System.out.println("交换机名称为:"+envelope.getExchange());
System.out.println("路由键名称为:"+envelope.getRoutingKey());
},(String var1)->{
System.out.println("消息被取消时的回调!!!!!");
});
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
测试
依次运行生产者,和消费者ABC
A的话就接收到了队列3的所有数据,BC同理
5.4、路由模式/直接路由
官网:路由模式
直接(direct)模式:添加一个功能 - 我们将使其可以仅订阅消息的子集。当前演示的是Exchange类型为direct使用。直接direct绑定比扇出fanout更加灵活
routingKey
确定消息应该路由到哪个队列。只有那些绑定了匹配routingKey
的队列才会接收该消息
上面演示的扇出模式,忽略路由键的值,现在演示的是带有路由键的实例
交换机四种类型之一:DIRECT("direct")
5.4.1、生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
* @Description: 路由(direct)模式 生产者
*/
public class Producer {
//定义消息队列 常量 不能变化的
public final static String QUEUE_NAME_6="queue6";
public final static String QUEUE_NAME_7="queue7";
public final static String QUEUE_NAME_8="queue8";
private final static String EXCHANGE_NAME="exchange_direct";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,false,null);
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(QUEUE_NAME_6,false,false,false,null);
channel.queueDeclare(QUEUE_NAME_7,false,false,false,null);
channel.queueDeclare(QUEUE_NAME_8,false,false,false,null);
//把交换机和队列进行绑定
channel.queueBind(QUEUE_NAME_6,EXCHANGE_NAME,"fatal");
channel.queueBind(QUEUE_NAME_7,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"trace");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"debug");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"warn");
//定义数组
String[] routingKeyTypeArray = {"trace","debug","info","warn","error","fatal"};
//实例化随机数Random
Random random = new Random();
for (int i = 1; i <= 100; i++) {
//[0,6) = 0-5的一个数字
int index = random.nextInt(routingKeyTypeArray.length);
String routingKey = routingKeyTypeArray[index];
//定义消息
String sendMsg = "What can i say ? Mamba Out!!!!,消息序号:"+i+",消息的类型为:"+ routingKey;
//String exchange, 交换机 ""是使用默认交换机
// String routingKey, 路由关键字 当前简单示例中,使用队列名称 后面再讲解改用其他的
// AMQP.BasicProperties props, 基础属性
// byte[] body 发送消息的内容
//channel发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey,null,sendMsg.getBytes("utf-8"));
}
System.out.println("消息发送成功!!!!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
5.4.2、消费者
消费者ABC依次绑定队列678即可
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 路由(direct)模式 消费者A
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(Producer.QUEUE_NAME_6,false,false,false,null);
channel.basicConsume(Producer.QUEUE_NAME_6,
(String s, Delivery delivery)->{
System.out.println("交付信息的唯一识别:"+s);
//delivery.getBody() 获取到中间中的消息的字节数组
//组装消息
String receiveMsg = new String(delivery.getBody(),"utf-8");
//打印接受消息
System.out.println("接受的消息为:"+receiveMsg+",模拟消息被存入了数据库!!!");
//获取交付环境对象
Envelope envelope = delivery.getEnvelope();
//打印其他信息
long deliveryTag = envelope.getDeliveryTag();
System.out.println("消息的唯一序列号:"+deliveryTag);
System.out.println("交换机名称为:"+envelope.getExchange());
System.out.println("路由键名称为:"+envelope.getRoutingKey());
},(String var1)->{
System.out.println("消息被取消时的回调!!!!!");
});
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
测试
依次运行生产者,和消费者ABC
执行生产者
管理面板查看
执行消费者ABC
消费者A,当时加载100条消息,只有匹配路由键fatal的消息,才会被生产者写入队列并由消费者接收,一共19条
消费者B,当时加载100条消息,只有匹配路由键error的消息,才会被生产者写入队列并由消费者接收,一共17条
消费者B,当时加载100条消息,只有匹配路由键error的消息,才会被生产者写入队列并由消费者接收,一共17条
消费者C,当时加载100条消息,只有匹配路由键trace,info,debug,warn的消息,才会被生产者写入队列并由消费者接收,一共64条
5.5、话题模式
官网:话题模式
其实也是一种路由模式,用于根据消息的路由键进行复杂的路由。话题交换允许根据一个或多个通配符模式匹配消息的路由键,从而将消息路由到一个或多个队列。这个模式特别适用于需要灵活和细粒度路由的场景
Topic模式:尽管使用直接direct交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。当前演示的是Exchange类型为topic使用。topic方式可以基于多个标准进行路由
路由规则:单词列表,由点分隔,路由键中可以有任意多的单词,最多为 255 个字节。topic比direct方式,本质上更灵活,使用绑定键有两个重要的特殊符号:
* 可以只替换一个单词。
# 可以代替零个或多个单词
交换机四种类型之一:TOPIC("topic")
5.5.1、生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
* @Description: 话题(topic)模式(通配符模式) 生产者
*/
public class Producer {
//定义消息队列 常量 不能变化的
public final static String QUEUE_NAME_9="queue9";
public final static String QUEUE_NAME_10="queue10";
public final static String QUEUE_NAME_11="queue11";
private final static String EXCHANGE_NAME="exchange_topic";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,false,false,null);
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(QUEUE_NAME_9,false,false,false,null);
channel.queueDeclare(QUEUE_NAME_10,false,false,false,null);
channel.queueDeclare(QUEUE_NAME_11,false,false,false,null);
//把交换机和队列进行绑定
channel.queueBind(QUEUE_NAME_9,EXCHANGE_NAME,"*.orange.*"); //对橙色动物 感兴趣
channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"*.*.rabbit");
//channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"#.rabbit");
channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"lazy.#"); //对兔子或者行动缓慢的动物都感兴趣
channel.queueBind(QUEUE_NAME_11,EXCHANGE_NAME,"lazy.*.elephant"); //只对行动缓慢的大象感兴趣
String[] routingKeyArray = {"quick.orange.rabbit","lazy.pink.elephant","lazy.red.elephant","lazy.orange.elephant","quick.orange.fox",
"lazy.brown.fox","lazy.pink.rabbit","quick.brown.fox","lazy.","lazy.blue.fox.ab.ba.cc.dd",
"lazy.red.horse"};
//实例化随机数Random
Random random = new Random();
for (int i = 1; i <= 20; i++) {
//[0,6) = 0-5的一个数字
String describe = routingKeyArray[random.nextInt(routingKeyArray.length)];
//定义消息
String sendMsg = "动物的描述为:"+describe;
//String exchange, 交换机 ""是使用默认交换机
// String routingKey, 路由关键字 当前简单示例中,使用队列名称 后面再讲解改用其他的
// AMQP.BasicProperties props, 基础属性
// byte[] body 发送消息的内容
//channel发送消息
channel.basicPublish(EXCHANGE_NAME, describe,null,sendMsg.getBytes("utf-8"));
}
System.out.println("消息发送成功!!!!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
5.5.2、消费者
消费者ABC依次绑定队列9,10,11队列即可
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 话题(topic)模式(通配符模式) 消费者A
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置各种连接
connectionFactory.setHost("192.168.37.171");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/admin/");
//使用工厂创建重量级的链接Connnection
connection = connectionFactory.newConnection();
//使用Connection创建chanel
channel = connection.createChannel();
//定义队列
// String queue, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否允许多个消费者消费消息
// boolean autoDelete, 是否允许自动删除消息
// Map<String, Object> arguments 其他参数 其他定义队列的参数
channel.queueDeclare(Producer.QUEUE_NAME_9,false,false,false,null);
//String queue, 队列名称
// DeliverCallback deliverCallback, 交付回调
// CancelCallback cancelCallback 取消交付回调
channel.basicConsume(Producer.QUEUE_NAME_9,
(String s, Delivery delivery)->{
System.out.println("交付信息的唯一识别:"+s);
//delivery.getBody() 获取到中间中的消息的字节数组
//组装消息
String receiveMsg = new String(delivery.getBody(),"utf-8");
//打印接受消息
System.out.println("接受的消息为:"+receiveMsg+",这些是我感兴趣的橙色动物!!!");
//获取交付环境对象
Envelope envelope = delivery.getEnvelope();
//打印其他信息
long deliveryTag = envelope.getDeliveryTag();
System.out.println("消息的唯一序列号:"+deliveryTag);
System.out.println("交换机名称为:"+envelope.getExchange());
System.out.println("路由键名称为:"+envelope.getRoutingKey());
},(String var1)->{
System.out.println("消息被取消时的回调!!!!!");
});
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
测试
依次运行生产者,和消费者ABC
执行生产者
执行消费者ABC
消费者A,当时加载20条消息,只有匹配路由键(*.orange.*:意思是只要是橙色动物就行,不管什么品种,不管快还是懒惰,是橙色就行)的消息,才会被生产者写入队列并由消费者接收,一共4条
消费者B,当时加载20条消息,只有匹配路由键(*.*.rabbit:意思是只要是兔子就行,不管什么颜色,不管快还是懒惰,是兔子就行;lazy.#:意思是只要是懒惰的我都喜欢)的消息,才会被生产者写入队列并由消费者接收,一共17条
消费者C,当时加载20条消息,只有匹配路由键(lazy.*.elephant:意思是只要是缓慢的大象就行,不管什么颜色,是缓慢的大象就行)的消息,才会被生产者写入队列并由消费者接收,一共6条
六、RabbitMQ集群搭建
6.1、克隆三台虚拟机
利用之前的虚拟机克隆两个,并修改ip,并远程连接上
6.2、修改hosts文件
分别在三台虚拟机中配置/etc/hosts文件
#编辑
vi /etc/hosts
#在/etc/hosts中添加以下代码,修改成自己的ip地址
192.168.37.171 cluster1
192.168.37.172 cluster2
192.168.37.173 cluster3
#查看
cat /etc/hosts
这里配置好记住重启一下虚拟机,要不然后续建立集群不起作用,保证这里变了
保证三台虚拟机可以互相ping通
拿cluster1举例,分别ping cluster2和ping cluster3
6.3、 同步cookie
RabbitMQ的集群依赖Erlang的分布式特性,需要保持Erlang Cookie一致才能实现集群节点的认证和通信
cat /var/lib/rabbitmq/.erlang.cookie
分别查看 三台虚拟机的erlang.cookie,不用看百分百一样,因为后两台机子是克隆过来的
6.4、启动RabbitMQ 服务
全部启动
service rabbitmq-server start
分别在rabbitmq-2和3上执行如下命令:
# 停止Erlang VM上运行的RabbitMQ应用,保持Erlang VM的运行
rabbitmqctl stop_app
# 移除当前RabbitMQ虚拟主机中的所有数据:重置
rabbitmqctl reset
在rabbitmq-2上执行:
#将当前RabbitMQ的主机加入到rabbit@cluster1这个虚拟主机的集群中
rabbitmqctl join_cluster rabbit@cluster1
在rabbitmq-3上执行:
#将当前RabbitMQ的主机加入到rabbit@cluster2这个虚拟主机的集群中
rabbitmqctl join_cluster rabbit@cluster2
rabbit@rabbitmq-1表示RabbitMQ节点名称,默认前缀就是rabbit , @ 之后是当前虚拟主机所在的物理主机hostname,要保证相互之间可以ping通
在rabbitmq-2和3上执行
# 启动当前Erlang VM上的RabbitMQ应用
rabbitmqctl start_app
6.5、查看集群状态
rabbitmqctl cluster_status
随便一个服务执行就行
6.6、创建用户
查看用户,现在之前在单体部署环境下的用户都不在了
创建vhost和用户,并授予权限,和上面的一样,直接创建就行,然后取1,2,3中查看都会有admin这个用户和vhost虚拟主机
搭建集群结构之后,之前创建的交换机、队列、用户都属于单一结构,在新的集群环境中是不能用的所以在新的集群中重新手动添加用户即可(任意节点添加,所有节点共享),如果要看到所有RabbitMQ节点上的运行情况,都需要启用rabbitmq_management 插件
6.7、查看管理面板
192.168.37.171:15672 Cluster1
192.168.37.172:15672 Cluster2
192.168.37.173:15672 Cluster3
三个都是一样的
此时,集群搭建完毕,但是默认采用的模式“普通模式”,可靠性不高
测试
使用生产者向集群rabbitmq-1发送消息,发现有数据,注意队列最好持久化(要不主节点宕机,从节点不显示队列)。查看时,发现rabbitmq-1,2,3都有数据,说明集群成功了!
1,2,3管理页面都是一样的
持久化:只用修改生产者里面的代码就行(我先不修改)
停止节点cluster1,因为没有持久化,所以从节点没有队列
rabbitmqctl stop_app
http://192.168.37.171:15672/ 访问报错
http://192.168.37.172:15672/#/queues 集群中rabbitmq-1是红色 出现NaN错误
再次启动,发现数据丢失
rabbitmqctl start_app
如果rabbitmq服务出现错误,查看日志
cat /var/log/rabbitmq/rabbit@cluster1.log
七、镜像集群配置
官网:镜像集群配置
虽然集群共享队列,但默认情况下,消息只会被路由到某一个节点的符合条件的队列上,并不会同步到其他队列的相同队列上。如此,则队列内容存在单点故障,解决方式之一就是使用镜像队列。 将集群中的队列彼此之间进行镜像,这样消息就会被拷贝到处于同一个镜像分组中的所有队列上。
7.1、命令配置(不推荐)
rabbitmqctl set_policy ha-all '^' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# 策略说明
格式:rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
-p Vhost:可选参数,针对指定vhost下的queue进行设置
--priority:可选参数,policy的优先级
--apply-to:可选参数,策略适用对象类型,可选queues,exchanges,all
name:policy策略的名称
pattern:queue的匹配模式(正则表达式),^表示匹配所有队列
definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
1、ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
2、ha-params:ha-mode模式需要用到的参数
3、ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual,默认manual
7.2、 管理界面配置
队列的开头需要和pattern一致
再次发送消息
再次停掉cluster1
rabbitmqctl stop_app
再次查看其它节点
测试
使用图形化界面,创建队列,写入消息,看是否所有节点队列同步
八、 Haproxy+RabbitMQ高可用
Haproxy官网:Haproxy官网
HAProxy是一款高性能的TCP/HTTP负载均衡器,它能高效地分配网络请求至多个服务器,同时提供健康检查和高可用性功能,是构建可扩展和可靠Web服务的关键组件。
HAProxy是一个免费、非常快速和可靠的反向代理,为基于TCP和HTTP的应用程序提供高可用性、负载平衡和代理。它特别适合流量非常大的网站,并为世界上访问量最大的网站中的很大一部分提供动力。多年来,它已成为事实上的标准开源负载均衡器,现在随大多数主流Linux发行版一起提供,并且通常默认部署在云平台中
8.1、环境配置及Haproxy的安装
安装依赖包
yum -y install wget gcc
下载haproxy源码包
# -P <目标地址> 设置下载的位置
wget -P /software https://www.haproxy.org/download/2.8/src/haproxy-2.8.0.tar.gz
解压
tar -xzvf /software/haproxy-2.8.0.tar.gz -C /usr/
编译安装haproxy
#创建安装目录
mkdir /usr/local/haproxy
#进入目录
cd /usr/haproxy-2.8.0/
#编译,安装
make TARGET=linux30 && make install PREFIX=/usr/local/haproxy/
target=linux30:
这个选项指定了HAProxy的编译目标平台。linux30是HAProxy用来表示Linux平台的一个目标名称,其中“30”表示的是一个内部的版本标识符,它代表了对Linux内核的特定支持和优化。选择正确的target很重要,因为它会影响生成的二进制文件的兼容性和性能。
创建配置目录,组和用户
#创建配置目录,组和用户:
mkdir /etc/haproxy
#创建组
groupadd -r -g 166 haproxy
#创建用户:
useradd -g haproxy -r -s /sbin/nologin -u 166 haproxy
创建组:
-r 创建系统组(gid<1000)
创建用户:
-r 创建系统用户(uid<1000)
-s选项设置用户的登录Shell为/sbin/nologin 不能登录交互的用户
8.2、创建配置文件
vi /etc/haproxy/haproxy.cfg
global
# 全局设置
#设置日志 使用 local0 来标记其日志条目,这有助于在接收端区分不同来源的日志。
log 127.0.0.1 local0 info
#log 127.0.0.1 local1 notice
#当前工作目录
chroot /usr/local/haproxy
pidfile /run/haproxy.pid
#最大链接数
maxconn 4096
#用户和组配置 和上面配置一致
user haproxy
group haproxy
#守护进程启动
daemon
defaults
# 默认设置
#启动全局日志配置
log global
#默认的模式(tcp4层|http7层|health返回ok)
mode tcp
#日志类型tcplog
option tcplog
#不记录健康日志检查信息
option dontlognull
#3次失败认为服务不可用
retries 3
#每个进程可用的最大连接数
maxconn 200
#连接超时
timeout connect 5000
#客户超时
timeout client 50000
#服务超时
timeout server 50000
#绑定配置
listen rabbitmq_cluster
bind 192.168.37.170:5672
#配置tcp模式
mode tcp
#轮询
balance roundrobin
#启用了TCP健康检查
#option tcp-check
#在健康检查时,向RabbitMQ发送一个换行符(CRLF)
#tcp-check send CRLF
#期望在健康检查中接收到以"AMQP"开头的字符串,这是RabbitMQ响应的一部分,表明服务器处于活动状态。
#tcp-check expect string AMQP
#服务器列表
#check健康检查的频率和阈值设置。
#inter 2000意味着每2秒进行一次检查;
#rise 2意味着需要连续两次成功响应才能认为服务器是健康的;
#fall 3意味着连续三次失败响应后,服务器将被认为不健康。
server rabbitmq1 192.168.37.171:5672 check inter 2000 rise 2 fall 3
server rabbitmq2 192.168.37.172:5672 check inter 2000 rise 2 fall 3
server rabbitmq3 192.168.37.173:5672 check inter 2000 rise 2 fall 3
#监控统计页面配置
listen stats
bind 192.168.37.170:15672
mode http
stats enable
#项目名字
stats uri /haproxy_stats
#设置了访问统计页面时的认证提示。6
stats realm Haproxy\ Statistics
#设置了访问统计页面时的认证提示。
stats auth admin:123456
# 设置了统计信息刷新的时间间隔,默认为 30 秒,这里设置为 10 秒
stats refresh 10s
修改绑定配置中的bind
改为自己haproxy的本机ip 端口号建议修改为16666(就是在idea里面连接的端口号)修改绑定配置中的bind的服务器列表
ip改为三台rabbitmq的ip,端口号默认是5672可以不用修改修改监控统计页面配置的bind
改为自己haproxy的本机ip ,端口号是浏览器上面连接的端口,可以不用修改,修改监控统计页面配置的 设置了访问统计页面时的认证提示(可以不用修改)
stats auth scott:tiger 其中scott是浏览器连接haproxy的账号,tiger是密码
8.3、配置启动文件
#复制haproxy文件到/usr/sbin下
cp /usr/local/haproxy/sbin/haproxy /usr/sbin/
这个操作的目的是为了让 HAProxy 成为系统级的可执行命令,使其可以在任何 shell 环境下直接调用,而不需要显式指定完整路径。
#复制haproxy初始化脚本到/etc/init.d/下
cp /usr/haproxy-2.8.0/examples/haproxy.init /etc/init.d/haproxy
#修改权限
chmod 755 /etc/init.d/haproxy
8.4、启动Haproxy测试
service haproxy start
浏览器输入:http://192.168.37.170:15672/haproxy_stats
成功!!!!!!!!!!!!!!