消息队列的模拟实现(一)
- 认识消息队列
- 生产者消费者模型两大特征
- 市面上可见的消息队列`MQ`
- 消息队列的特点:
- 模拟实现消息队列
- 模型分类
- 提供的核心API
- 消息队列的推拉模式
- 交换机的类型
- 持久化
- 网络通信
- 额外提供的方法
- 使用一个TCP和信道之间的区别
- 消息应答
- 使用数据库`SQLite`
- 总结
认识消息队列
根据block queue
(阻塞队列)->是一个生产者消费者模型。(是在一个进程内部进行的)
而所谓的消息队列就是将阻塞队列这种数据结构,单独提取出作为一个功能来实现,独立进行部署也是所谓的生产者消费者模型。(进程和进程之间,服务器和服务器之间)
在一个分布式系统中,由一组服务器构成的“集群”。
综上所述,严谨一点:
消息队列是一种在应用程序之间进行异步通信的中间件。它允许发送和接收消息的应用程序之间可以独立于彼此的运行,而不需要直接连接。
消息队列主要用于解耦发送方和接收方之间的耦合性,提高系统的可扩展性和可靠性。
生产者消费者模型两大特征
-
解耦和
两个服务器或者进程中间有一个相对于缓存区的消息队列,两个服务器在对消息队列进行交互,如:A发请求到消息队列,而B从消息队列中读取请求,然后返回响应到消息队列,A在消息队列中获取响应。
-
削峰填谷
根据不同时期数据量的大小,利用消息队列让一个服务器在大多数情况下可以正常运行。
比如:A是一个入口服务器,A调用B完成一些具体的业务。在A,B直接通信,如果A突然收到大量数据,达到请求峰值,B随着也会感到峰值的出现。而在引入消息队列,就算A达到峰值时,B也可以仍然按照原来的方式进行读取请求,不会感到大量压力。
市面上可见的消息队列MQ
RabbitMQ
Kafka
RockrtMQ
ActiveMQ
消息队列的特点:
- 可靠性
- 消息队列提供持久化的传输消息机制,确保发送的消息不会丢失。
- 异步型
- 消费者和生产者的操作是异步执行的,使得应用程序可以在发送和接收之间可以独立运行
- 解耦性
- 分离了两种模式,生产者和消费者之间的解耦性得到减少,从而提高系统的灵活性和可维护性
- 缓冲能力
- 消息队列相对于生产者和消费者之间的缓存区,相对于削峰填谷
- 扩展性
- 可以通过增加生产者和消费者的方式来扩展系统的处理业务能力。
模拟实现消息队列
消息队列的运行原理:
消息队列总的来说是一个生产者消费者模型。生产者负责生产消息存储在队列中,而消费者在队列中消费消息。
既然我们要模拟实现一个消息队列,就不得不分析一下所谓的消息队列究竟是如何实现的。
- 需求分析,参考
MQ
的功能- 了解核心概念(生产者消费者模型)
- 中间人(Broker)
- 发布(Publish)—》生产者向中间人投递消息的过程
- 订阅(Subscribe)—》确认那些消费者在中间人这里取出数据,这个注册过程,叫订阅
- 消费(consume)–》消费者从中间人这里取数据
模型分类
服务器中传递数据
- 一对一
- 一对多
- 多对多
中间人服务器
(Broker Server)中包含的内容:
- 虚拟机(Virtual Host)。类似于
MySQL
数据库中一个Database
,相当于一个数据集合。 - 交换机(Exchange)。相当于生产者将消息投递给Broke server中的某个交换机,再有交换机将消息传递给对应的队列。
- 队列(Queue)。真正用于存储消息实体,后续消费者将从对应的队列中读数据。
- 绑定(Binding)。将交换机和队列,建立关联关系,关系可能是一个交换机对应一个队列,可能多个交换机对应一个队列。
- 消息(message)。 就像请求与响应都是一个消息,用于传递的一种介质。
RabbitMQ就是以上的概念来组织的,被称为AMQP
协议。
提供的核心API
- 创建队列(
QueueDeclare
)
- 存在不创建,不存在创建
- 销毁队列(
QueueDelete
)- 创建交换机(
exchangeDeclare
)- 销毁交换机(
exchangeDeclare
)- 创建绑定(
QueueBind
)- 解除绑定(
QueueUnbind
)- 发布消息(
basicPublic
)- 订阅消息(
basicConsume
)- 确认信息(
BasicACK
)
- 该API主要是让消费者告诉
Broke server
,这个消息已经处理完毕,提高系统的可靠性和传输成功率高。
消息队列的推拉模式
推拉模式指的是消息队列其中的发送和接收方式。
RabbitMQ
只支持推的方式,kafka
都支持
push
:Broke把接收到的数据,主动发给订阅者,订阅者在队列中等待,并实时接收到新的消息,这种模式下,消费者无需主动请求消息,而是由生产者主动推送给他们。
- 优点:实时性高
- 缺点:浪费短暂的空闲时间
- 适用场景:需要及时响应的场景
poll
:订阅者主动调取broke中的数据,消费者定期轮询或发送请求来获取新的消息。
- 优点:消费者有更大的控制权
- 缺点:实时性低,可能第一时间无法获取到消息
- 适用场景:对消费者的读取消息有时间要求的场景
交换机的类型
交换机在转发消息时,有一套转发规则,对RubbitMQ
存在四种不同的交换机,来描述不同的交换规则。
交换机与交换机的转发规则
- Direct直接交换机
- 生产者发送消息的时候,会指定一个目标队列的名字,交换机收到以后,查看绑定的队列,有则转发,无则丢弃。
- Fanout扇出交换机
- 将收到的消息转发到每一个绑定的队列。
- Topic主题交换机
bindingKey
:把队列和交换机绑定的时候,指定的一个单词(暗号)。routingKey
:生产者发送消息时,也指定一个单词。- 当两个key对上时,才会将消息转发到消息队列中
- Header消息头交换机
只实现前三种,前三种比较常用!
三种操作的应用场景:
Direct
专属交换机,只有指定队列可以使用FanOut
通用交换机,只要是队列就可以使用Topic
只有对准了"暗号"的队列,才可以使用
持久化
Exchange、Queue、Binding 、Message
的这些内容都需要持久化,都需要让BrokeServer
组织管理,使用两种存储方式:
- 内存:方便使用
- 硬盘:重启数据不丢失,需要设定消息持久化
对于消息队列,能够高效的处理数据,是非常关键的指标,使用内存存储数据可以达到很高效的处理数据速度,不过内存有一个缺点就是关机后会自动清楚数据,所以需要使用硬盘来存储数据。
该持久化就是两个存储方式之间的对比,硬盘的持久化也是相对于内存持久。
网络通信
各种服务器(生产者/消费者)通过网络,通过broke server
进行交互,这里设定使用TCP
和自定义协议
来实现以上两者的交互工作。在网络通信的过程中,客户端需要提供对应的Api
来实现对服务器的操作。
额外提供的方法
- 创建 Connection
- 关闭 Connection
以上两个操作相对于一个
TCP
连接
- 创建 Channel
- 关闭Channel
通信/信道
使用一个TCP和信道之间的区别
一个Connection中包含多个Channel,每个channel中的数据是毫不相干的,就像进程之间是独立的。因为TCP
建立连接消耗资源,频繁创建和销毁更是不利于程序的高效运行,所以更多时候不会频繁断开连接。使用Channel是一个轻量型通信,比TCP
连接和断开,节约更多的资源。
消息应答
当消息队列中的消息被消费,需要进行应答。
两种应答模式:
- 自动应答(消费者自行取走消息,消息丢失也无碍)
- 手动应答(可靠性高,通常需要消费者主动来调用方法,一般用于重要信息的应答方式)
应答模式是为了保证消息确实被消费者处理成功了,在需要时间可靠性强的场景中比较常见。
使用数据库SQLite
Mysql
是一个比较重量型的数据库,为了让项目更加轻便快捷,采用SQLite
数据库。
一个完整的
SQLite
只是一个可执行文件,是一个本地数据库,该数据库直接操作的是系统中的硬盘文件。
在Java中使用Sqlite
不需要安装,只需要引入依赖即可使用,自动加载jar包和动态文件。
手动安装方式:下载SQLite
点击安装,只是一个exe
文件。
Mysql
和SQLite
之间,后者是不需要设置用户和密码的!原因是Sqlite
是一个只允许单个用户使用的数据库,而MySql
支持多用户使用,所以需要验证密码准确性。
总结
要实现一个消息队列首先需要进行项目分析,列出项目组成,然后一一细分其中功能。其实做每一个项目都是这样的,当然如果是你自己创建一个新的项目其实也是需要进行项目分析和项目组成,然后得到你所期待的功能。