文章目录
- 一. RabbitMQ概述
- 1.1 同步/异步
- 1.1.1 同步调用
- 1.1.2 异步调用
- 1.2 消息中间件
- 1.2.1 概念
- 1.2.2 作用
- 1.2.3 常见的消息中间件
- 1.2.4 其他中间件
- 1.3 RabbitMQ
- 1.3.1 简介
- 1.3.2 特点
- 1.3.3 方式
- 1.3.4 架构
- 1.3.5 运行流程
- 二. 安装
- 2.1 Docker 安装 RabbitMQ
- 三. 简单队列(Simple Queue)
- 3.1 消息模式
- 3.2 简单队列(Simple Queue)
- 3.2.1 面板操作
- 3.2.2 代码操作
- 四. 工作队列(Work Queue)
- 4.1 消息模式
- 4.2 工作队列(Work Queue)
- 4.2.1 轮询模式
- 4.2.2 公平模式
- 4.3 总结
- 五. 发布订阅(Pub/Sub)
- 5.1 理解
- 5.1.1 发布订阅模式具体实现
- 生产者
- 消费者
- 六. 路由模式(Routing)
- 6.1 图形化界面理解
- 6.1.1 创建direct交换机
- 6.1.2 交换机与队列绑定
- 6.1.3 模拟生产者发送消息
- 6.1.4 查看队列消息
- 6.2 发布订阅模式具体实现
- 6.2.1 生产者
- 6.1.2 消费者
- 七. 主题模式(Topic)
- 7.1 图像界面理解
- 7.2 主题模式具体实现
- 7.2.1 生产者
- 7.2.2 消费者
- 八. SpringBoot集成RabbitMQ
一. RabbitMQ概述
1.1 同步/异步
1.1.1 同步调用
同步调用是指客户端调用远程服务时,需要等待服务端返回结果后才能继续执行,典型的场景如远程调用 HTTP 服务。
同步调用的优势:时效性强,等到结果后才返回–需要查询结果的通常用同步调用
同步调用的问题:
- 拓展性差–增加功能要改代码
- 性能下降–调用链路长,每次都是阻塞等待上一个服务
- 级联失败问题–一个服务挂掉,整个链路上的服务都出问题
1.1.2 异步调用
异步调用是指客户端调用远程服务时,不需要等待服务端返回结果,而是直接返回一个代表请求的消息,典型的场景如消息队列。当服务端处理完请求后,通过消息队列通知客户端。
异步调用的优势:提升性能–不需要等待结果,可以继续执行–适合处理耗时长的请求
1.2 消息中间件
1.2.1 概念
消息中间件(Message Queue,MQ)是一种应用程序之间的数据交换方式,它是一种分布式系统架构模式,用于在不同的应用程序之间传递消息。消息中间件的主要功能是实现应用间的松耦合,让信息的发送方和接收方不需直连,而是通过消息中间件进行交互。
- 消息: 简单的说就是软件之间通讯时传递的数据,它可以是很简单的数字、字母,也可以是很复杂的嵌套对象数据。
- 中间件:最简单的理解是第三者,本来软件A和软件B间的通讯两者直接传递消息就可以了,但是,此时中间件作为第三者,非要先让软件A通讯的消息先发给它,再由它发给软件B(感觉就是中间商一样),下面通过图来更好的理解它们。
- 消息队列:是消息中间件的一种实现方式。
总结:消息中间件则是将软件与软件之间的交互方式进行存储和管理的一种技术,也可以看做是一种容器
1.2.2 作用
- 异步通信:消息队列可以实现应用间的异步通信,应用只需要将消息放入队列,不用等待回复,就可以继续执行。
比如我们最常见的短信验证码功能,当我们在界面点击“获取验证码”后,我们还可以同时进行其他的操作,如输入更新的密码等,此时,我们不需要一直等到手机收到短信了才进行下一步的操作,这就是异步处理,提高了用户体验。
- 解耦合:通过消息队列解耦合应用,应用之间不再需要直连,而是通过消息队列进行通信。
比如常见的订单系统,当有订单下单时,我们需要减去库存,但如果订单、库存的逻辑都放在一个系统中,不止处理事件需要很长,系统的耦合性比较高,此时,使用消息中间件,可以实现将订单业务和库存业务抽出来做不同的系统,每次下单的时候可以将下单信息放入消息中间间中,然后库存系统去订阅它,只有有订单数据就进行减去库存操作,这样就将应用解耦了
- 削峰填谷:通过消息队列可以有效地削峰填谷,避免应用因消息处理过慢而出现性能问题。
如常见的秒杀系统,如果有5万个商品可以秒杀,没有消息中间件的话,所有的请求都一次性到后台,此时系统很容易卡死,引入消息中间件如消息队列,此时可以在队列中设置好可以存储数据的数量,这样每次用户请求会先但消息队列中,消息队列就减去1,当消息队列中存储长度为0时,直接返回秒杀失败,这样就避免了所有用户请求可能在同一时间到达系统后台,达到流量削峰的作用
- 广播消费:消息队列可以实现广播消费,一个消息可以被多个消费者消费
1.2.3 常见的消息中间件
- ActiveMQ:Apache 出品,主要用于企业级的消息中间件,支持多种协议,包括 AMQP、MQTT、STOMP 等。
- RabbitMQ:RabbitMQ 是一款开源的消息队列软件,由 Erlang 语言编写,基于 AMQP 协议。
- Kafka:Apache 出品,主要用于大数据实时处理,支持多种协议,包括 Apache Kafka、Apache Pulsar 等。
- RocketMQ:阿里巴巴开源的消息队列软件,主要用于微服务架构。
1.2.4 其他中间件
- 分布式消息中间件:RocketMQ、Kafka、ActiveMQ 等。
- 负载均衡中间件:Nginx、LVS、HAProxy 等。
- 缓存中间件:Redis、Memcached 等。
- 数据库中间件:MySQL、MongoDB 等。
- 搜索引擎中间件:ElasticSearch、Solr 等。
- 日志中间件:Logstash、Flume 等。
- 容器中间件:Docker、Kubernetes 等。
1.3 RabbitMQ
RabbitMQ官网
1.3.1 简介
RabbitMQ 是一款开源的消息队列软件,由 Erlang 语言编写,基于 AMQP 协议。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ 是一个在分布式系统中用于存储和转发消息的消息队列,它可以实现可靠的消息传递,支持多种消息队列协议,包括 AMQP、STOMP、MQTT 等。RabbitMQ 是一个非常灵活的消息队列,可以支持多种应用场景,如任务队列、事件驱动、数据流、消息分发等。
1.3.2 特点
- 高可用性:RabbitMQ 集群保证消息的高可用性,即使部分节点发生故障,也能保证消息的传递。
- 灵活的路由机制:RabbitMQ 支持多种路由机制,包括点对点、发布/订阅、主题等。
- 多种协议支持:RabbitMQ 支持多种消息队列协议,包括 AMQP、STOMP、MQTT 等。
- 多种语言客户端:RabbitMQ 提供多种语言的客户端,如 Java、.NET、Python、Ruby 等。
- 管理界面:RabbitMQ 提供了一个易用的管理界面,可以直观地查看消息队列的状态。
- 多种插件支持:RabbitMQ 提供了许多插件,可以实现各种功能,如消息持久化、消息确认、消息集群、Web 管理界面等。
1.3.3 方式
- 点对点(P2P):点对点通信,一个生产者发送消息到一个队列,一个消费者从同一个队列中接收消息。
- 发布/订阅(Pub/Sub):发布/订阅通信,一个生产者发送消息到一个交换机,多个消费者从同一个交换机订阅同一个主题的消息。
- 主题(Topic):主题通信,一个生产者发送消息到一个主题交换机,多个消费者从同一个主题交换机订阅同一个主题的消息。
1.3.4 架构
- Server:又称Broker, RabbitMQ 服务器,用于存储、转发消息。
- 连接器(Connector):用于客户端和 RabbitMQ 服务器之间的网络连接。
- 生产者(Producer):消息的发送方,向 RabbitMQ 队列中发送消息。
- 交换机(Exchange):消息交换机,用于接收生产者的消息并将其路由到队列。
- 队列(Queue):消息队列,存储消息直到消费者取出并处理。
- 绑定(Binding):绑定,用于将交换机和队列进行关联。
- 路由键(Routing Key):路由键,用于指定消息的路由规则。
- 消费者(Consumer):消息的接收方,从 RabbitMQ 队列中接收消息并处理。
- 虚拟主机(Virtual Host):虚拟主机,用于隔离不同用户的权限。
- 信道(Channel):信道,用于连接到 RabbitMQ 服务器,并进行消息的传输。
1.3.5 运行流程
- 1.生产者将消息发送到交换机。
- 2.交换机根据路由规则将消息路由到队列。
- 3.队列将消息存储在内存中。
- 4.消费者从队列中获取消息并处理。
- 5.消费者确认消息已被处理。
- 6.RabbitMQ 服务器将消息发送给消费者。
二. 安装
2.1 Docker 安装 RabbitMQ
1.拉取镜像或加载本地镜像
docker pull rabbitmq:management
或
docker load -i rabbitmq.tar
2.创建数据卷
docker volume create mq-plugins
3.运行容器
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
相关信息
- -e RABBITMQ_DEFAULT_USER=admin:设置默认用户名为 admin
- -e RABBITMQ_DEFAULT_PASS=123456:设置默认密码为 123456
- -v mq-plugins:/plugins:挂载数据卷,用于存储插件
- –name mq:设置容器名称为 mq
- –hostname mq1:设置主机名为 mq1
- -p 15672:15672:将容器的 15672 端口映射到主机的 15672 端口
- -p 5672:5672:将容器的 5672 端口映射到主机的 5672 端口
- -d:后台运行容器
- rabbitmq:3.8-management:指定镜像版本为 3.8-management
4.验证安装
访问 http://服务器IP:15672
输入用户名和密码,默认用户名为 admin,密码为 123456
三. 简单队列(Simple Queue)
3.1 消息模式
参考官网
3.2 简单队列(Simple Queue)
一个生产者对应一个消费者,消息直接发送到队列。
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
3.2.1 面板操作
1.创建一个队列
2.在默认交换处模拟生产者发送消息 因为该队列绑定的是默认交换机,所以消息会直接发送到队列中。
注:Routing Key 写队列名
3.队列处查看消息
4.模拟消费者接收消息(查看消息内容)
AckMode : 应答模式
- Nack: 不应答,只查看,消息不会移除队列
- Ack: 应答模式,查看并移除队列
3.2.2 代码操作
1.导入依赖
- java原生依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
- Spring依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
- Spring Boot 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
根据自己的项目环境进行选择即可
2.定义生产者
package com.syh.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* @author shan
* @date 2024/5/16 14:39
*/
public class Producer {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址
factory.setHost("47.120.37.156");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 创建连接
connection = factory.newConnection();
// 创建通道
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("hello", false, false, false, null);
// 6: 发送消息
String message = "Hello World!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange,会有一个默认交换机
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
System.out.println(" [x] Unexpected exception: " + e.getMessage());
} finally {
// 关闭连接和通道
if (connection!= null && channel.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
执行发送,这个时候可以在web控制台查看到这个队列queue的信息。
3.定义消费者
package com.syh.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author shan
* @date 2024/5/16 14:48
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("47.120.37.156");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "hello";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
执行消费者,这个时候可以看到控制台输出接收到的消息。
四. 工作队列(Work Queue)
4.1 消息模式
参考官网
4.2 工作队列(Work Queue)
工作队列(Work Queue)模式是一种消息模式,它将任务分派给多个消费者,每个消费者都可以独立地处理任务。
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢? RabbitMQ提供了两种工作队列模式:
- 轮询模式(Round-robin):每个消费者都轮流接收消息,平均分配消息;
- 公平模式(Fair dispatch):每个消费者都有相同的权重,按比例分配消息;
4.2.1 轮询模式
- 轮询模式是最简单的工作队列模式,每个消费者都接收到相同数量的消息,但消息的顺序是不确定的。
- 轮询模式适用于消费者数量固定的情况,消费者的数量越多,平均分配消息的数量越少。
- 轮询模式的优点是简单,缺点是消息的顺序不确定。
轮询模式的实现
- 我们需要创建一个队列,并将消息发送到队列中。
- 创建多个消费者,并将它们绑定到同一个队列上。
1.生产者
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "学相伴:" + i;
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
2.消费者1
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
3.消费者2
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
4.运行结果
Work1-开始接受消息
Work2-开始接受消息
Work1-收到消息是:学相伴:1
Work2-收到消息是:学相伴:1
Work1-收到消息是:学相伴:2
Work2-收到消息是:学相伴:2
Work1-收到消息是:学相伴:3
4.2.2 公平模式
- 公平模式是一种更复杂的工作队列模式,每个消费者都有相同的权重,按比例分配消息。
- 公平模式适用于消费者数量不固定的情况,消费者的数量越多,平均分配消息的数量越多。
- 公平模式的优点是消息的顺序是确定的,缺点是分配消息的数量不确定。
公平模式的实现
- 我们需要创建一个队列,并将消息发送到队列中。
- 创建多个消费者,并将它们绑定到同一个队列上。
1.生产者
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "学相伴:" + i;
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
2.消费者1
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
3.消费者2
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
4.运行结果
4.3 总结
工作模式下的轮询: - 应答模式为自动应答 finalChannel.basicConsume(“queue1”, true, …) 工作模式下的公平: - finalChannel.basicQos(1); // 设置一次只接收一条消息 - 应答模式为手动应对 finalChannel.basicConsume(“queue1”, false, …) - finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 确认消息,手动应答
五. 发布订阅(Pub/Sub)
5.1 理解
角色:
p : Product 生产者 发布消息
X : 交换机
Q : 多个队列
C: Consumer 消费者 订阅消息
1,创建fanout类型的交换机
2.创建多个队列
3.绑定队列和交换机
或者队列处也可以绑定
4.在交换机处模拟生产者发送消息
5.查看队列中的消息数量
消息数量增加了
5.1.1 发布订阅模式具体实现
- 类型:fanout
- 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
生产者
package com.syh.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author shan
* @date 2024/5/16 20:49
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.120.37.156");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
String message = "你好,fanout-exchange";
String exchangeName = "fanout-exchange";
String routingKey = "";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.syh.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author shan
* @date 2024/5/16 20:52
*/
public class Consumer {
private static Runnable runnable = () -> {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.120.37.156");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
// new Thread(runnable, "queue-3").start();
}
}
六. 路由模式(Routing)
6.1 图形化界面理解
6.1.1 创建direct交换机
6.1.2 交换机与队列绑定
注:要指定routingkey
6.1.3 模拟生产者发送消息
6.1.4 查看队列消息
6.2 发布订阅模式具体实现
- 类型:direct
- 特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
6.2.1 生产者
package com.syh.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author shan
* @date 2024/5/16 21:10
*/
public class Product {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.120.37.156");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
String message = "你好,学相伴!!!";
String exchangeName = "dicrect-exchange";
String routingKey1 = "email";
String routingKey2 = "sms";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
6.1.2 消费者
package com.syh.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author shan
* @date 2024/5/16 21:14
*/
public class Consumer {
private static Runnable runnable = () -> {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.120.37.156");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
}
}
七. 主题模式(Topic)
7.1 图像界面理解
#
表示0个或多个值,并且是多级
*
表示一级任意值,必须有
7.2 主题模式具体实现
- 类型:topic
- 特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
7.2.1 生产者
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
String message = "你好,学相伴!!!";
String exchangeName = "topic-exchange";
String routingKey1 = "com.course.order";//都可以收到 queue-1 queue-2
String routingKey2 = "com.order.user";//都可以收到 queue-1 queue-3
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
7.2.2 消费者
public class Consumer {
private static Runnable runnable = () -> {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue-1").start();
new Thread(runnable, "queue-2").start();
new Thread(runnable, "queue-3").start();
}
}
八. SpringBoot集成RabbitMQ
在本人这篇博客(点击超链接)