目录
一.VirtualHost虚拟机模块介绍
二.VirtualHost的实现
1. 类概述
2. 交换机操作
3. 队列操作
4. 绑定操作
5. 消息操作
6. 清理操作
总结
三.全部代码
一.VirtualHost虚拟机模块介绍
虚拟机是对之前几个数据管理模块的整合,并封装了之前的一些操作。
实际上虚拟机也可以有很多个,这里为了操作简便,整个项目中只有一个虚拟机对象。
二.VirtualHost的实现
1. 类概述
VirtualHost
类代表一个虚拟主机的抽象,主要用于管理消息队列系统的核心模块,如交换机、消息队列、绑定关系和消息管理。每个虚拟主机有自己独立的资源,通过类成员 _emp
、_mqmp
、_bmp
和 _mmp
来管理相应的交换机、队列、绑定和消息。
保证了每个虚拟机内部数据的独立性。
private:
std::string _name; // 虚拟主机名称
ExchangeManager::ptr _emp; // 交换机管理
MsgQueueManager::ptr _mqmp; // 队列管理
BindingManager::ptr _bmp; // 绑定管理
MessageManager::mmp _mmp; // 消息管理
public:
using ptr = std::shared_ptr<VirtualHost>;
VirtualHost(const std::string &name, const std::string &dbname, const std::string &msgdir)
: _name(name),
_emp(std::make_shared<ExchangeManager>(dbname)),
_mqmp(std::make_shared<MsgQueueManager>(dbname)),
_bmp(std::make_shared<BindingManager>(dbname)),
_mmp(std::make_shared<MessageManager>(msgdir))
{
// _emp,_mqmp,_bmp创建后会自动recover
// _mmp 需要手动调用initQueueMsg来recover文件中的数据
// 1.获取_mqmp中的所有队列
msgqueue_map queue_map = _mqmp->selectAll();
// 2.遍历队列,recover每个队列的历史消息
for (auto &kv : queue_map)
{
_mmp->initQueueMsg(kv.first);
}
}
-
私有成员变量:
_name
:虚拟主机的名称。_emp
:交换机管理器,管理虚拟主机内的所有交换机。_mqmp
:消息队列管理器,负责管理消息队列的声明与操作。_bmp
:绑定管理器,处理交换机与队列之间的绑定关系。_mmp
:消息管理器,负责管理消息的存储和传输。
-
构造函数
VirtualHost()
:- 通过参数
name
、dbname
和msgdir
初始化虚拟主机名称、数据库名和消息存储目录。 - 初始化成员
_emp
,_mqmp
,_bmp
,_mmp
,并通过各自的构造函数进行recover
操作。 - 通过
MsgQueueManager::selectAll()
获取所有队列,并使用MessageManager::initQueueMsg()
还原队列的历史消息。
- 通过参数
2. 交换机操作
// 1. 声明/删除/查找交换机
bool declareExchange(const std::string &name, msg::ExchangeType type,
bool durable,
bool auto_del,
const google::protobuf::Map<std::string, std::string> args)
{
return _emp->declareExchange(name, type, durable, auto_del, args);
}
bool removeExchange(const std::string &name)
{
// 删除交换机+对应的绑定
_bmp->removeByExchange(name);
return _emp->removeExchange(name);
}
bool exchangeExists(const std::string &name)
{
return _emp->exists(name);
}
Exchange::ptr selectExchange(const std::string &name)
{
return _emp->selectExchange(name);
}
-
declareExchange()
:- 声明一个交换机,包括交换机名称、类型、持久性、自动删除等属性。实际操作由
ExchangeManager
类的declareExchange()
函数完成。
- 声明一个交换机,包括交换机名称、类型、持久性、自动删除等属性。实际操作由
-
removeExchange()
:- 删除一个交换机,调用
BindingManager::removeByExchange()
删除与该交换机关联的绑定关系,然后移除交换机。
- 删除一个交换机,调用
-
exchangeExists()
和selectExchange()
:- 分别用于检查交换机是否存在以及获取交换机实例。
3. 队列操作
// 2. 声明/删除队列
bool declareQueue(const std::string &name, bool durable,
bool exclusive, bool auto_del,
const google::protobuf::Map<std::string, std::string> &args)
{
// 初始化队列的消息+队列
_mmp->initQueueMsg(name);
return _mqmp->declareQueue(name, durable, exclusive, auto_del, args);
}
bool removeQueue(const std::string &name)
{
// 删除队列的消息+对应的绑定+队列
_mmp->destroyQueueMsg(name);
_bmp->removeByQueue(name);
return _mqmp->removeQueue(name);
}
bool queueExists(const std::string &name)
{
return _mqmp->exists(name);
}
-
declareQueue()
:- 声明一个队列,同时初始化该队列的消息存储。通过调用
MessageManager::initQueueMsg()
处理消息初始化后,再由MsgQueueManager::declareQueue()
声明队列。
- 声明一个队列,同时初始化该队列的消息存储。通过调用
-
removeQueue()
:- 删除队列时,调用
MessageManager::destroyQueueMsg()
删除该队列的所有消息,并移除与该队列相关的绑定关系。
- 删除队列时,调用
-
queueExists()
:- 检查队列是否存在,封装了
MsgQueueManager::exists()
的调用。
- 检查队列是否存在,封装了
4. 绑定操作
// 3. 绑定/解绑
bool bind(const std::string &ename, const std::string &qname, const std::string &key)
{
Exchange::ptr ep = _emp->selectExchange(ename);
if (ep.get() == nullptr)
{
DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());
return false;
}
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if (mqp.get() == nullptr)
{
DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());
return false;
}
return _bmp->bind(ename, qname, key, ep->_durable && mqp->_durable);
}
bool unbind(const std::string &ename, const std::string &qname)
{
Exchange::ptr ep = _emp->selectExchange(ename);
if (ep.get() == nullptr)
{
DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());
return false;
}
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if (mqp.get() == nullptr)
{
DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());
return false;
}
return _bmp->unbind(ename, qname);
}
bool bindExists(const std::string &ename, const std::string &qname)
{
return _bmp->exists(ename, qname);
}
-
bind()
:- 实现队列与交换机之间的绑定,首先验证交换机和队列是否存在,然后通过
BindingManager::bind()
进行绑定。绑定关系通常涉及路由键(key
)的定义。
- 实现队列与交换机之间的绑定,首先验证交换机和队列是否存在,然后通过
-
unbind()
:- 解绑定操作,与
bind()
相对应,用于移除队列与交换机之间的绑定。
- 解绑定操作,与
-
bindExists()
:- 检查特定的交换机与队列之间是否存在绑定关系。
-
selectByExchange()
:- 获取某一交换机下所有绑定的队列信息。该函数依赖于
BindingManager::selectByExchange()
来检索绑定数据。
- 获取某一交换机下所有绑定的队列信息。该函数依赖于
5. 消息操作
// 5. 消息的发布,获取,确认
bool basicPublish(const std::string &qname, msg::BasicAttributes *bp, const std::string &body)
{
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if (mqp.get() == nullptr)
{
DLOG("发布消息失败,队列%s不存在!", qname.c_str());
return false;
}
// std::cout<<mqp->_name<<std::endl;
return _mmp->insertMsg(qname, bp, body, mqp->_durable);
}
msg_ptr basicConsume(const std::string &qname)
{
return _mmp->front(qname);
}
void basicAck(const std::string &qname, const std::string &id)
{
_mmp->ack(qname, id);
}
-
basicPublish()
:- 向指定队列发布消息,首先确认队列是否存在,若存在则调用
MessageManager::insertMsg()
插入消息。插入过程中,队列的持久性属性将被传递,以决定消息是否需要持久化到磁盘。
- 向指定队列发布消息,首先确认队列是否存在,若存在则调用
-
basicConsume()
:- 从指定队列中消费一条消息,调用
MessageManager::front()
获取队列中的第一条消息。
- 从指定队列中消费一条消息,调用
-
basicAck()
:- 确认消息已被成功消费,通过
MessageManager::ack()
标记该消息为已处理。
- 确认消息已被成功消费,通过
6. 清理操作
void clear()
{
_emp->clear();
_mqmp->clear();
_bmp->clear();
_mmp->clear();
};
clear()
:- 清除虚拟主机内的所有资源,包括交换机、队列、绑定和消息。这通常用于重置或关闭虚拟主机时的资源清理操作。
总结
将消息队列系统中的各个模块分层管理。通过 VirtualHost
类将交换机、队列、绑定、消息管理等功能模块统一整合,提供了虚拟主机层级的操作接口。
在实现中,利用 shared_ptr
进行智能指针管理,保证了内存的安全使用。
同时,消息的持久化处理以及与消息队列工具的结合,提高了系统的可靠性。
三.全部代码
#pragma once
#include "exchange.hpp"
#include "msg_queue.hpp"
#include "binding.hpp"
#include "message.hpp"
namespace mq
{
using msg_ptr = std::shared_ptr<msg::Message>;
class VirtualHost
{
private:
std::string _name; // 虚拟主机名称
ExchangeManager::ptr _emp; // 交换机管理
MsgQueueManager::ptr _mqmp; // 队列管理
BindingManager::ptr _bmp; // 绑定管理
MessageManager::mmp _mmp; // 消息管理
public:
using ptr = std::shared_ptr<VirtualHost>;
VirtualHost(const std::string &name, const std::string &dbname, const std::string &msgdir)
: _name(name),
_emp(std::make_shared<ExchangeManager>(dbname)),
_mqmp(std::make_shared<MsgQueueManager>(dbname)),
_bmp(std::make_shared<BindingManager>(dbname)),
_mmp(std::make_shared<MessageManager>(msgdir))
{
// _emp,_mqmp,_bmp创建后会自动recover
// _mmp 需要手动调用initQueueMsg来recover文件中的数据
// 1.获取_mqmp中的所有队列
msgqueue_map queue_map = _mqmp->selectAll();
// ILOG("数量:%ld",queue_map.size());
// 2.遍历队列,recover每个队列的历史消息
for (auto &kv : queue_map)
{
//ILOG("初始化:%s", kv.first.c_str());
_mmp->initQueueMsg(kv.first);
}
}
// 1. 声明/删除/查找交换机
bool declareExchange(const std::string &name, msg::ExchangeType type,
bool durable,
bool auto_del,
const google::protobuf::Map<std::string, std::string> args)
{
return _emp->declareExchange(name, type, durable, auto_del, args);
}
bool removeExchange(const std::string &name)
{
// 删除交换机+对应的绑定
_bmp->removeByExchange(name);
return _emp->removeExchange(name);
}
bool exchangeExists(const std::string &name)
{
return _emp->exists(name);
}
Exchange::ptr selectExchange(const std::string &name)
{
return _emp->selectExchange(name);
}
// 2. 声明/删除队列
bool declareQueue(const std::string &name, bool durable,
bool exclusive, bool auto_del,
const google::protobuf::Map<std::string, std::string> &args)
{
// 初始化队列的消息+队列
_mmp->initQueueMsg(name);
return _mqmp->declareQueue(name, durable, exclusive, auto_del, args);
}
bool removeQueue(const std::string &name)
{
// 删除队列的消息+对应的绑定+队列
_mmp->destroyQueueMsg(name);
_bmp->removeByQueue(name);
return _mqmp->removeQueue(name);
}
bool queueExists(const std::string &name)
{
return _mqmp->exists(name);
}
// 3. 绑定/解绑
bool bind(const std::string &ename, const std::string &qname, const std::string &key)
{
Exchange::ptr ep = _emp->selectExchange(ename);
if (ep.get() == nullptr)
{
DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());
return false;
}
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if (mqp.get() == nullptr)
{
DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());
return false;
}
return _bmp->bind(ename, qname, key, ep->_durable && mqp->_durable);
}
bool unbind(const std::string &ename, const std::string &qname)
{
Exchange::ptr ep = _emp->selectExchange(ename);
if (ep.get() == nullptr)
{
DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());
return false;
}
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if (mqp.get() == nullptr)
{
DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());
return false;
}
return _bmp->unbind(ename, qname);
}
bool bindExists(const std::string &ename, const std::string &qname)
{
return _bmp->exists(ename, qname);
}
// 4. 获取对应交换机的所有绑定信息(关联路由模块 )
MsgQueueBindingMap selectByExchange(const std::string &ename)
{
if (!_emp->exists(ename))
{
ILOG("交换机:%s 不存在", ename.c_str());
return MsgQueueBindingMap();
}
return _bmp->selectByExchange(ename);
}
// 5. 消息的发布,获取,确认
bool basicPublish(const std::string &qname, msg::BasicAttributes *bp, const std::string &body)
{
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if (mqp.get() == nullptr)
{
DLOG("发布消息失败,队列%s不存在!", qname.c_str());
return false;
}
// std::cout<<mqp->_name<<std::endl;
return _mmp->insertMsg(qname, bp, body, mqp->_durable);
}
msg_ptr basicConsume(const std::string &qname)
{
return _mmp->front(qname);
}
void basicAck(const std::string &qname, const std::string &id)
{
_mmp->ack(qname, id);
}
void clear()
{
_emp->clear();
_mqmp->clear();
_bmp->clear();
_mmp->clear();
};
};
};