目录
一.介绍
1.1MQ概述
1.2MQ优势和劣势
1.3常见的 MQ 产品
1.4RabbitMQ简介
1.5RabbitMQ中的相关概念
1.6RabbitMQ的安装
二.快速入门
2.1入门程序
2.2工作模式
2.2.1Work queues 工作队列模式
2.2.2Pub/Sub 订阅模式
2.2.3Routing 路由模式
2.2.4Topics 通配符模式
三.整合SpringBoot
3.1生产端
3.2消费端
3.3总结
一.介绍
1.1MQ概述
MQ全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
MQ,消息队列,存储消息的中间件
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者
1.2MQ优势和劣势
优势
应用解耦
异步提速
削峰填谷
劣势
系统可用性降低
系统复杂度提高
一致性问题
1.3常见的 MQ 产品
1.4RabbitMQ简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
RabbitMQ基础架构图
1.5RabbitMQ中的相关概念
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据
1.6RabbitMQ的安装
楼主用的是Docker进行安装
拉取镜像
docker pull rabbitmq
运行镜像
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
参数说明:
-d:表示在后台运行容器;
-p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认的用户密码;
--name rabbitmq:设置容器名称;
rabbitmq:容器使用的镜像名称;
二.快速入门
2.1入门程序
需求:使用简单模式完成消息传递
步骤:
① 创建工程(生成者、消费者),在IDEA中创建两个Maven模块
② 分别添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
③ 编写生产者发送消息
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
// String queue,队列名称
// boolean durable,是否持久化
// boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
// boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
// Map<String, Object> arguments
// 如果没有一个名字叫hello_world的队列,则自动创建,有就不创建
channel.queueDeclare("work_queues",true,false,false,null);
//6.发送消息
// String exchange,交换机名称,简单模式使用默认""
// String routingKey,路由名称
// AMQP.BasicProperties props,配置信息
// byte[] body,发送的消息数据
for (int i = 1; i <= 10; i++) {
String body = i+" ---> hello rabbitmq!";
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
④ 编写消费者接收消息
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
// String queue,队列名称
// boolean durable,是否持久化
// boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
// boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
// Map<String, Object> arguments
// 如果没有一个名字叫hello_world的队列,则自动创建,没有就不创建
channel.queueDeclare("hello_world",true,false,false,null);
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("counsumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
}
}
2.2工作模式
2.2.1Work queues 工作队列模式
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
生产者
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
// String queue,队列名称
// boolean durable,是否持久化
// boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
// boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
// Map<String, Object> arguments
// 如果没有一个名字叫hello_world的队列,则自动创建,有就不创建
channel.queueDeclare("work_queues",true,false,false,null);
//6.发送消息
// String exchange,交换机名称,简单模式使用默认""
// String routingKey,路由名称
// AMQP.BasicProperties props,配置信息
// byte[] body,发送的消息数据
for (int i = 1; i <= 10; i++) {
String body = i+" ---> hello rabbitmq!";
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
消费者1
public class Consumer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
// String queue,队列名称
// boolean durable,是否持久化
// boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
// boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
// Map<String, Object> arguments
// 如果没有一个名字叫hello_world的队列,则自动创建,没有就不创建
channel.queueDeclare("work_queues",true,false,false,null);
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("counsumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
消费者2和消费者1一样
小结
①在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
②Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个, 只需要有一个节点成功发送即可。
2.2.2Pub/Sub 订阅模式
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失!
生产者
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
// String exchange,交换机名称
// BuiltinExchangeType type,交换机的类型,4种类型
// DIRECT("direct"),定向
// FANOUT("fanout"),扇形(广播) 发送消息到每一个与之绑定的队列
// TOPIC("topic"),通配符的方式
// HEADERS("headers"),不知道
// boolean durable,是否持久化
// boolean autoDelete,是否自动删除
// boolean internal,内部使用,一般是false
// Map<String, Object> arguments,参数
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
//6.创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定交换机和队列
// String queue,队列名称
// String exchange,交换机名称
// String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
//8.发送消息
String body = "日志信息:张三去调用了findAll方法……日志级别:info...";
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者1
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("counsumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台……");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
消费者2即把最后一行代码,改为channel.basicConsume(queue2Name,true,consumer);
回调方法可以修改用作差别对待
小结
1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
2. 发布订阅模式与工作队列模式的区别:
①工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
②发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)
③发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机
2.2.3Routing 路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
生产者
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
// String exchange,交换机名称
// BuiltinExchangeType type,交换机的类型,4种类型
// DIRECT("direct"),定向
// FANOUT("fanout"),扇形(广播) 发送消息到每一个与之绑定的队列
// TOPIC("topic"),通配符的方式
// HEADERS("headers"),不知道
// boolean durable,是否持久化
// boolean autoDelete,是否自动删除
// boolean internal,内部使用,一般是false
// Map<String, Object> arguments,参数
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
//6.创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定交换机和队列
// String queue,队列名称
// String exchange,交换机名称
// String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")
//队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//8.发送消息
String body = "日志信息:张三去调用了findAll方法……日志级别:info...";
channel.basicPublish(exchangeName,"info",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者1
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("counsumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台……");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
消费者2
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("counsumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息存储到数据库……");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
小结
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
2.2.4Topics 通配符模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
生产者
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
// String exchange,交换机名称
// BuiltinExchangeType type,交换机的类型,4种类型
// DIRECT("direct"),定向
// FANOUT("fanout"),扇形(广播) 发送消息到每一个与之绑定的队列
// TOPIC("topic"),通配符的方式
// HEADERS("headers"),不知道
// boolean durable,是否持久化
// boolean autoDelete,是否自动删除
// boolean internal,内部使用,一般是false
// Map<String, Object> arguments,参数
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);
//6.创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定交换机和队列
// String queue,队列名称
// String exchange,交换机名称
// String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")
//需求 所有error级别日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
//8.发送消息
String body = "日志信息:张三去调用了findAll方法……日志级别:info...";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者1
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("counsumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("把信息存数据库……");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
消费者2
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("ip"); //ip 默认localhost
factory.setPort(5672); //端口 默认5672
factory.setVirtualHost("/itcast");//虚拟机 默认/
factory.setUsername("admin");//用户名
factory.setPassword("admin");//密码
//3.创建Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
//6.接收消息
// String queue,队列名称
// boolean autoAck,是否自动确认
// Consumer callback,回调对象
Consumer consumer = new DefaultConsumer(channel){
//回调方法
// String consumerTag,标识
// Envelope envelope,获取一些信息,交换机、路由key……
// AMQP.BasicProperties properties,配置信息
// byte[] body,消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("counsumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("把日志信息打印到控制台……");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
三.整合SpringBoot
3.1生产端
1. 创建生产者SpringBoot工程
2. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>producer-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<!--父工程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
3. 编写yml配置,基本信息配置
#配置RabbitMQ的基本信息
spring:
rabbitmq:
host: ip
username: admin
password: admin
port: 5672
virtual-host: /
4. 定义交换机,队列以及绑定关系的配置类
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3.绑定关系
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
5. 注入RabbitTemplate,调用方法,完成消息发送
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//1.注入
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello!");
}
}
3.2消费端
1. 创建消费者SpringBoot工程
2. 引入依赖
与生产端一样
3. 编写yml配置,基本信息配置
与生产端一样
4. 定义监听类,使用@RabbitListener注解完成队列监听。
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(message);
}
}
3.3总结
SpringBoot提供了快速整合RabbitMQ的方式
基本信息在yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
生产端直接注入RabbitTemplate完成消息发送
消费端直接使用@RabbitListener完成消息接收