中间件&消息队列
中间件概述
中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。
说白了就是平台+通信,我们开发出来的应用程序遵循某些协议或者规范,去和底层操作系统打交道。而中间件的一大特征就是跨平台。屏蔽底层操作系统的复杂性,对于上层,屏蔽
中间件应用
举例:
1,RMI(Remote Method Invocations, 远程调用)
2,Load Balancing(负载均衡,将访问负荷分散到各个服务器中)
3,Transparent Fail-over(透明的故障切换)
4,Clustering(集群,用多个小的服务器代替大型机)
5,Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
6,Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)
7,Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)
8,System Management(系统管理)
9,Threading(多线程处理)
10,Message-oriented Middleware面向消息的中间件(异步的调用编程)
11,Component Life Cycle(组件的生命周期管理)
12,Resource pooling(资源池)
13,Security(安全)
14,Caching(缓存)
MQ概述
MQ全称 Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。也是中间件的一种。
分布式系统在通信过程中,一般分为两种:
-
直接远程调用
-
引入中间件间接调用
小结
- MQ,消息队列,存储消息的中间件
- 分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
- 发送方称为生产者,接收方称为消费者
常见的MQ产品
目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,
也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑
MQ的优势&劣势
MQ的优势
应用解耦
系统开发的时候应该遵循高内聚低耦合的守则,尤其是分布式的系统,如果不是高内聚低耦合,整个系统杂糅在一起将会非常麻烦。
- 直接系统调用
- 引入MQ实现间接调用
异步提速
-
同步调用(不引入MQ)
-
异步调用(引入MQ)
不需要管系统的处理结果,这些消息消费的结果都交给各大系统,订单是不对其负责的,都交给MQ执行。这种设计方式从用户的角度来看,就是很短的时间就给用户交付了,感觉就很好。
削峰填谷
- 直接调用
- 间接调用
MQ承载5k请求轻轻松松,一点一点给A系统放出流量处理
小结
- 应用解耦:提高系统容错性和可维护性
- 异步提速:提升用户体验和系统吞吐量
- 削峰填谷:提高系统稳定性
MQ的劣势
小结
RabbitMQ背景
AMQP协议简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
RabbitMQ就是基于AMQP协议所构建出来的中间件
RabbitMQ 简介
基础架构
2007年,Rabbit 技术公司基于 AMQP 标准开发的RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:
对上图的解释:
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。可以反复利用提高系统效率。
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。中间件内部类似交换机的存在,接收生产者发布来的消息,并进行分发。将其分发到对应的Queue中,具体是怎么分发的,要取决于Binding。
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存
到 exchange 中的查询表中,用于 message 的分发依据
工作模式
- 简单模式
- Work queues、
- Publish/Subscribe 发布与订阅模式
- Routing路由模式
- Topics 主题模式
- RPC 远程调用模式(远程调用,不太算MQ;暂不作介绍)
补充一个Java原生的中间件 JMS
小结
- RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
- RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
- AMQP 是协议,类比HTTP。
- JMS 是 API 规范接口,类比 JDBC。
RabbitMQ快速上手
RabbitMQ的安装详见官网RabbitMQ官网
安装文档:资料/软件/安装 RabbitMQ.md
入门程序
使用简单模式完成消息传递
架构
创建工程
创建初始工程
分别添加依赖
引入依赖
<dependencies>
<!--rabbitmq java 客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
编写生产者发送消息
向MQ中发送消息主要有以下的步骤
创建连接工厂
设置参数
创建连接 Connection
创建Channel
创建队列Queue
发送消息
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
//要放入队列的信息
String body = "hello rabbitmq~~~";
//6. 发送消息
channel.basicPublish("","hello_world",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
点一下运行,这个时候看管理的页面
编写消费者接收消息
消费者和生产者的代码还是很类似的,配置的内容基本一样,只是后面的部分改用了监听的程序
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue,上面配置内容基本一致
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:接收后是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
匿名内部类的方式进行方法重写
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//关闭资源?不要,因为是监听程序
}
启动程序之后,监听程序就可以正常从队列中把消息拿出来了
小结:
RabbitMQ的工作模式
Work queues 工作队列模式(轮询减压)
架构图,实际上就是从原来的单一消费者模式,变成了多个消费者的模式,现在MQ要服务多个消费者了,但是某一个瞬间,C1和C2只有一个消费者取到消息。换句话说,这个时候C1和C2是竞争关系
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。
代码实现
所有的代码都是基于上面的例子修改的
-
生产者
-
消费者1、消费者2
代码同理和上面的快速入门内容差不多,但是这里的改的内容是队列的名称,要不然找不到具体的被消费队列。
注意,如果队列中的消息被消费了,那么这个消息就不存在了。也就是说:C1如果先C2一步把所有的消息都给消费了,那么C2也就没得消费了。
启动好就可以等待生产者生产好消息放入MQ了
总结
启动生产者,这个时候就可以等待消费者消费了。
消费者自动启动,C1和C2对MQ进行同步监听,一旦新的消息上来了,两个消费者将轮询着对其进行消费。
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系,如果我想每个消费者都能消费到,那么要看Pub/Sub 订阅模式。
- Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,
只需要有一个节点成功发送即可。
Pub/Sub 订阅模式
保证多个消费者都能有消息来拿,而不是竞争的关系。
注意要在创建交换机时把类型更改为FANOUT
代码实现
思路:用生产者来生产一条日志消息,希望可以让两个消费者都能接收到,并且以不同的方式对消息进行处理。C1负责把消息打印,C2把数据存入数据库。每个消费者只监听自己的队列。
创建连接工厂
设置参数 (如何连接MQ)
创建连接 Connection
创建Channel
创建交换机
创建队列Queue
绑定队列和交换机
发送消息
最后释放资源
- 生产者
相比于上面的WorkQueue,多了创建交换机、为每个消费者都创建队列、绑定队列和交换机的内容,其余的部分基本一致。
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
解读一下创建交换机的参数
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。一般都是这个。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
//因为这里生产者和交换机进行通信,所以要创建一个交换机,按照参数规定交换机名称,交换机类型等等
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
//为每个队列都创建相应的队列Queue
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
//要放入队列的信息
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//发送消息,这里发布消息是通过交换机分发到不同队列中
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
channel.basicPublish(exchangeName,"",null,body.getBytes());
//释放资源
channel.close();
connection.close();
运行,可以发现新的队列已经上来了
- 消费者
消费者基本上和WorkQueue基本一致,只需要绑定好监听队列即可,等待交换机分发消息到MQ中。绑定好队列的名称就可以消费了。
以消费者C1为例
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue,上面配置内容基本一致
//队列名称
String queue2Name = "test_fanout_queue2";
//如果没有一个名字叫test_fanout_queue2的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(queue2Name,true,false,false,null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
匿名内部类的方式进行方法重写
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//不需要的暂时注释
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("消费者2接收消息,输出内容存储到数据库");
}
};
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:接收后是否自动确认
3. callback:回调对象
*/
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要,因为是监听程序
}
两个队列都从队列中拿到了消息。
总结
1、 交换机需要与队列进行绑定,绑定之后:一个消息可以被多个消费者都收到。
2、发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机
Routing 路由模式
刚刚例子的消息,是无差别发送。但是应该是,不同的消息应该转发到该去的队列。或者说,同一条信息,由不同的人收到之后,来去做不同的事情。
注意要在创建交换机时把类型更改为DIRECT
代码实现
队列1只绑定 error 的RoutingKey 队列2绑定 error、warning、info
的RoutingKey,多绑定几个即可完成一个队列多个RoutingKey
综上,队列1处理error的消息,队列2处理error、warning、info的消息
- 生产者
配置信息不写了,太多了
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//创建交换机以及队列,并绑定
String exchangeName = "routingExchange";
//5. 创建交换机,指定好交换类型为DIRECT
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6. 创建队列,一共两个队列,分别对应不同的消费者
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机,根据RoutingKey绑定不同的交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
//队列1只绑定 error
channel.queueBind(queue1Name, exchangeName, "error");
//队列2绑定 error、warning、info,多绑定几个即可
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
channel.queueBind(queue2Name, exchangeName, "info");
//综上,队列1处理error的消息,队列2处理error、warning、info的消息
String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
//8. 发送消息,规定RoutingKey为warning,所以只有队列2可以被路由分发该消息
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
- 消费者
消费者一共有两个,队列1处理error的消息,队列2处理error、warning、info的消息
也就是说,如果我发了一个warning的消息,和队列1的RoutingKey匹配不到,所以消费者1就没有消息可以消费了。
消费者代码基本上和WorkQueue一样,这里就不赘述了,因为RoutingKey的分发是靠交换机,而不是消费者。所以消费者只需要去关心怎么处理MQ来的消息就可以了。
启动两个消费者监听
再启动生产者,此时发现,只有符合warning条件的queue2拿到了消息,而queue1是没有消息的。
总结
Routing 模式要求队列在绑定交换机时要指定 routing key,交换机会把消息转发到符合 routing key 的队列,消费者对队列进行监听,有新消息就把自己队列的消息拿出来消费。
Topics 通配符模式
说白了就是带了通配符,进行RoutingKey的匹配
一般的通配符有俩用.
来进行分割
- #匹配一个或者多个次
- *匹配一个
注意要在创建交换机时把类型更改为TOPIC
代码实现
- 生产者
实际上总体思路和上面的Routing是差不多的。只不过在绑定队列和交换机的时候,用到的RoutingKey被改为了通配符版本。除此以外,再改一下路由器类型为TOPIC
即可
更改后的通配符绑定规则为:
//需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
//队列1,只收error结尾和order开头后面跟一个单词的
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
//队列2,全部都要
channel.queueBind(queue2Name,exchangeName,"*.*");
- 消费者
保证队列名字和生产者创建的队列名字一致,能找到该找的队列就可以了。其余的消费部分都差不多。基本上RoutingKey的分发全靠交换机,只是消费者单纯的消费。
路由分发规则:
//队列1
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
//队列2
channel.queueBind(queue2Name,exchangeName,"*.*");
先测试第一个RoutingKey:"a.a"
。
只符合队列2的规则,路由会把消息分发给对应的队列2,所以绑定队列2的消费者也就可以直接拿到消息
再测试第一个RoutingKey:"a.error"
。
这个消息两个队列的匹配规则都符合,因此两个队列都能接收到消息。
总结
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key
的时候可以使用通配符,显得更加灵活。
总结工作模式
- 简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。 - 工作队列模式 Work Queue一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
- 发布订阅模式 Publish/subscribe需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
- 路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。 - 通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
Spring整合RabbitMQ
代码实现
配置文件
一般来说,先引入相关的POM文件已经对应的目标。这里不做过多演示,就是Spring和MQ的依赖
- properties文件,主要为配置MQ的连接,和之前的配置内容基本没差别
- xml文件内部主要配置的是交换机、队列等等内容,根据消费者和生产者的角色不同,其内容也不相同,这个在后面再详解。
生产者
- xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory,读取properties的内容 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->
<!--
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
exclusive:是否独占
durable:是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播(Pub/Sub 订阅模式);所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1" />
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--<rabbit:direct-exchange name="aa" >
<rabbit:bindings>
<!–direct 类型的交换机绑定队列 key :路由key queue:队列名称–>
<rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>-->
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符(Topic模式);*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建(auto-declare)-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
在代码中主要是注入一个RabbitTemplate,通过这个RabbitTemplate来进行消息的发送,主要是convertAndSend
这个API,该API下重载有许多方法,通过不同的参数。实现不同队列,不同工作模式的收发。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
//1.注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 普通的轮询RoutingKey模式
*/
@Test
public void testHelloWorld(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
}
/**
* 发送fanout消息,也就是无差别发送,每个消费者都有消息,单一交换机保证信息发送
*/
@Test
public void testFanout(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
}
/**
* 发送topic消息,也就是RoutingKey改为通配符的形式
*/
@Test
public void testTopics(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic....");
}
}
消费者
主要是实现一个监听器MessageListener接口,实现消息,来完成对于队列的监听。具体监听哪个队列在XML队列里配置即可
- xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--配置对于不同队列的监听类,自己配置的监听类-->
<bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
<!--这些还没写,所以注释掉了
<bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
<bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>
-->
<!--配置具体类来对应监听哪一个队列-->
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
</rabbit:listener-container>
</beans>
监听类:
这个监听类每次只运行一次,但是对于队列的监听肯定是要求持续监听的,所以我们这里再去写一个测试的循环,保证这个监听类能一直检测,保持监听状态。
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
Junit测试里面写的循环类,持续循环
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test1(){
boolean flag = true;
while (true){
}
}
}
运行测试
SpringBoot整合RabbitMQ
主要是引入MQ的starter即可
生产者
生产端基本步骤如下:
- 创建生产者SpringBoot工程
- 引入start,依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写yml配置,基本信息配置
- 定义交换机,队列以及绑定关系的配置类
- 注入RabbitTemplate,调用方法,完成消息发送
- 创建SpringBoot工程,引入RabbitMQ的坐标
- 编写yml配置信息
# 配置RabbitMQ的基本信息 ip 端口 username password..
spring:
rabbitmq:
host: localhost # ip
port: 5672
username: guest
password: guest
virtual-host: /
- 在配置类内部定义交换机,队列以及绑定关系的配置类
@Configuration
public class RabbitMQConfig {
//常量定义 交换机 和 队列 的名字
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
/**
* 定义交换机
*/
@Bean("bootExchange")
public Exchange bootExchange(){
/*
通过调用ExchangeBuilder的方法来配置交换机
分别为:ExchangeBuilder.topic模式的交换机(交换机名字).是否持久化(是).构建()
当然,这个语句只是一小部分,其他的配置内容需要自己调用
*/
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 定义队列
*/
@Bean("bootQueue")
public Queue bootQueue(){
/*
通过调用QueueBuilder的方法来配置队列
分别为:QueueBuilder.durable持久化队列(队列名字)..构建()
当然,这个语句只是一小部分,其他的配置内容需要自己调用
*/
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
队列和交互机绑定关系 Binding
这里参数列表(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange")
是因为这里以后会绑定很多交换机和队列,所以一定要用@Qualifier来区分一下
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
//这里通过调用绑定器BindingBuilder来绑定队列和交换机、并且因为交换机是topic模式,要配好通配符
//BindingBuilder.绑定(参数列表的队列).和哪个交换机绑定(参数列表的交换机).绑定规则(RoutingKey通配符).没有参数()
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
- 注入RabbitTemplate,调用方法,完成消息发送
还是之前的那个API,和Spring的写法基本一样,写一个测试类测试一下,将消息发送到队列中。
@SpringBootTest
class RabbitMqProducerApplicationTests {
//注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
//发送消息,RoutingKey格式为:boot.#
//convertAndSend(要发送的交换机名称,RoutingKey,消息)
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.abc","boot mq hello~~~");
}
}
运行一下,将服务主启动类启动,再启动测试类
看一下MQ的管理页面,新的队列和交换机已经就位了
现在等待消费者配置好消费就可以了
消费者
消费端基本步骤如下:
- 创建消费者SpringBoot工程
- 引入start,依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写yml配置,基本信息配置
- 定义监听类,使用@RabbitListener注解完成队列监听。
- 新建SpringBoot工程,引入RabbitMQ依赖就可以了,这里不赘述了
- 编写yml根据实际情况对各项配置修改即可这里也不赘述了
- 创建一个MQ监听类,主要是通过注解
@RabbitListener(queues = "队列名称")
来确定具体对于某个队列的监听。然后和方法里的参数进行绑定即可。方法内部通过Message 对象 进行操作,即可拿到消息。
@Component
public class RabbimtMQListener {
//绑定队列名称,监听boot_queue的队列
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
//System.out.println(message);
System.out.println(new String(message.getBody()));
}
}
总结
- SpringBoot提供了快速整合RabbitMQ的方式
- 基本信息再yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
- 生产端直接注入RabbitTemplate完成消息发送
- 消费端直接使用@RabbitListener完成消息接收