学习视频参考
3.RabbitMQ的安装(二)_哔哩哔哩_bilibili
06-MQ的分类_哔哩哔哩_bilibili
RabbitMQ官网
RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ
目录
1.MQ引言
1.1 什么是MQ?
1.2 MQ的分类
1.2.1 ActivaMQ
1.2.2 Kafka
1.2.3 RocketMQ
1.2.4 RabbitMQ
1.3 MQ选择
(1)ActivaMQ
(2)Kafka
(3)RocketMQ
(4)RabbitMQ
1.4 MQ应用场景
2.Rabbit MQ
2.1 简介
2.2 安装
2.3 启动
3 RabbitMQ配置
3.1 RabbitMQ管理命令行
3.2 web管理界面
4 消息策略
4.1 HelloWorld(直连)
4.2 work quenes(工作队列)
4.3 fanout(广播模型)
5 消息确认ACK
5.1 自动应答
5.2 手动应答
5.3 消息自动重新入队
6 Springboot整合
6.1 引入依赖
6.2 配置文件
6.3 生产者
6.4 消费者
1.MQ引言
1.1 什么是MQ?
MQ(Message Queue) 消息队列 ,通过典型的生产者和消费者的模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关系消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为 消息中间件 ,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
不仅仅是前后端用,业务系统之间也可以用。
1.2 MQ的分类
当今市场上有很多主流的消息中间件
如老牌的 ActivaMQ 、 RabbitMQ ,炙手可热的 Kafka ,阿里巴巴自主研发的 RocketMQ 等
1.2.1 ActivaMQ
优点:单机吞吐量万级,时效性ms级,可用性高,消息可靠性较低的概率丢失数据
缺点:比较老的MQ,维护越来越少,高吞吐量较少使用
1.2.2 Kafka
大数据的杀手锏,大数据领域内的消息传输,为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪。在数据采集、传输、存储的过程中有着举足轻重的作用。
优点:性能卓越,吞吐量高,分布式,不会丢失数据不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次,有优秀的第三方Kafka Web管理界面Kafka-Manager;在日志领域比较成熟。
缺点:kafka单机超过64个队列/分区,Load会发生明显飙高,实时性取决于轮询间隔时间,消费失败不支持重试,一台代理宕机后就会产生消息乱序,社区更新较慢。
1.2.3 RocketMQ
出自于阿里巴巴的开源产品,用java语言实现,在设计时参考了kafka,并作出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,流计算,消息推送,日志流式处理等场景
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能完善,扩展性好,支持10亿级别的消息堆积。
缺点:支持的客户端语言不多,目前是java和c++;社区活跃度一般。
1.2.4 RabbitMQ
2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
优点:由于erlang语言的高并发特性,性能较好,吞吐量到万级,MQ功能比较完善,健壮,稳定,易用,跨平台,支持多种语言,如Python、Ruby、java、C、PHP等,支持AJAX文档齐全,开源,提供的管理界面好,社区活跃度高,更新频率相当高
缺点:商业版需要收费,学习成本较高
1.3 MQ选择
(1)ActivaMQ
适用于中小型企业,比较老的经典的MQ,性能适中,可靠性适中
(2)Kafka
高吞吐量,大数据领域,追求效率
主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是日志采集和传输,适合产生大量数据的互联网服务的数据收集业务。
大型公司建议采用,如果有日志采集功能,肯定首选kafka。
(3)RocketMQ
支持分布式事务,性能好,开源功能少。
阿里巴巴开发,天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商订单扣款,非常实用。经历过多次双11场景考验,因此十分适用于并发量大且追求稳定性和可靠性的业务。
(4)RabbitMQ
结合erlang语言本身的并发优势,性能好时效性微妙级,Spring默认支持RabbitMQ,可靠性一致性很高。如果数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ。
相比之下,RabbitMQ比kafka可靠,kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求较低的场景使用,比如ELK日志收集。
1.4 MQ应用场景
-
流量削峰(缓冲队列)
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,引入消息队列
-
日志处理,大数据
-
应用解耦
场景说明:双11是购物狂节,用户下单后,订单需要通知库存系统,传统的做法就是订单系统调用库存系统的接口
-
异步处理
场景说明:用户注册后,需要发注册邮件和注册短信
2.Rabbit MQ
2.1 简介
基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。更新速度非常块。
消息可靠性强,支持消息恢复。
实现跨系统之间的通信和解耦
默认端口(管理界面/消息): 15672 / 5672
AMQP
Advanced Message Queuing Protocol 高级消息队列协议
消费者和队列建立连接,通过某一个通道通信
2.2 安装
最新官网下载网址: Downloading and Installing RabbitMQ | RabbitMQ
本次采用课程提供的安装包进行安装,注意erlang和rabbitmq版本的匹配
后续可以使用docker镜像安装,比较简单,适合集群部署
下载压缩包,上传到centos服务器
rabbitmq-server-3.7.18-1.el7.noarch.rpm
下载erlang依赖环境
erlang-22.0.7-1.el7.x86_64.rpm
安装erlang依赖包
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
安装socat
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
安装rabbitmq-server
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
拷贝配置文件模板到etc
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
查看配置文件位置
ls /etc/rabbitmq/
修改配置文件
vim /etc/rabbitmq/rabbitmq.config
修改第61行,取消注释,删掉该行末尾","逗号,保存。
2.3 启动
首先执行如下命令,启动rabbitmq中的插件管理:
rabbitmq-plugins enable rabbitmq_management
查看状态
systemctl status rabbitmq-server
开启服务
systemctl start rabbitmq-server
重启服务
systemctl restart rabbitmq-server
停止服务
systemctl stop rabbitmq-server
启动成功查看状态,会有如下显示:
通过网页进入管理页面,端口 15672
初始登录账户和密码都是 : guest
3 RabbitMQ配置
3.1 RabbitMQ管理命令行
#服务器启动相关
systemctl start|restart|stop|status rabbitmq-server
#管理命令行 用来在不适用web管理月末情况下命令操作
rabbitmq rabbitmqctl help #可以查看更多命令
#插件管理命令行
rabbitmq-plugins enable|list|disable
3.2 web管理界面
4 消息策略
生产者对应一个专门的虚拟主机Vitual Host,对应关系型数据库的库的概念,为每一个应用建立虚拟主机,访问虚拟主机需要与用户进行绑定,一个项目只能访问一个虚拟主机。
是否把消息放到交换机中,取决于用什么消息模型。不一定要用交换机。
消息发送到队列中之后,生产者任务完成。
消费者连接指定虚拟主机,监听队列里是否有消息,消费消息。
4.1 HelloWorld(直连)
最简单的模型 。
相当于 点对点 通信模型,没有交换机,只有队列,一个生产者和一个消费者
1.建立maven项目,引入相关依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
2.创建一个新的虚拟主机,将指定用户匹配到该虚拟主机上
3.工具类封装
public class RabbitMqUtils { // 连接工具类
private static ConnectionFactory connectionFactory;
static{
// 重量级资源,类加载只创建一次
// 创建连接mq的连接工程对象
connectionFactory = new ConnectionFactory();
// 设置连接rabbitmq主机
connectionFactory.setHost("47.113.93.41");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接的虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
}
// 定义提供链接对象的方法
public static Connection getConnection(){
try{
// 获取连接对象
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
return null;
}finally {
System.out.println("Connect OK");
}
}
// 关闭通道和关闭连接的工具方法
public static void closeConnectionAndChannel(Channel channel,Connection conn){
try {
if(channel != null) channel.close();
if(conn != null) conn.close();
}catch (Exception e){
e.printStackTrace();
}
System.out.println("Close OK");
}
}
4.生产者代码
public class Provider {
// 生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接对象
Connection connection = RabbitMqUtils.getConnection();
// 获取连接中的通道对象 Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数1:队列名称 如果队列不存在自动创建
// 参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
// 参数3:exclusive 是否独占队列 true 独占 false 不独占
// 参数4:autoDelete 是否在消费完成后自动删除队列,true 自动删除 false 不自动删除
// 参数5:附加参数 channel.queueDeclare("hello",false,false,false,null);
// 发布消息
// 交换机名称 队列名称 传递消息额外设置 消息的具体内容
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
// 通过工具类来关闭通道和连接
RabbitMqUtils.closeConnectionAndChannel(channel,connection);
System.out.println("done.");
}
}
5.消费者代码
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
// 通过工具类获取连接对象
Connection connection = RabbitMqUtils.getConnection();
// 获取连接中的通道对象
Channel channel = connection.createChannel();
// 通道绑定对象
channel.queueDeclare("hello",false,false,false,null);
// 消费消息
// 参数1 队列名称 参数2:开启消息的自动确认机制 参数3:消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override // 最后一个参数:消息队列中取出的消息
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(new String(body));
}
});
// 不建议关闭,消费者需要一直监听
System.out.println("done.");
}
}
6.API参数细节
同一个通道可以向不同队列发送消息。
注意:生产者和消费者声明的队列必须是完全一致的队列,不能出现参数不一样
// API:声明队列,如果队列不存在则先创建
// 参数:队列名称,是否持久化,是否独占队列,是否自动删除队列,附加参数
// 持久化:如果不持久化,关闭mq服务后队列会丢失;无论持久与否,消息持久化于此无关
// 独占队列:一般不独占,多个连接共享同一个队列
// 自动删除:队列中如果没有其他消息时,同时消费者彻底与队列断开连接之后,将会自动删除队列
channel.queueDeclare("hello",false,false,false,null);
// API:发送消息到指定队列
// 参数:交换机名称,队列名称,传递消息额外设置,消息的具体内容
// 传递消息额外设置:MessageProperties.PERSISTENT_TEXT_PLAIN 使得消息持久化
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
// API:从队列中消费消息
// 参数:队列名称,自动确认,对象
// 消费自动确认:如果是true则是自动确认,此时消费者将会一开始拿到等量的消息并确认,如果中途宕机会丢失消息
// -- 因此不建议使用消息自动确认,设置为false
channel.basicConsume("work",true,new DefaultConsumer(channel)
4.2 work quenes(工作队列)
任务模型 ,当消息处理比较耗时的时候,可能生产消息的速度回远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列的消息一旦消费,就会消失,因此任务是不会被重复执行的。
消费者的消费是 平均分配 的,无论处理速度快慢差异,这种分发消息的方式称为 循环 。
1.生产者部分代码:发送20条消息
for (int i =0; i<20;i++){ // 发布消息
channel.basicPublish("","work",null,(i + "hello work quene").getBytes());
}
2.消费者部分代码:2个消费着消费消息
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override // 最后一个参数:消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者2号:" + new String(body));
}
});
3.测试结果,消息是 平均分配 的
4.实现能者多劳
实现手动确认,队列才把消息删了,因此处理速度快的处理消息多。
public class Customer1 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMqUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",false,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者1:" + new String(body));
// 手动确认,参数1:手动确认消息标识 参数2:false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
4.3 fanout(广播模型)
在广播模式下,消息发送流程是这样的:
-
可以有多个消费者
-
每个消费者有自己的queue(队列)
-
每个队列都要绑定到exchange
-
生产者发送的消息,只能发送到交换机,交换机来决定发给哪个队列,生产者无法决定。
-
交换机把消息发送给绑定过的所有队列
-
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
1.生产者:只负责把消息发送给交换机
//将通道声明指定交换机,交换机不存在时创建交换机
// 参数1:交换机名称
// 参数2:交换机类型 fanout 广播类型
channel.exchangeDeclare("logs","fanout"); // 广播,一条消息多个消费者同时消费
//发送消息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
2.消费者:拥有一个临时队列,绑定到交换机上
// 通道绑定交换机
channel.exchangeDeclare("logs","fanout");
// 创建临时队列
String queneName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queneName,"logs","");
// 消费消息
channel.basicConsume(queneName,true,new DefaultConsumer(channel){
@Override
// 最后一个参数:消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者3: "+new String(body));
}
});
3.测试:三个消费者
生产者发送一条消息个给交换机,三个消费者全部收到了消息。
5 消息确认ACK
该部分参考视频:22-消息应答概念_哔哩哔哩_bilibili
消息应答:消费者在收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了
5.1 自动应答
消息发送后立即被认为已经传送成功,这个模式需要在高吞吐量和数据传输安全性方面做权衡。这种模式仅使用在消费者可以高效并以某种速率能够处理这些消息的情况下使用!这种模式在生产中不适用,容易使得客户端宕机,同时容易丢失消息,只适用于实时系统。
设置 auto_ack=true
即自动应答
5.2 手动应答
设置 auto_ack=false
即自动应答
消息应答的方法
Channel.basicack
用于肯定确认,RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了Channel.basicNack
用于否定确认Channel.basicReject
用于否定确认,不处理该消息了,直接决绝,可以将其丢弃了
批量应答
5.3 消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
6 Springboot整合
6.1 引入依赖
<!--rabbit MQ 消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
</dependency>
6.2 配置文件
#rabbit-MQ
spring.rabbitmq.host = 47.113.90.23 # IP地址
spring.rabbitmq.port = 5672 # 端口号
spring.rabbitmq.username = ems
spring.rabbitmq.password = 123
spring.rabbitmq.virtual-host = /ems
6.3 生产者
RabbitTemplate
用来简化操作,使用的时候直接在项目中注入即可
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = ProjectApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
@Autowired //注入rabbitTemplate
private RabbitTemplate rabbitTemplate;
@Test // hello world 模型,点对点
public void testHelloWorld(){
// 发送消息"hello world"给消息队列 hello
// 前提:一定要有消息队列再监听
rabbitTemplate.convertAndSend("hello","hello world");
}
@Test // work 模型,多个消费者
public void testWork(){
// 发送10个消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","work model");
}
}
}
6.4 消费者
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkCustomer {
// 第一个消费者
@RabbitListener(queuesToDeclare = @Queue("work")) // 一定要springboot2.x以上的版本
public void receivelOne(String message){
System.out.println("message:1 = " + message);
}
// 第二个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receivelTwo(String message){
System.out.println("message:2 = " + message);
}
}