一.MsgQueue相关类介绍
二.MsgQueue类的实现
成员变量
MsgQueue
结构体用于描述一个消息队列的基本属性。
std::string _name; // 队列名称
bool _durable; // 队列是否持久化
bool _exclusive; // 队列是否独占
bool _auto_del; // 队列是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 队列的附加参数
构造函数与析构函数
结构体提供了一个带参数的构造函数用于初始化各成员变量。同时提供了一个默认构造函数以支持无参初始化。
MsgQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &_args)
: _name(name), _durable(durable), _exclusive(exclusive), _auto_del(auto_del), _args(_args) {}
成员函数
MsgQueue
提供了两个主要的成员函数,用于设置和获取队列的附加参数。
参数设置函数 setArgs
setArgs
方法接收一个格式化的字符串参数,并将其解析为键值对,存储在 _args
中。
该方法使用了 StrHelper::split
函数来分割字符串,通过 =
符号区分键和值,并存储在 _args
中。
参数获取函数 getArgs
getArgs
方法用于将 _args
中的键值对组合成格式化字符串,返回给调用者。
std::string getArgs()
{
std::string ret;
for (auto &kv : _args)
{
ret += kv.first + "=" + kv.second + "&";
}
return ret;
}
三.MsgQueueMapper类的实现
成员变量
SqliteHelper sql_helper; //数据库管理句柄
sql_helper
:SqliteHelper
是一个辅助类,封装了SQLite的基本操作,如执行SQL语句和管理数据库连接。
构造函数
MsgQueueMapper(const std::string &dbname) : sql_helper(dbname)
{
std::string path = FileHelper::getParentDirName(dbname);
FileHelper::createDir(path);
if (!sql_helper.open())
{
ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());
assert(0);
}
createTable();
}
构造函数接受一个数据库名称参数,初始化 sql_helper
,并在必要时创建数据库目录和表。
成员函数
创建表格函数 createTable
void createTable()
{
std::stringstream sql;
sql << "create table if not exists msg_queue(";
sql << "name varchar(64) primary key,";
sql << "durable int,";
sql << "exclusive int,";
sql << "auto_del int,";
sql << "args varchar(64));";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());
assert(0);
}
}
删除表格函数 dropTable
dropTable
方法用于删除现有的消息队列表格。该方法执行删除表格的SQL语句,如果表格存在,它将被移除。
void dropTable()
{
std::stringstream sql;
sql << "drop table if exists msg_queue;";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());
assert(0);
}
}
插入数据函数 insert
insert
方法将 MsgQueue
对象插入到数据库中。
void insert(MsgQueue::ptr &msg_queue_ptr)
{
std::stringstream sql;
sql << "insert into msg_queue(name, durable, exclusive, auto_del, args) values(";
sql << "'" << msg_queue_ptr->_name << "',";
sql << msg_queue_ptr->_durable << ",";
sql << msg_queue_ptr->_exclusive << ",";
sql << msg_queue_ptr->_auto_del << ",";
sql << "'" << msg_queue_ptr->getArgs() << "');";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper insert msg_queue failed:%s", sql.str().c_str());
assert(0);
}
}
删除数据函数 remove
remove
方法用于从数据库中删除指定名称的消息队列。
void remove(const std::string &name)
{
std::stringstream sql;
sql << "delete from msg_queue where name = '" << name << "';";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper remove msg_queue failed:%s", sql.str().c_str());
assert(0);
}
}
数据恢复函数 recover
recover
方法用于从数据库中恢复所有消息队列。该方法执行SQL查询语句,遍历结果集,并将每个消息队列恢复到内存中的 _msgQueues
容器中。
msgqueue_map recover()
{
msgqueue_map ret;
std::string sql = "select * from msg_queue;";
if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret))
{
ELOG("MsgQueueMapper recover failed:%s", sql.c_str());
assert(0);
}
return ret;
}
msgQueueMapCb
- 该静态回调函数在
recover
函数中被调用,用于将数据库查询结果中的每一行转化为一个 MsgQueue对象,并将其存储到 msgqueue_map中。
static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names)
{
msgqueue_map *ret = (msgqueue_map *)arg;
MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();
msg_queue_ptr->_name = col_values[0];
msg_queue_ptr->_durable = std::stoi(col_values[1]);
msg_queue_ptr->_exclusive = std::stoi(col_values[2]);
msg_queue_ptr->_auto_del = std::stoi(col_values[3]);
if (col_values[4])
{
msg_queue_ptr->setArgs((std::string)col_values[4]);
}
else
ELOG("没有其它参数");
ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));
return 0;
}
四.MsgQueueMapper类的实现
MsgQueueManager
类用于管理内存中的消息队列对象,并与 MsgQueueMapper
协作实现消息队列的持久化。
成员变量
_msgQueues
:一个键值对映射,用于存储当前内存中的所有消息队列。_mapper
:MsgQueueMapper
对象,用于与数据库进行持久化操作。_mutex
:用于保护消息队列操作的线程安全。
构造函数
构造函数在初始化时会调用 MsgQueueMapper
的 recover
方法,从数据库恢复所有消息队列。
MsgQueueManager(const std::string &dbname) : _mapper(dbname)
{
_mapper.recover(_msgQueues);
}
成员函数
声明队列函数 declareQueue
declareQueue
方法用于声明一个新的消息队列,并将其加入到内存和数据库中。
该方法首先检查队列是否已经存在,若不存在,则创建并插入新的队列。如果队列需要持久化,则还会将其插入到数据库中。
bool declareQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &args)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it != _msgQueues.end())
{
ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())
return true;
}
MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);
_msgQueues.insert(std::make_pair(name, msg_queue_ptr));
if (msg_queue_ptr->_durable)
_mapper.insert(msg_queue_ptr);
return true;
}
删除队列函数 removeQueue
removeQueue
方法用于删除指定名称的消息队列。
bool removeQueue(const std::string &name)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it == _msgQueues.end())
{
ELOG("MsgQueueManager removeQueue:%s not exists", name.c_str());
return false;
}
if (it->second->_durable)
_mapper.remove(name);
_msgQueues.erase(it);
return true;
}
查询队列函数 selectQueue
selectQueue
方法根据队列名称查询并返回消息队列对象
该方法用于在内存中查找指定名称的队列,并返回指向该队列的智能指针。
MsgQueue::ptr selectQueue(const std::string &name)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it == _msgQueues.end())
{
ELOG("MsgQueueManager selectQueue:%s not exists", name.c_str());
return nullptr;
}
return it->second;
}
查询所有队列函数 selectAll
msgqueue_map selectAll()
{
return _msgQueues;
}
队列是否存在函数 exists
exists
方法用于判断指定名称的消息队列是否存在。
bool exists(const std::string &name) { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.find(name) != _msgQueues.end(); }
该方法返回一个布尔值,指示队列是否存在。
队列数量函数 size
size
方法用于返回当前存在的消息队列数量。
size_t size() { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.size(); }
该方法返回一个整数值,表示内存中队列的数量。
清空所有队列函数 clear
clear
方法用于清空所有当前存在的消息队列。
void clear() { std::lock_guard<std::mutex> lock(_mutex); _msgQueues.clear(); _mapper.dropTable(); _mapper.createTable(); }
该方法清空内存中的所有队列,并在数据库中删除和重建消息队列表格。
五.MsgQueue.hpp所有代码
#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <google/protobuf/map.h>
namespace mq
{
struct MsgQueue
{
std::string _name;
bool _durable; // 是否持久化
bool _exclusive; // 是否独占
bool _auto_del; // 是否自动删除
google::protobuf::Map<std::string, std::string> _args;
using ptr = std::shared_ptr<MsgQueue>;
MsgQueue(const std::string &name, bool durable, bool exclusive,
bool auto_del, const google::protobuf::Map<std::string, std::string> &_args)
: _name(name),
_durable(durable),
_exclusive(exclusive),
_auto_del(auto_del),
_args(_args)
{
}
MsgQueue() {}
void setArgs(const std::string &str_args)
{
std::vector<std::string> args;
size_t sz = StrHelper::split(str_args, "&", args);
for (auto &kv : args)
{
size_t pos = kv.find("=");
if (pos == std::string::npos)
{
ELOG("MsgQueue args format error:%s", kv.c_str());
assert(0);
}
std::string key = kv.substr(0, pos);
std::string val = kv.substr(pos + 1);
_args[key] = val;
}
}
std::string getArgs()
{
std::string ret;
for (auto &kv : _args)
{
ret += kv.first + "=" + kv.second + "&";
}
return ret;
}
};
using msgqueue_map = std::unordered_map<std::string, std::shared_ptr<MsgQueue>>;
class MsgQueueMapper
{
private:
SqliteHelper sql_helper;
public:
MsgQueueMapper(const std::string &dbname)
: sql_helper(dbname)
{
// 数据库有path即可,open时自动创建文件
std::string path = FileHelper::getParentDirName(dbname);
FileHelper::createDir(path);
if (!sql_helper.open())
{
ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());
assert(0);
}
createTable();
}
// 1.创建,删除表
void createTable()
{
std::stringstream sql;
sql << "create table if not exists msg_queue(";
sql << "name varchar(64) primary key,";
sql << "durable int,";
sql << "exclusive int,";
sql << "auto_del int,";
sql << "args varchar(64));";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());
assert(0);
}
}
void dropTable()
{
std::stringstream sql;
sql << "drop table if exists msg_queue;";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());
assert(0);
}
}
// 2.插入/删除数据
void insert(MsgQueue::ptr &msg_queue_ptr)
{
std::stringstream sql;
sql << "insert into msg_queue (name,durable,exclusive,auto_del,args) values(";
sql << "'" << msg_queue_ptr->_name << "',";
sql << msg_queue_ptr->_durable << ",";
sql << msg_queue_ptr->_exclusive << ",";
sql << msg_queue_ptr->_auto_del << ",";
sql << "'" << msg_queue_ptr->getArgs() << "');";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper insert failed:%s", sql.str().c_str());
assert(0);
}
}
void remove(const std::string &name)
{
std::stringstream sql;
sql << "delete from msg_queue where name='" << name << "';";
if (!sql_helper.exec(sql.str(), nullptr, nullptr))
{
ELOG("MsgQueueMapper remove failed:%s", sql.str().c_str());
assert(0);
}
}
// 3.recover
msgqueue_map recover()
{
msgqueue_map ret;
std::string sql = "select * from msg_queue;";
if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret))
{
ELOG("MsgQueueMapper recover failed:%s", sql.c_str());
assert(0);
}
return ret;
}
private:
static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names)
{
msgqueue_map *ret = (msgqueue_map *)arg;
MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();
msg_queue_ptr->_name = col_values[0];
msg_queue_ptr->_durable = std::stoi(col_values[1]);
msg_queue_ptr->_exclusive = std::stoi(col_values[2]);
msg_queue_ptr->_auto_del = std::stoi(col_values[3]);
if (col_values[4])
{
msg_queue_ptr->setArgs((std::string)col_values[4]);
}
else
ELOG("没有其它参数");
ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));
return 0;
}
};
class MsgQueueManager
{
public:
using ptr = std::shared_ptr<MsgQueueManager>;
private:
msgqueue_map _msgQueues;
std::mutex _mutex;
MsgQueueMapper _mapper;
public:
MsgQueueManager(const std::string &dbname)
: _mapper(dbname)
{
_msgQueues = _mapper.recover();
}
// 1. 插入/删除数据
bool declareQueue(const std::string &name,
bool durable,
bool exclusive,
bool auto_del,
const google::protobuf::Map<std::string, std::string> &args)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it != _msgQueues.end())
{
ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())
return true;
}
MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);
_msgQueues.insert(std::make_pair(name, msg_queue_ptr));
if (msg_queue_ptr->_durable)
_mapper.insert(msg_queue_ptr);
// std::cout<<"declare队列:"<<msg_queue_ptr->_name<<std::endl;
// std::cout<<"队列的个数:"<<_msgQueues.size()<<std::endl;
return true;
}
bool removeQueue(const std::string &name)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it == _msgQueues.end())
{
ILOG("MsgQueueManager removeQueue:%s not exists", name.c_str());
return true;
}
_msgQueues.erase(it);
if (it->second->_durable)
_mapper.remove(name);
return true;
}
// 2. 查询一个/所有 queue
MsgQueue::ptr selectQueue(const std::string &name)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it == _msgQueues.end())
{
ILOG("MsgQueueManager select:%s not exists", name.c_str());
return MsgQueue::ptr();
}
return it->second;
}
msgqueue_map selectAll()
{
return _msgQueues;
}
// 3. 其它操作
bool exists(const std::string &name)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _msgQueues.find(name);
if (it == _msgQueues.end())
{
ILOG("MsgQueueManager :%s not exists", name.c_str());
return false;
}
return true;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
return _msgQueues.size();
}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.dropTable();
_msgQueues.clear();
}
};
};