目录
前言
1. 创建VirtualHost
1.1 定义虚拟主机的相关属性
1.2 VirtualHost 构造方法
1.3 交换机和队列的创建和删除
1.3.1 交换机操作
1.3.2 队列操作
1.4 绑定的创建和删除
1.5 发送消息到指定的队列/交换机
2. 实现路由规则Router
2.1 checkBindingKey()
2.2 checkRoutingKey()
2.3 route()
2.4 单元测试
3. 订阅消息
3.1 添加一个订阅者
3.2 创建订阅者管理类ConsumerManager
3.3 订阅消息小结
4. 消息确认 basicAck()
5. VirtualHost单元测试
结语
前言
写到这里,内存和硬盘的数据就组织完毕了,接下来我们就会引入在消息队列初识中提出的一个概念 --- 虚拟主机.简单回顾一下虚拟主机的概念: 它类似于MySQL的database,是一个逻辑的集合,一个BrokerServer上可以存在多个VirtualHost.在一个BrokerServer上可以组织不同的数据,可以使用不同的虚拟主机做出逻辑上的区分.本章节就是进行进一步的封装,同时实现一些消息队列的API.这里需要注意的是在RabbitMq中,虚拟主机是可以随便创建和删除的,在本项目目前只是默认只有一个虚拟主机的存在,后续根据情况会进行扩展,这里也提前预留了对于多虚拟主机的管理的数据结构.保证了不同虚拟机中的交换机 队列 绑定 消息都是相互隔离的.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!
1. 创建VirtualHost
👇👇👇
注意: 这一块比较重要也比较复杂,所以将代码进行截图加标注的形式进行总结,完整的VirtualHost.class代码会在讲解完给出.
👆👆👆
1.1 定义虚拟主机的相关属性
Router: 是用来定义交换机转发的规则,主要实现的是对routingKey进行验证以及判断,具体的细节会在后面给出.
ConsumerManager: 实现的是管理消费者进行消费.
以上两者就是锁对象了,后续我们要对硬盘和内存进行数据的读写,为了保证操作的原子性,以及线程安全我们会给相关操作进行加锁.
1.2 VirtualHost 构造方法
主要就是传入虚拟主机的名字,对该虚拟主机的数据库以及文件信息进行初始化,主要是对数据库进行初始化.具体DataBaseManager.init()
初始化内容如下:
初始化完成,将硬盘中的数据恢复到内存中
至此前置工作就差不多了.下面对一些重要的方法进行创建.
1.3 交换机和队列的创建和删除
1.3.1 交换机操作
如果交换机不存在就进行创建,存在就直接返回(ExchangeDeclare)
- 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
- 2. 判断交换机是否存在: 存在直接返回true即可,不存在就直接创建新的交换机即可.设置交换机的属性,根据是否持久化写入到硬盘,然后在写入到内存.这里需要注意的是,我们一定要先写硬盘再写内存,因为些硬盘是一个失败率很高的事情,经常会因为文件权限问题导致数据写入不进去.如果先写内存,而硬盘写入不进去,就还需要堆内存的数据进行删除,这就很繁琐了.
- 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/**
* 1. 创建交换机
* 如果交换机不存在就进行创建,存在就直接返回
*/
// 创建交换机
// 如果交换机不存在, 就创建. 如果存在, 直接返回.
// 返回值是 boolean. 创建成功, 返回 true. 失败返回 false
public boolean exchangeDeclare(String exchangeName,
ExchangeType exchangeType,
boolean durable,
boolean autoDelete,
Map<String, Object> arguments) {
// 1. 更改交换机的名字 交换机的名字 = 虚拟主机 + 交换机
exchangeName = virtualHostName + exchangeName;
try{
synchronized (exchangeLocker){
// 2. 判定该交换机是否存在
Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
if (existsExchange != null){
System.out.println("[VirtualHost] 交换机已经存在!");
return true;
}
// 3. 不存在,直接进行创建新的交换机
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);
// 4. 将构造好的交换机进行写入硬盘(含有持久化信息的交换机) 先写硬盘后写内存
if (durable){
diskDataCenter.insertExchange(exchange);
}
// 5. 将交换机写入到内存中
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交换机创建完成! exchangeName="+exchangeName);
// 上述操作为什么不先写内存后写硬盘?
// 因为写硬盘操作比较容易出现异常,如果写入硬盘失败,写入内存成功,再进行从内存中进行删除就比较麻烦了
}
return true;
} catch (Exception e){
System.out.println("[VirtualHost] 交换机创建失败! exchangeName="+exchangeName);
e.printStackTrace();
return false;
}
}
删除交换机
- 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
- 2. 根据交换机的名字得到交换机对象,判断交换机是否为空,不为空进行删除操作,还是先进行删除硬盘的数据,再删除内存中数据
- 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/**
* 2.删除交换机
* @param exchangeName 交换机名字
* @return
*/
public boolean exchangeDelete(String exchangeName) {
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
// 1. 先找到对应的交换机.
Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
if (toDelete == null) {
throw new MqException("[VirtualHost] 交换机不存在无法删除!");
}
// 2. 删除硬盘上的数据
if (toDelete.isDurable()) {
diskDataCenter.deleteExchange(exchangeName);
}
// 3. 删除内存中的交换机数据
memoryDataCenter.deleteExchange(exchangeName);
System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);
e.printStackTrace();
return false;
}
}
1.3.2 队列操作
针对队列创建和删除操作,这里就不做过多的解释了,过程跟上述交换机的操作一样. 下面给出代码:
/**
* 3. 创建队列
* @param queueName 队列名
* @param durable 持久化
* @param exclusive 队列独有
* @param autoDelete 自动删除
* @param arguments 其他声明
* @return
*/
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) {
// 把队列的名字, 给拼接上虚拟主机的名字.
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 判定队列是否存在
MSQueue existsQueue = memoryDataCenter.getQueue(queueName);
if (existsQueue != null) {
System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);
return true;
}
// 2. 创建队列对象
MSQueue queue = new MSQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);
// 3. 写硬盘
if (durable) {
diskDataCenter.insertQueue(queue);
}
// 4. 写内存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
/**
* 4. 删除队列
* @param queueName 队列名
* @return
*/
public boolean queueDelete(String queueName) {
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 根据队列名字, 查询下当前的队列对象
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName);
}
// 2. 删除硬盘数据
if (queue.isDurable()) {
diskDataCenter.deleteQueue(queueName);
}
// 3. 删除内存数据
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
1.4 绑定的创建和删除
- 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字 队列名字 = 虚拟主机的名字 + 队列的名字
- 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,不为空抛出异常
- 3. 绑定对象为空: 1, 判断绑定的bindingKey是否合法. 2.合法就创建绑定对象,设置响应的绑定属性.
- 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的.
- 5. 写入硬盘,再写内存
- 6. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.
这一步我们在Router进行设置一个方法,等下面更加详细的介绍router类.
/**
* 5. 创建绑定
* @param queueName 队列名字
* @param exchangeName 交换机名字
* @param bindingKey 绑定规则
* @return
*/
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
// 1. 转换交换机和队列的名字
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
synchronized (queueLocker){
// 2. 判断交换机和队列是否已经绑定成功
Binding existBinding = memoryDataCenter.getBinding(exchangeName,queueName);
if (existBinding != null){
throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName+ ", exchangeName=" + exchangeName);
}
// 3. 验证bing中的bindingKey 是否合法
if (!router.checkBindingKey(bindingKey)){
throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
}
// 4. 创建绑定对象
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
// 5. 获取对应的交换机和队列,判断是否是存在的
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
}
// 5. 先写硬盘
if (queue.isDurable() && exchange.isDurable()) {
diskDataCenter.insertBinding(binding);
}
// 6. 写入内存
memoryDataCenter.insertBinding(binding);
System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
}
return true;
}
} catch (MqException e) {
System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
e.printStackTrace();
return false;
}
}
删除绑定
- 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字 队列名字 = 虚拟主机的名字 + 队列的名字
- 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,为空抛出异常
- 3. 从硬盘进行删除,从内存进行删除
- 4. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.这里需要注意的是,我们对交换机和队列进行加锁的时候,顺序要和创建绑定的顺序是一致的.不然会出现死锁的现象.
/**
* 6. 删除绑定
* @param queueName 队列名
* @param exchangeName 交换机名字
* @return
*/
public boolean queueUnbind(String queueName, String exchangeName) {
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
// 1. 获取 binding 看是否已经存在~
Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
if (binding == null) {
throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
}
// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.
diskDataCenter.deleteBinding(binding);
// 3. 删除内存的数据
memoryDataCenter.deleteBinding(binding);
System.out.println("[VirtualHost] 删除绑定成功!");
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 删除绑定失败!");
e.printStackTrace();
return false;
}
}
1.5 发送消息到指定的队列/交换机
发布消息其实就是把消息发送到指定的交换机中,然后根据绑定关系发送到指定的队列
- 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字 队列名字 = 虚拟主机的名字 + 队列的名字
- 2. 检查消息的routingKey是否合法,不合法抛出异常
- 3. 根据传入的交换机的名字进行查找交换机对象,然后判断交换机的类型,而进行下一步的行为.
- 4. 如果交换机类型为DIRECT,则表示为直接交换机,则把routingKey作为队列的名字,先进行根据传入的参数,创建消息对象,然后按照刚才组合好的队列名字进行查找队列,查找队列进行发送消息,没查找进行抛出异常.发送消息的时候判断消息是否是持久化的,是持久化就往硬盘中写入,否则只写内存就可以.发送完消息之后,要进行重要的操作.通知消费者进行消费消息.这一块是在管理消费者进行消费消息实现的.
- 5. 如果交换机类型为Fanout 或者 Topic 我们需要在Router中进行设置相应的路由规则.
/**
* 9. 发送消息到指定的队列/交换机
*/
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
try {
// 1. 转换交换机的名字
exchangeName = virtualHostName + exchangeName;
// 2. 检查 routingKey 是否合法.
if (!router.checkRoutingKey(routingKey)) {
throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
}
// 3. 查找交换机对象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
}
// 4. 判定交换机的类型
if (exchange.getType() == ExchangeType.DIRECT) {
// 按照直接交换机的方式来转发消息
// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
// 此时, 可以无视绑定关系.
String queueName = virtualHostName + routingKey;
// 5. 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 6. 查找该队列名对应的对象
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
}
// 7. 队列存在, 直接给队列中写入消息
sendMessage(queue, message);
} else {
// 按照 fanout 和 topic 的方式来转发.
// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
// 1) 获取到绑定对象, 判定对应的队列是否存在
Binding binding = entry.getValue();
MSQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if (queue == null) {
// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
continue;
}
// 2) 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 3) 判定这个消息是否能转发给该队列.
// 如果是 fanout, 所有绑定的队列都要转发的.
// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
if (!router.route(exchange.getType(), binding, message)) {
continue;
}
// 4) 真正转发消息给队列
sendMessage(queue, message);
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 消息发送失败!");
e.printStackTrace();
return false;
}
}
private void sendMessage(MSQueue queue, Message message) throws IOException, MqException, InterruptedException {
// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上. 根据此条消息时是否要进行持久化进行判断
int deliverMode = message.getDeliverMode();
// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.
if (deliverMode == 2) {
diskDataCenter.sendMessage(queue, message);
}
// 写入内存
memoryDataCenter.sendMessage(queue, message);
// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.
consumerManager.notifyConsume(queue.getName());
}
2. 实现路由规则Router
这个类我们实现具体的路由转发规则,对之前还没实现的方法进行实现.还未实现的方法具体如下:
1. 在创建绑定的时候我们对bindingKey进行验证是否合法checkBindingKey();
2. 在往交换机进行发送消息的时候,我们对消息的routingKey进行验证\checkRoutingKey();
3. 当消息插入到交换机之后,根据交换机的主题往队列中分发消息的时候.对不同主题的交换机实现不同的路由规则route();
以上是我们在虚拟主机类中还没有进行实现的方法.下面进行一一实现:
2.1 checkBindingKey()
以下是我们合法的BindingKey的规则
/**
* 验证bindingKey是否是合法的
* 1. 数字, 字母, 下划线
* 2. 使用 . 分割成若干部分
* 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.
* @return
*/
public boolean checkBindingKey(String bindingKey){
if (bindingKey.length() == 0) {
// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.
return true;
}
// 检查字符串中不能存在非法字符
for (int i = 0; i < bindingKey.length(); i++) {
char ch = bindingKey.charAt(i);
if (ch >= 'A' && ch <= 'Z') {
continue;
}
if (ch >= 'a' && ch <= 'z') {
continue;
}
if (ch >= '0' && ch <= '9') {
continue;
}
if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
continue;
}
return false;
}
// 检查 * 或者 # 是否是独立的部分.
// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.
String[] words = bindingKey.split("\\.");
for (String word : words) {
// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
return false;
}
}
// 约定一下, 通配符之间的相邻关系(人为约定的).
// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~
// 1. aaa.#.#.bbb => 非法
// 2. aaa.#.*.bbb => 非法
// 3. aaa.*.#.bbb => 非法
// 4. aaa.*.*.bbb => 合法
for (int i = 0; i < words.length - 1; i++) {
// 连续两个 ##
if (words[i].equals("#") && words[i + 1].equals("#")) {
return false;
}
// # 连着 *
if (words[i].equals("#") && words[i + 1].equals("*")) {
return false;
}
// * 连着 #
if (words[i].equals("*") && words[i + 1].equals("#")) {
return false;
}
}
return true;
}
2.2 checkRoutingKey()
验证routingKey是合法的.routingKey是与BindingKey进行匹配的,所以必须是具体的.
/**
* 验证routingKey是否是合法的
* 1. 数字, 字母, 下划线
* 2. 使用 . 分割成若干部分
* @return
*/
public boolean checkRoutingKey(String routingKey){
if (routingKey.length() == 0){
// 空字符串,合法的情况 当交换机的类型为fanout的时候,是不需要的,所以可以设置为""
return true;
}
for (int i = 0; i < routingKey.length(); i++) {
char ch = routingKey.charAt(i);
// 判定该字符是否是大写字母
if (ch >= 'A' && ch <= 'Z') {
continue;
}
// 判定该字母是否是小写字母
if (ch >= 'a' && ch <= 'z') {
continue;
}
// 判定该字母是否是阿拉伯数字
if (ch >= '0' && ch <= '9') {
continue;
}
// 判定是否是 _ 或者 .
if (ch == '_' || ch == '.') {
continue;
}
// 该字符, 不是上述任何一种合法情况, 就直接返回 false
return false;
}
// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 true
return true;
}
2.3 route()
判断交换机的类型进而得出是否可以进行给队列进行转发消息.
1. 交换机的类型为fanout.代表给交换机进行绑定的所有队列进行转发消息.
2. 交换机的类型为Topic,需要对routingKey进行判断.进而设置给队列转发消息
/**
* 判断是否可以给绑定的交换机进行转发消息
* @return
*/
public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {
if (type == ExchangeType.FANOUT){
// 如果交换机类型为 fan-out 就直接进行返回true,表示转发给当前当前绑定的所有对列
return true;
}else if(type == ExchangeType.TOPIC){
// 如果是主题交换机,规则就比较复杂
return routerTopic(binding,message);
}else {
throw new MqException("[Router] 交换机类型有误 exchangeType=" + type);
}
}
对于主题交换机,我们进行详细的讲解.
- 1. 将bindingKey 和 routingKey 进行按照"."进行分割成字符串数组
- 2. 定义下标进行遍历数组
- 3. 遍历两个数组,主要分为5种情况.
- 3.1 当bindingKey遇到*号时直接跳过*,两个下标都进行自增1
- 3.2 当bindingKey遇到#号,如果此时#号是bindingKey的最后一位,那么直接返回true
- 3.3 当bindingKey遇到#号,如果此时#号不是最后一位,就去匹配#号下一位在routingKey的部分,匹配到了就将routingIndex指到匹配的位置,进而在进行上述循环,如果没匹配到就返回false
- 3.4 此时没有遇见通配符,所有的内容部都要进行匹配上,匹配不上就返回false
- 3.5 最后判断此时两个数组的下标是否都比较到了末尾.比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
/**
* 用来实现:topic类型的交换机的转发规则
* @param binding 绑定信息对象
* @param message 消息对象
* @return
*/
private boolean routerTopic(Binding binding, Message message) {
// 1. 将bindingKey 和 routingKey 进行按照"."进行分割
String[] bindingTokens = binding.getBindingKey().split("\\.");
String[] routingTokens = message.getRoutingKey().split("\\.");
// 2. 定义用来遍历数组的下标
int bindingIndex = 0;
int routingIndex = 0;
// 3. 进行遍历两个数组
while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){
if (bindingTokens[bindingIndex].equals("*")){
// (1.)遇到*号两个下标直接跳过 * 可以匹配一个部分
bindingIndex++;
routingIndex++;
}else if (bindingTokens[bindingIndex].equals("#")){
bindingIndex += 1;
// (2.)遇到#号 # 可以匹配多个部分
if (bindingIndex == bindingTokens.length){
// (3.)当遇到#号,#号的下标为最后一个元素的时候,直接返回true,因为可以直接匹配后面所有的内容
return true;
}else {
// (4.)当遇到#号,后面后还有内容的时候,就去匹配#号下一个部分在routingKey的部分,
// 匹配了就直接将bindingIndex指到bindingTokens下一个部分,同时将routingIndex指到匹配的地方
// 没匹配配到就返回false
routingIndex = findNextMatch(routingIndex,routingTokens,bindingTokens[bindingIndex]);
if (routingIndex == -1){
return false;
}
bindingIndex++;
routingIndex++;
}
}else {
// (5.)此时没有遇见通配符,所有的内容部都要进行匹配上
if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){
return false;
}
bindingIndex++;
routingIndex++;
}
}
// (6.)最后判断此时两个数组的下标是否都比较到了末尾
// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
return true;
}
return false;
}
/**
* 给定起始下标去在一个数组中寻找指定数组元素,找到就返回该元素在数组的下标,没找到就返回-1;
* @param routingIndex 起始下标
* @param routingTokens 目标数组
* @param bindingToken 目标元素
* @return
*/
private int findNextMatch(int routingIndex, String[] routingTokens, String bindingToken) {
for (int i = routingIndex; i < routingTokens.length; i++) {
if (routingTokens[i].equals(bindingToken)){
return i;
}
}
return -1;
}
以上就是整个Router的所有方法.我们对上述代码进行单元测试.
2.4 单元测试
package com.example.demo.mqserver.core;
import com.example.demo.common.MqException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
* Created with IntelliJ IDEA.
* Description:测试交换机的转发规则(交换机类型为topic)
* User: YAO
* Date: 2023-08-01
* Time: 13:56
*/
@SpringBootTest
class RouterTest {
private Router router = new Router();
private Binding binding = null;
private Message message = null;
@BeforeEach
public void setUp() {
binding = new Binding();
message = new Message();
}
@AfterEach
public void tearDown() {
binding = null;
message = null;
}
/**
* [测试用例]
* binding key routing key result
* aaa aaa true
* aaa.bbb aaa.bbb true
* aaa.bbb aaa.bbb.ccc false
* aaa.bbb aaa.ccc false
* aaa.bbb.ccc aaa.bbb.ccc true
* aaa.* aaa.bbb true
* aaa.*.bbb aaa.bbb.ccc false
* *.aaa.bbb aaa.bbb false
* # aaa.bbb.ccc true
* aaa.# aaa.bbb true
* aaa.# aaa.bbb.ccc true
* aaa.#.ccc aaa.ccc true
* aaa.#.ccc aaa.bbb.ccc true
* aaa.#.ccc aaa.aaa.bbb.ccc true
* #.ccc ccc true
* #.ccc aaa.bbb.ccc true
*/
@Test
public void test1() throws MqException {
binding.setBindingKey("aaa");
message.setRoutingKey("aaa");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test2() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test3() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test4() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test5() throws MqException {
binding.setBindingKey("aaa.bbb.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test6() throws MqException {
binding.setBindingKey("aaa.*");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test7() throws MqException {
binding.setBindingKey("aaa.*.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test8() throws MqException {
binding.setBindingKey("*.aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test9() throws MqException {
binding.setBindingKey("#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test10() throws MqException {
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test11() throws MqException {
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test12() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test13() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test14() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test15() throws MqException {
binding.setBindingKey("#.ccc");
message.setRoutingKey("ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test16() throws MqException {
binding.setBindingKey("#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
}
单元测试通过.
3. 订阅消息
在我们的虚拟主机中进行添加方法完成消息的订阅.要想完成消息的订阅,就需要在消息队列中新建一个列表consumerEnvList用来存储消费者的信息,当有消息进行存储到队列的时候,此时选出消费者进行消费消息.而消费者消费信息的这个环境需要单独定义一个类ConsumerEnv进行描述.以上这个消费信息的过程我们定义一个类ConsumerManager进行管理这些逻辑.
3.1 添加一个订阅者
给队列添加消费者,当队列接收到消息的时候,就要将消息推送给订阅者
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
此处插入的参数Consumer相当于一个回调函数,就是一个函数式接口.我们在common中进行定义Consumer
@FunctionalInterface
public interface Consumer {
// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.
// 通过这个方法把消息推送给对应的消费者.
// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)
void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException;
}
定义这个回调函数表示:收到消息之后要对消息进行处理.
3.2 创建订阅者管理类ConsumerManager
1. 这个类是和虚拟主机是一一对应的,每个虚拟主机都有一个管理消费者的对象,而管理的消费者的对象对应的是与之对应的.
2. 我们采用一个堵塞队列来记录收到消息的的队列名字,每次队列收到消息,就会往这个队列中进行添加队列的名字,然后后续进行通知这个队列的消费者进行消费消息.
3. 单独使用一个线程池用来执行消息的回调.(主要是获取到消息之后,给响应设置消息的属性与消息本体发送给客户端.)
4. 我们设置一个扫描线程,从堵塞队列不断地取出元素,进而找到队列,在这个队列进行消费消息,并且设置扫描线程为后台线程,这样就不会阻止进程的结束.
public class ConsumerManager {
// 1. 持有虚拟主机对象的引用,用来操作数据
private VirtualHost parent;
// 2. 指定一个线程池,负责执行具体的回调任务
private ExecutorService workPool = Executors.newCachedThreadPool();
// 3. 存放令牌的队列,存放接收到消息的队列名字(堵塞队列)
// 当这个堵塞队列一接收到队列的名字,扫描线程就会就会找到虚拟主机,然后找到这个队列,进而消费消息
private BlockingQueue<String> tokenQueue = new LinkedBlockingDeque<>();
// 4. 扫描线程 (关注令牌队列中添加了哪些队列的名字,就知道哪些队列添加了消息,取出消息,进而交给线程池,进行消费这些消息)
private Thread scannerThread = null;
}
1. 给堵塞队列设置接口,供虚拟主机进行调用.
/**
* 1. 收到消息,通知消费者进行消费消息(将消息对应的队列名字添加到堵塞队列中)
*/
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
2. 实现扫描线程
public ConsumerManager(VirtualHost p) {
parent = p;
scannerThread = new Thread(() -> {
while (true) {
try {
// 1. 拿到令牌
String queueName = tokenQueue.take();
// 2. 根据令牌, 找到队列
MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);
}
// 3. 从这个队列中消费一个消息.
synchronized (queue) {
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
e.printStackTrace();
}
}
});
// 把线程设为后台线程.
// 后台线程不会影响进程的结束
scannerThread.setDaemon(true);
scannerThread.start();
}
3. 添加消费者环境ConsumerEnv到指定的队列
我们在common中实现这个类
@Data
public class ConsumerEnv {
// 1. 消费者的身份标识
private String consumerTag;
// 2. 消费者消费队列的名字
private String queueName;
// 3. 是否自动应答
private boolean autoAck;
// 4. 通过这个回调函数来处理收到的消息.
private Consumer consumer;
public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
this.consumerTag = consumerTag;
this.queueName = queueName;
this.autoAck = autoAck;
this.consumer = consumer;
}
}
(1) 按照指定的队列名找到这个类.
(2) 创建消费者环境对象,进行添加,同时如果这个队列的消息存在,就需要进行消费这些信息,调用consumeMessage()方法传入队列的名字.
/**
* 2. 新增Consumer对象到指定的对列
*/
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
// 找到对应的队列.
MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
synchronized (queue) {
queue.addConsumerEnv(consumerEnv);
// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for (int i = 0; i < n; i++) {
// 这个方法调用一次就消费一条消息.
consumeMessage(queue);
}
}
}
4. 消费消息 consumeMessage()
(1) 因为一个队列中可能会有多个消费者,我们按照轮询的方式进行挑选消费者进行消费消息,在队列的类中,设置方法chooseConsumer()
/**
* 挑选订阅者 进行消费队列中的消息 (轮询的方式)
* @return
*/
public ConsumerEnv chooseConsumer(){
// 1. 如果当前队列对应的消费者的数量为0,直接返回null,表示没有筛选到消费者
if (consumerEnvList.size() == 0){
return null;
}
// 2. 使用当前订阅到的下标进行对消费者列表取模,然后进行挑选消费者记性消费消息,实现消息的轮询消费
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.getAndIncrement();
return consumerEnvList.get(index);
}
(2) 从队列中取出消息
(3) 把消息带入到回调方法,交给线程池进行执行
/**
* 消费者进行消费信息
* @param queue
*/
private void consumeMessage(MSQueue queue) {
// 1. 按照轮询的方式, 找个消费者出来.
ConsumerEnv luckyDog = queue.chooseConsumer();
if (luckyDog == null) {
// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.
return;
}
// 2. 从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
System.out.println(message);
if (message == null) {
// 当前队列中还没有消息, 也不需要消费.
return;
}
// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
workPool.submit(() -> {
try {
// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
// 2. 真正执行回调操作
luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.
// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.
if (luckyDog.isAutoAck()) {
// 此时是自动应答,表示直接删除
// 1) 删除硬盘上的消息
if (message.getDeliverMode() == 2) {
parent.getDiskDataCenter().deleteMessage(queue, message);
}
// 2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageID());
// 3) 删除内存中消息中心里的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageID());
System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
3.3 订阅消息小结
4. 消息确认 basicAck()
此处是消费者在回调函数中对消息进行处理之后再回调函数中执行的.
- 1. 获取要删除消息以及所在队列的对象
- 2. 删除硬盘和内存的数据
- 3. 删除未确认消息集合的数据
/**
* 消费者消费完消息进行手动应答
* @return
*/
public boolean basicAck(String queueName, String messageId){
queueName = virtualHostName + queueName;
try {
// 1. 获取要删除消息以及所在队列的对象
Message message = memoryDataCenter.getMessage(messageId);
if (message == null){
throw new MqException("[VirtualHost] 确认的信息不存在 messageId="+messageId);
}
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null){
throw new MqException("[VirtualHost] 确认的队列不存在 queueName="+queueName);
}
// 2
// 1.)删除硬盘中的数据
if(message.getDeliverMode() == 2){
diskDataCenter.deleteMessage(queue,message);
}
// 2.) 删除消息中心的消息
memoryDataCenter.removeMessage(message.getMessageID());
// 3.) 删除委未确认消息集合的消息
memoryDataCenter.removeMessageWaitAck(queue.getName(),message.getMessageID());
System.out.println("[VirtualHost] basicAck成功 消息被确认成功 queueName=" + queueName
+ ",messageId:." + messageId);
return true;
} catch (MqException | ClassNotFoundException | IOException e) {
e.printStackTrace();
System.out.println("[VirtualHost] basicAck失败 消息被确认失败 queueName=" + queueName
+ ",messageId:." + messageId);
return false;
}
}
至此以上就是VirtualHost的全部内容,内容很多,很繁琐需要,静下心来仔细的体会.
5. VirtualHost单元测试
package com.example.demo.mqserver;
import ch.qos.logback.core.util.FileUtil;
import com.example.demo.DemoApplication;
import com.example.demo.common.Consumer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.*;
/**
* Created with IntelliJ IDEA.
* Description:虚拟主机的操作测试
* User: YAO
* Date: 2023-08-01
* Time: 18:26
*/
class VirtualHostTest {@Autowired
public VirtualHost virtualHost = null;
@BeforeEach
void setUp() {
DemoApplication.context = SpringApplication.run(DemoApplication.class);
// 创建好虚拟主机对象
virtualHost = new VirtualHost("default");
}
@AfterEach
void tearDown() throws IOException {
DemoApplication.context.close();
//把硬盘的目录进行删除
File dataDir = new File("./data");
FileUtils.deleteDirectory(dataDir);
}
@Test
void exchangeDeclare() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
Assertions.assertTrue(ok);
}
@Test
void exchangeDelete() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.exchangeDelete("testExchange");
Assertions.assertTrue(ok);
}
@Test
void queueDeclare() {
boolean ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
Assertions.assertTrue(ok);
}
@Test
void queueDelete() {
boolean ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.queueDelete("testQueue");
Assertions.assertTrue(ok);
}
@Test
void queueBind() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.queueBind("testQueue","testExchange",
"testBindingKey");
Assertions.assertTrue(ok);
}
@Test
void queueUnbind() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.queueBind("testQueue","testExchange",
"testBindingKey");
ok = virtualHost.queueUnbind("testQueue","testExchange");
Assertions.assertTrue(ok);
}
@Test
void basicPublish() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.basicPublish("testExchange","testQueue",null,"Hello".getBytes(StandardCharsets.UTF_8));
Assertions.assertTrue(ok);
}
/**
* 1. 先订阅, 后发布消息
*/
@Test
public void testBasicConsume1() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true,
false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true, false, null);
Assertions.assertTrue(ok);
// 先订阅队列
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消费者自身设定的回调方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
// 再发送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
}
/**
* 先发送消息, 后订阅队列.
*/
@Test
public void testBasicConsume2() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true,
false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true, false, null);
Assertions.assertTrue(ok);
// 先发送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
// 再订阅队列
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消费者自身设定的回调方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicConsumeFanout() throws InterruptedException {
// 创建一个交换机,并且绑定两个队列
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue1", "testExchange", "");
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue2", "testExchange", "");
Assertions.assertTrue(ok);
// 发布消息发到交换机
ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
Assertions.assertTrue(ok);
Thread.sleep(500);
// 两个消费者订阅上述的两个队列.
ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicConsumeTopic() throws InterruptedException {
// 1. 创建交换机(主题交换机)
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);
Assertions.assertTrue(ok);
// 2. 创建队列
ok = virtualHost.queueDeclare("testQueue", false, false, false, null);
Assertions.assertTrue(ok);
// 3. 将交换机和队列进行绑定(设置bindingKey)
ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
Assertions.assertTrue(ok);
// 4. 发布消息(设置routingKey)
ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
Assertions.assertTrue(ok);
// 5. 订阅消息
ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicAck() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true,
false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true, false, null);
Assertions.assertTrue(ok);
// 先发送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
// 再订阅队列 [要改的地方, 把 autoAck 改成 false]
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消费者自身设定的回调方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
// [要改的地方, 新增手动调用 basicAck]
boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
Assertions.assertTrue(ok);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
}
结语
本文将整个VirtualHost进行了实现,实现了供BrokerServer调用的API.基础的消息队列框架已经搭建好了,接下来就是搭建服务器和客户端了.请持续关注,谢谢!!!
完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇
模拟实现消息队列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq