目录
一、需求分析
1.1、对 Message Queue 的认识
1.2、消息队列核心概念
1.3、Broker Server 内部关键概念
1.4、Broker Server 核心 API (重点实现)
1.5、交换机类型
Direct 直接交换机
Fanout 扇出交换机
Topic 主题交换机
1.6、持久化
1.7、网络通信
通信流程
远程调用设计思想
1.8、模块设计图
二、实现核心类
2.1、交换机和队列的属性及绑定关系
2.2、Message 消息
一、需求分析
1.1、对 Message Queue 的认识
消息队列就是把 阻塞队列 这样的数据结构,单独提取成了一个独立的程序进行部署,实现 “进程和进程之间 / 服务和服务之间” 的生产者消费者模型(分布式系统中则是一组服务器构成的集群).
生产者消费者模型的好处如下:
- 解耦合:在一个分布式系统中 服务器A 给 服务器B 发送请求,B 给 A 返回响应,这样 A 和 B 的耦合是比较大的(B 一旦挂了,A 这边也无法正常接收响应);引入消息队列后,A 把请求发送给消息队列,B 再从消息队列中获取请求,就降低了耦合度(哪怕 B 挂了,A 也不用管,继续发送请求给消息队列,队列反馈响应).
- 削峰填谷:假设这样一个常见, A 是入口服务器, A 再调用 B 完成一些具体的业务,如果是 A 和 B 直接通信,突然有一天 A 收到用户请求的峰值,此时 B 也会随之感受到峰值;引入消息队列之后, A 把请求发给队列,这时候 B 就可以按照自己原有的节奏从队列中取请求,不至于一下子收到太多的并发量.
1.2、消息队列核心概念
生产者(Producer):发布消息的客户端应用程序.
消费者(Consumer):订阅消息的客户端应用程序,用于处理生产者产生的消息.
中间人(Broker):消费者要拿到生产者的消息,需要经过中间人,用来削峰填谷.
发布(Publish):生产者向中间人这里投递消息的过程.
订阅(Subscribe):类似于订阅报纸,消费者要从中间人这里取到对应消息的前提是,先订阅消息.
消费(Consume):消费者从中间人这里取数据的操作.
1.3、Broker Server 内部关键概念
1.虚拟主机(Virtual Host):类似于 MySQL 中的 database,是一个 “逻辑” 上的数据集合.
实际的开发中,一个 Broker Server 也可以有多种不同类型的数据,可能会同时用来管理多组 业务线 上的数据,可以使用 Virtual Host 进行逻辑上的区分~
2.交换机(Exchange):实际上,生产者是先把消息给了 Broker Server 上的某一个交换机,再由交换机把消息转发给对应的队列.
交换机就类似于公司的前台小姐姐,有一天你来面试,你就告诉前台小姐姐,然后她就会把你带到对应的楼层(队列).
3.队列(Queue):在一个大的消息队列中,可以又很多哥具体的小队列,他们用来存储消息实体,后续消费者也是从对应的队列中取数据.
4.绑定(Binding):把交换机和队列之间,建立起关联关系.
可以把 交换机 和 队列 看作 数据库 中的 “多对多” 这样的关系.
5.消息(Message):服务器 A 给 B 发的请求(通过 MQ 转发),就是一个消息,服务器 B 给 A 返回的响应(通过 MQ 转发)也是一个消息.
一个消息,可以视为是一个 字符串(二进制数据)。消息中具体包含啥样的数据,都是程序员自定义的.
Ps:这些概念,既需要在内存中存储(方便,快),也需要在硬盘中存储(持久化).
1.4、Broker Server 核心 API (重点实现)
1. 创建队列(queueDeclare)
这里不使用 Create 创建 这样的术语,而使用 Declare ,是因为这里以 RabbitMQ 为蓝本, Declare 表示 不存在则创建,存在就什么也不干.
2.销毁队列(queueDelete)
3.创建交换机(exchangeDeclare)
4.销毁交换机(exchangeDelete)
5.创建绑定(queueBind)
6.解除绑定(queueUnbind)
7.发布消息(basicPublish)
8.订阅消息(basicConsume)
9.确定消息(basicAck)
类似于 TCP 的确认应答机制,确认消息这个 api 就是让消费者显式告诉 broker server ,这个消息,我已经处理完毕了,提高整个系统的可靠性.
客户端除了提供以上 9 中方法,还需要提供 4 个方法:
1.创建 Connection
2.关闭 Connection
3.创建 Channel
4.关闭 Channel
一个 Connection 对象,就代表一个 TCP 连接,里面可以包含多个 Channel ,每隔 Channel 上面传输的数据都是互不相干的.
有了 Connection 了,为什么还要搞一个 Channel ?
TCP 中,建立 / 断开一个连接成本还是挺高的,因此,很多时候,不希望频繁的建立断开 TCP 连接,因此引入 Channel ,相比于 TCP,就要轻量很多. Connection 和 Channel 之间的关系,就类似于 “网线” 一样.
1.5、交换机类型
交换机在转发消息的时候,按照转发规则,提供了几种不同的 交换机类型(ExchangeType)来描述这里不同的转发规则.
RabbitMQ 主要实现了 四种 交换机类型(也是 AMQP 协议定义的)
- Direct 直接交换机
- Fanout 扇出交换机
- Topic 主题交换机
- Header 消息头交换机
Ps:这里主要实现前三种交换机类型,因为第四种交换机规则复杂,应用场景少,前三种就已经够用了.
Direct 直接交换机
有两个关键概念:
bindingKey:把队列和交换机绑定的时候,指定一个单词(类似于暗号).
routingKey:生产者发送消息的时候,也指定一个单词.
当 routingKey 和 BindingKey 对上暗号了,就可以把这个消息转发到对应的队列中了.
生产者发送消息的时候,会指定一个 “目标队列的名字”(routingKey),交换机收到之后,就看看绑定的队列里,有没有匹配的队列(BindingKey),如果有,就转发过去(把消息塞进对应的队列中),如果没有,消息直接丢弃.
Fanout 扇出交换机
会将接收到的消息广播到每一个跟其绑定的queue,最后交给对应的消费者,值得注意的是,exchange负责消息路由,而不是存储,路由失败则消息丢失。
Topic 主题交换机
有两个关键概念:
bindingKey:把队列和交换机绑定的时候,指定一个单词(类似于暗号).
routingKey:生产者发送消息的时候,也指定一个单词.
当 routingKey 和 BindingKey 对上暗号了,就可以把这个消息转发到对应的队列中了.
Ps:TopicExchange 与 DirectExchange 十分类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
1.6、持久化
上述这些概念(交换机、队列、绑定、消息....)对应的数据,都需要存储和管理起来,此时内存和硬盘都会各自存储一份,内存为主,硬盘为辅.
在内存中存储的原因:对于 MQ 来说,能够高效的转发处理数据式非常关键的,因此在内存上组织数据比硬盘上要快的多.
在硬盘上存储的原因:防止内存中的数据随着 进程重启/主机重启 而丢失.
Ps:硬盘存储,这个持久化是相对于 内存 的,对于一个硬盘来说,存储的消息寿命一般为 几年~几十年(一直不通电的情况下).
1.7、网络通信
通信流程
⽣产者和消费者都是客⼾端程序, broker server 则是作为服务器. 通过⽹络进⾏通信.
例如如下过程:
- 客户端发送请求:生产者的代码中需要有一个方法 queueDeclare 来创建队列,这个方法内部要做的事情就是给服务器发送一个请求,告诉服务器,咱们要创建一个队列,以及队列长啥样子......
- 服务器处理请求,并返回响应:broker server 收到这个请求之后,再执行服务器这边的 queueDeclare 方法,真正取给 内存/硬盘 上写一些数据,把这个队列给真正创建出来,再把创建 成功/失败 结果包装成响应,返回给客户端.
- 客户端接收响应:当响应回来了,客户端的 queueDeclare 就会获取到这个响应,看到 创建队列成功,此时 queueDeclare 就算执行完毕了.
远程调用设计思想
此处,客户端调用了一个本地方法,结果这个方法的背后,又给服务器发送了一系列消息,由服务器完成了一系列工作,站在调用者的角度,只是看到了这个功能完成了,并不知道背后的实现细节.
虽然调用的是一个本地方法,但实际上好像调用另一个远端服务器的方法一样~
这里可以认为是编写客户端服务器程序,通信过程中的一种设计思想——远程调用(RPC)
举个例子
以前有个美国的老哥,再大厂干活,但是他特别想摸鱼,于是他就找了一个中国人帮他干(有偿). 同样级别的程序员,在美国工作的薪水是国内的 3-4 倍.
1.8、模块设计图
二、实现核心类
Ps:Spring boot、MyBatis
2.1、交换机和队列的属性及绑定关系
我们可以使用 一个枚举类 表示三种交换机.
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
交换机属性如下
public class Exchange {
//模仿 rabbitmq 使用 name 作为唯一身份标识
private String name;
//交换价类型,DIRECT,FANOUT,TOPIC
private ExchangeType type = ExchangeType.DIRECT;
//当前交换机是否要持久化存储,true 标识持久化.
private boolean durable = false;
//TODO: 若当前交换机没人使用了,就会自动删除
private boolean autoDelete = false;
//TODO: 表示创建交换机时可以指定一些额外的选项
private Map<String, Object> arguments = new HashMap<>();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ExchangeType getType() {
return type;
}
public void setType(ExchangeType type) {
this.type = type;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public Map<String, Object> getArguments() {
return arguments;
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
队列属性如下
public class MSGQueue {
//表示队列的身份标识
private String name;
//是否持久化,true 表示支持
private boolean durable;
//TODO: 这个属性为 true,表示队列只能被一个消费者使用, false表示大家都能用
private boolean exclusive = false;
//TODO: 自动删除,为 true 表示没有人使用以后自动删除
private boolean autoDelete = false;
//TODO: 扩展参数,自定义选项
private Map<String, Object> arguments = new HashMap<>();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isExclusive() {
return exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public Map<String, Object> getArguments() {
return arguments;
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
交换机和队列绑定关系如下
public class Binding {
//交换机身份标识
private String exchangeName;
//队列身份标识
private String queueName;
private String bindingKey;
//绑定没必要设置持久化,因为没有意义,他存在的意义前提是 交换机和队列存在(持久化)
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
2.2、Message 消息
Message 消息主要分为:
- 属性部分 BasicProperties(网络):这是一个自定义类,里面承载着 Message 核心属性,如 消息的唯一身份标识、routingKey、是否需要持久化.......
- 正文部分 byte[]:承载着具体的消息内容.
- 辅助属性:通过 offsetBeg 和 offsetEnd 快速找到文件中某一个消息的具体位置(一个文件中会存储很多消息),这里约定的规则为 “前闭后开”(以字节为单位);isValid 标识消息要删除,如果删除采用逻辑删除(并不是真的删除,而是标记为无效数据),这里约定 0x1 为有效数据、0x0 表示无效数据.
Ps:前两个信息需要在网络上传输,并且写入文件,因此就需要通过 Serializable 序列化和反序列化(直接继承接口即可,不需要具体实现)~~
而第三条不需要被序列化保存到文件中,这个属性主要是为了内存中的 Message 对象快速找到 文件中的 Message 对象,因此使用 trasient 修饰即可防止序列化
public class Message implements Serializable {
//这两个属性是 Message 最核心的属性
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
//以下是辅助类型的属性
private transient long offsetBeg = 0; //消息数据的开头距离文件开头的位置偏移量(字节)
private transient long offsetEnd = 0; //消息数据的结尾距离文件开头的位置偏移量(字节)
//表示文件中的消息是否是有效消息(对文件中的消息,如果删除,使用逻辑删除)
// 0x1 表示有效, 0x0 表示无效
private byte isValid = 0x1;
//创建一个工厂方法,让工厂方法创建 Message 对象
//此方法中创建的 Message 对象,会自动生成唯一的 MessageId
//万一 routingKey 和 basicProperties 里的 routingKey 冲突,以外面为主
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
Message message = new Message();
if(basicProperties != null) {
message.setBasicProperties(basicProperties);
}
//生成的 MessageId 以 M- 作为前缀
message.setMessageId("M-" + UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;
// offsetBeg, offsetEnd, isValid ,是消息持久化的时候才会用到,消息写入文件之前在进行设定
return message;
}
public String getMessageId() {
return basicProperties.getMessageId();
}
public void setMessageId(String messageId) {
basicProperties.setMessageId(messageId);
}
public String getRoutingKey() {
return basicProperties.getRoutingKey();
}
public void setRoutingKey(String routingKey) {
basicProperties.setRoutingKey(routingKey);
}
public int getDeliverMode() {
return basicProperties.getDeliverMode();
}
public void setDeliverMode(int mode) {
basicProperties.setDeliverMode(mode);
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public long getOffsetBeg() {
return offsetBeg;
}
public void setOffsetBeg(long offsetBeg) {
this.offsetBeg = offsetBeg;
}
public long getOffsetEnd() {
return offsetEnd;
}
public void setOffsetEnd(long offsetEnd) {
this.offsetEnd = offsetEnd;
}
public byte getIsValid() {
return isValid;
}
public void setIsValid(byte isValid) {
this.isValid = isValid;
}
}
Message 中的核心属性如下
public class BasicProperties implements Serializable {
//消息的唯一标识(使用 UUID 保证唯一性)
private String messageId;
//和 bindingKey 做匹配
//如果当前交换机类型是 DIRECT ,此时 routingKey 就表示要转发的队列名
//如果当前交换机类型是 FANOUT ,此时 routingKey 无意义(不使用)
//如果当前交换机类型是 TOPIC ,此时 routingKey 就是要和 bindingKey 做匹配,匹配才进行转发
private String routingKey;
//标识消息是否要持久化,1 表示不持久化, 2 表示持久化(rabbitmq 是这么搞得)
private int deliverMode = 1;
//RabbitMQ 还有其他属性,这里就不考虑了
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public int getDeliverMode() {
return deliverMode;
}
public void setDeliverMode(int deliverMode) {
this.deliverMode = deliverMode;
}
}