前言
RabbitMQ作为当下主流的消息中间件之一,无疑是我们Java后端开发技术成长路线的重要一环,在这篇文章中荔枝将会梳理入门RabbitMQ的知识,文章涉及RabbitMQ的基本概念及其环境配置,荔枝的RabbitMQ是在Docker上部署的,有需要的小伙伴可以直接看2.3.3节~~~荔枝也梳理了RabbitMQ六大核心模式中最简单的两种:简单模式和工作模式,以及相应的消息应答机制知识,希望我的梳理能对正在学习小伙伴们有用!
文章目录
前言
一、MQ的相关概念
1.1 MQ
1.2MQ特性
1.3 几种不同的MQ
kafka
RocketMQ
RabbitMQ
二、RabbitMQ简介
2.1 四个基本的概念
2.2 RabbitMQ结构
2.3 RabbitMQ安装
2.3.1 进入RabbitsMQ官网下载
2.3.2 将本地文件复制到Docker容器中
2.3.3 容器安装RabbitMQ
2.3.4 直接Docker安装RabbitMQ
2.4 依赖引入
三、简单模式和工作模式
3.1 简单队列模式
生产者代码
消费者代码
3.2 工作模式
3.3 消息应答机制
3.3.1 自动应答
3.3.2 其余应答机制
3.3.3 批量应答Multiple
3.3.4 消息自动重新入队
3.3.5 消息手动应答机制的代码示例
总结
一、MQ的相关概念
1.1 MQ
MQ(message queue)本质是个队列,遵循先入先出原则,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游”逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
1.2MQ特性
流量消峰
由于MQ的独特结构,由于访问的请求需要进行排队等待,在大量的请求达到我们的服务器系统中时候,为了防止宕机我们可以借助MQ进行流量消峰处理,但这也会带来性能的下降。
应用解耦
若有一个主从结构的系统,如果不使用队列进行直接耦合的话,任意一个子系统出现故障都会造成下单操作异常。但是采用的是消息队列的方式之后,即使一个子系统的功能挂了的话,用户感知不到系统发生故障,因为系统请求操作指令被储存在消息队列中,等待系统恢复又可以进行处理。
异步处理
比如A要调用一个api使B去执行大数据查询等耗时的业务,从资源的利用角度我们自然不能让A一直等待B的响应,B不再直接返回消息,而是将处理完成之后将消息放入一个MQ中,再由MQ发给A。这样A就可以即使获得异步处理的消息。
1.3 几种不同的MQ
kafka
kafka是专门为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。
优点:性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pu‖方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消
息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
RocketMQ
RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自己的一
些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是jva我们可以自己阅读源码,定制自己公司的MQ
缺点:支持的客户端语言不多,目前是jva及c++,其中c++不成熟;社区活跃度一般,没有在MQ
核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
RabbitMQ
2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
优点:由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言如:Python、.Ruby、.NET、Java、JMS、C、PHP、ActionScript、.XMPP、STOMP
二、RabbitMQ简介
RabbitMQ是一个消息中间件,它不处理消息而是接收、存储和转发消息数据。
2.1 四个基本的概念
生产者:产生数据发送消息的程序
交换机:是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列:是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
2.2 RabbitMQ结构
下图简单绘制了RabbitMQ的结构及其运行机制:
- Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker消息实体
- Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类以于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
- Connection:信道,publisher/consumer和broker之间的TCP连接
- Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point)),topic(publish-subscribe)and fanout(multicast)
- Exchange:交换机
- Queue:队列
- Producer:生产者
- Consumer:消费者
2.3 RabbitMQ安装
鉴于其它的方式网上大多都比较全,荔枝这里给出一种不同的安装方式哈。因为荔枝自己的云服务器到期了,所以就用Docker在电脑中虚拟化出一个Centos系统作为自己的Linux服务器。这里荔枝只梳理一下自己的做法:
首先安装完Docker和Docker Desktop之后就开始pullCentos的镜像并运行容器。这里可能会跟荔枝一样遇到很多问题,但别怕一步一步跟着教程走:
https://blog.csdn.net/GoodburghCottage/article/details/131413312
如果遇到Exited(255)的报错,重启一下容器即可
拉取Centos镜像后跑起来大致长这样。
2.3.1 进入RabbitsMQ官网下载
这里的版本大家可以自行决定。
erlang下载:Release 22.2.1 · rabbitmq/erlang-rpm · GitHub
RabbitMQ 下载: Release RabbitMQ 3.8.11 · rabbitmq/rabbitmq-server · GitHub
下载在指定目录下,请记住你的路径名
2.3.2 将本地文件复制到Docker容器中
命令行里面cd进入当前文件存放的路径并借助docker cp命令来将文件转移到Docker中即可。
执行docker cp语法是:
docker cp 移动文件到当前目录下的名字 容器名|容器id:文件存放在容器的路径
查询一下文件是否成功复制到指定的目录下
可以看到已经成功咯!
2.3.3 容器安装RabbitMQ
yum install -y socat
rpm -ivh erlang-22.2.1-1.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.11-1.el7.noarch.rpm
到这一步大家可能会发现报错了:
Failed to get D-Bus connection: No such file or directory
这个bug查了许久荔枝也不知道怎么解决,大佬们可以在评论区教教俺~~~
然后荔枝就换了一种方式:
2.3.4 直接Docker安装RabbitMQ
#使用docker拉取rabbitmq镜像
docker pull rabbitmq:management
#创建一个数据卷用来持久化rabbitmq中的数据
docker volume create rabbitmq-home
#创建rabbitmq的docker容器并设置rabbitmq登录管理后台的用户名和密码
docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:management
需要注意的是:这里的rabbitmq管理后台的访问端口是15672,我们访问的时候只需要加上自己对应的服务器ip
进入后的页面长这样。
一步到位哈哈哈哈~~~
2.4 依赖引入
在学习RabbitMQ六种核心模式之前,我们需要在项目中配置好RabbitMq的环境依赖,在pom.xml文件中配置好:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<!--rabbitmq依赖客户端-->
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
更新maven即可完成依赖引入!
三、简单模式和工作模式
六大核心模式分类:简单模式、工作模式、发布订阅模式、路由模式、主题模式、发布确认模式
3.1 简单队列模式
生产者代码
对于RabbitMQ的使用来说,我们需要根据ConnectionFactory来创建一个连接工厂对象,并设置好相应的连接账户和工厂IP。之后创建连接和信道,使用queueDeclare来声明想要创建的队列。
queueDeclare方法的五个参数:
- 第一个参数:队列名称
- 第二个参数:设置队列里面的消息是否持久化,默认不开启的话将消息存储在内存中
- 第三个参数:设置队列是否只供一个消费者进行消费 是否进行消息共享,设置为true设置多个消费者消费
- 第四个参数:最后一个消费者端断开连接之后是否自动删除队列,true自动删除,false不自动删除
- 第五个参数:其它参数
声明完队列之后需要使用basicPublish()来发送消息体。
具体的参数含义:
- 交换机名
- 队列名(路由的key值)
- 其它参数消息
- 发送消息的消息体
package com.crj.rabbitmq;
import com.rabbitmq.client.*;
public class Producer {
//定义队列名
public static final String QUEUE_NAME = "hello";
//发送消息
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接RabbitMQ的队列
factory.setHost("127.0.0.1");
factory.setUsername("root");
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//创建一个信道
Channel channel = connection.createChannel();
//生成一个队列
/**
* 几个属性:
* 1.队列名称
* 2.队列里面的消息是否持久化,默认不开启的话将消息存储在内存中
* 3.第三个参数设置队列是否只供一个消费者进行消费 是否进行消息共享,设置为true设置多个消费者消费
* 4.最后一个消费者端断开连接之后是否自动删除队列,true自动删除,false不自动删除
* 5.其它参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消息
String message = "hello word";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息已发送");
}
}
消费者代码
消费者同生产者一样需要设置好RabbitMQ的队列信息,创建连接和信道。最后使用basicConsume来消费消息,其中basicConsume()方法需要传入四个参数,具体含义:
- 消费哪个队列
- 消费成功之后是否自动应答,true为自动应答,false为手动应答
- 未成功消费的回调函数
- 消费者取消消费的回调函数
对于回调函数我们需要实现两个接口DeliverCallback和CancelCallback,使用lambda表达式来代替内部实现类。
package com.crj.rabbitmq;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接RabbitMQ的队列
factory.setHost("127.0.0.1");
factory.setUsername("root");
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//创建一个信道
Channel channel = connection.createChannel();
//使用Lambda表达式来定义消费和取消消费的回调函数
//消费回调函数
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("message is "+new String(message.getBody()));
};
//取消消费的回调函数
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"取消消息消费");
};
//消费消息
/**
* 消费哪个队列
* 消费成功之后是否自动应答,true为自动应答,false为手动应答
* 未成功消费的回调函数
* 消费者取消消费的回调函数
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
3.2 工作模式
当生产者大量发送消息进入消息队列的时候,消费者仅有一个工作线程明显已经不能满足需求,甚至可能会导致队列的阻塞。因此提出了一种使用多个工作线程来消费消息的模式,这就是工作模式。简单理解就是大金主甲方公司的一个想法,你们一个部门所有人都需要共同来配合完成甲方给的任务。这里需要注意的就是消息只能被处理一次,因此RabbitMQ采用轮询分发消息机制。
抽取信道的创建工具类
package com.crj.rabbitmq.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtil {
//抽象一个静态类来定义一个获取信道的方法
public static Channel getChannel() throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接RabbitMQ的队列
factory.setHost("127.0.0.1");
factory.setUsername("root");
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//创建一个信道
Channel channel = connection.createChannel();
return channel;
}
}
分别创建发布者和多个消费者验证该模式,这里只给出发布者的demo,消费者的demo跟上述一致
package com.crj.rabbitmq.simple;
import com.rabbitmq.client.*;
public class Producer {
//定义队列名
public static final String QUEUE_NAME = "hello";
//发送消息
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接RabbitMQ的队列
factory.setHost("127.0.0.1");
factory.setUsername("root");
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//创建一个信道
Channel channel = connection.createChannel();
//生成一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消息
String message = "hello word";
//交换机名 队列名|路由的key值 其它参数消息 发送消息的消息体
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息已发送");
}
}
3.3 消息应答机制
前面在工作模式中提及在队列中的一个消息只能被消费者处理一次,但什么时候队列会删除消息呢,有没有可能一个消费者在处理消息的过程中发生宕机,这就会出现消息丢失。为了避免这种情况,RabbitMQ使用了消息应答机制。该机制保证了只有消费者完整处理了该消息,消息队列才会删除该消息。
3.3.1 自动应答
必须在特定前提下使用:即消息发送后就立即被认为已经传送成功。该模式需要在高吞吐量和数据传输安全性方面做出权衡,并且如果出现消费者宕机或者是连接出了问题,很容易出现消息丢失。因此该模式仅适用于在保证消费者可以高效并发并能承载起当下的消息吞吐量的情况下。
3.3.2 其余应答机制
相比于自动应答,手动应答的好处就是能够批量应答并可以减少网络的拥塞。
Channel.basicAck:肯定应答确认,RabbitMQ只有在已经获得消息被成功处理的消息后才会在队列中丢弃该消息体。
Channel.basicNack:用于否定确认, 不用确定,消息被认领后就会丢弃
Channel.basicReject:拒绝确认,与basicNack相比多了一个用于批量处理消息的参数。
3.3.3 批量应答Multiple
批量删除其实就是决定是否一次性将队列中的所有消息全部发出,这里一般是用一个boolean参数来选择,true代表批量应答,false代表的是只应答当前队列头部的信息。
注意:应答的消息是已经被处理了的消息,也就是消费者已经消费的消息。
还需要注意的是,我们需要慎用批量处理,避免队列中还没有受到ACK的消息直接应答而导致消息丢失。
3.3.4 消息自动重新入队
在消息的应答机制中,如果消息队列RabbitMQ没有受到来自消费者的Ack(这里就不赘述了哈),就会重新将消息分配给一个新的消费者消费,这就是消息的重新入队。
3.3.5 消息手动应答机制的代码示例
手动应答首先需要在channel.basicConsumer()中设置autoAck参数为false,之后需要自行设置手动应答的逻辑,channel.basicAck()方法中有两个参数:
- 第一个参数:获取当前消息的标记
- 第二个参数:设置是否批量应答
package com.crj.rabbitmq.UnAutoReply;
import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.crj.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Work01 {
public static final String TASK_QUEUE_ACK = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
System.out.println("我是消费者 1 ,正在等待接收消息》》》》");
System.out.println("接收消息的时间较短");
DeliverCallback deliverCallback = (consumerTag,message)->{
//睡眠时间
SleepUtils.sleep(1);
System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
/**
* 手动应答的逻辑
*/
//获取消息标记
long deliveryTag = message.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag,false);
};
CancelCallback cancelCallback = (consumerTarget)->{
System.out.println(consumerTarget + "消费者取消消费接口的回调逻辑");
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_ACK,autoAck,deliverCallback,cancelCallback);
}
}
简单测试一下手动应答机制,可以看到消息并不会消失~
总结
上面荔枝简单梳理了RabbitMQ的两种模式,在之后的文章中荔枝也会把剩下的模式和RabbitMQ集群的知识梳理出来。一周搞定RabbitMQ,加油!
今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~
如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!
如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!