文章目录
- 工具模块
- logger.hpp
- helper.hpp
- threadpool.hpp
- 核心概念
- 核心API
- 交换机类型
- 持久化
- ⽹络通信
- 消息应答
- 持久化数据管理中心模块
- 虚拟机管理模块
- 交换路由模块
- 消费者管理模块
- 信道管理模块
- 连接管理模块
- Broker服务器模块
- 消费者管理
- 信道请求模块
- 通信连接模块
- 项⽬模块关系图
工具模块
这个模块没有什么好说的,就是一些工具的编写,方便进行使用
logger.hpp
#ifndef M_LOG_H__
#define M_LOG_H__
#include <iostream>
#include <ctime>
//封装一个日志宏,通过日志宏进行日志的打印,在打印的信息前带有系统时间以及文件名和行号
// [17:26:24] [log.cpp:12] 打开文件失败!
#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\
if (level >= DEFAULT_LEVEL) {\
time_t t = time(nullptr);\
struct tm* ptm = localtime(&t);\
char time_str[32];\
strftime(time_str, 31, "%H:%M:%S", ptm);\
printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\
}\
}
#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
#endif
helper.hpp
sqlite3
class SqliteHelper {
public:
typedef int(*SqliteCallback)(void*,int,char**,char**);
SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX) {
//int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);
if (ret != SQLITE_OK) {
ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));
return false;
}
return true;
}
bool exec(const std::string &sql, SqliteCallback cb, void *arg) {
//int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)
int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
if (ret != SQLITE_OK) {
ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));
return false;
}
return true;
}
void close() {
if (_handler) sqlite3_close_v2(_handler);
}
private:
std::string _dbfile;
sqlite3 *_handler;
};
字符串分割
class StrHelper{
public:
static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result) {
size_t pos, idx = 0;
while(idx < str.size()) {
pos = str.find(sep, idx);
if (pos == std::string::npos) {
result.push_back(str.substr(idx));
return result.size();
}
if (pos == idx) {
idx = pos + sep.size();
continue;
}
result.push_back(str.substr(idx, pos - idx));
idx = pos + sep.size();
}
return result.size();
}
};
生成随机数
class UUIDHelper {
public:
static std::string uuid() {
std::random_device rd;
std::mt19937_64 gernator(rd());
std::uniform_int_distribution<int> distribution(0, 255);
std::stringstream ss;
for (int i = 0; i < 8; i++) {
ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator) ;
if (i == 3 || i == 5 || i == 7) {
ss << "-";
}
}
static std::atomic<size_t> seq(1);
size_t num = seq.fetch_add(1);
for (int i = 7; i >= 0; i--) {
ss << std::setw(2) << std::setfill('0') << std::hex << ((num>>(i*8)) & 0xff);
if (i == 6) ss << "-";
}
return ss.str();
}
};
文件常用操作
class FileHelper {
public:
FileHelper(const std::string &filename):_filename(filename){}
bool exists() //判断文件是否存在
{
struct stat st;
return (stat(_filename.c_str(), &st) == 0);
}
size_t size() { //文件大小
struct stat st;
int ret = stat(_filename.c_str(), &st);
if (ret < 0) {
return 0;
}
return st.st_size;
}
bool read(char *body, size_t offset, size_t len) //读取文件,从offset位置读取len长度
{
//1. 打开文件
std::ifstream ifs(_filename, std::ios::binary | std::ios::in);
if (ifs.is_open() == false) {
ELOG("%s 文件打开失败!", _filename.c_str());
return false;
}
//2. 跳转文件读写位置
ifs.seekg(offset, std::ios::beg);
//3. 读取文件数据
ifs.read(body, len);
if (ifs.good() == false) {
ELOG("%s 文件读取数据失败!!", _filename.c_str());
ifs.close();
return false;
}
//4. 关闭文件
ifs.close();
return true;
}
bool read(std::string &body) {
//获取文件大小,根据文件大小调整body的空间
size_t fsize = this->size();
body.resize(fsize);
return read(&body[0], 0, fsize);
}
bool write(const char *body, size_t offset, size_t len) {//向文件写入
//1. 打开文件
std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out);
if (fs.is_open() == false) {
ELOG("%s 文件打开失败!", _filename.c_str());
return false;
}
//2. 跳转到文件指定位置
fs.seekp(offset, std::ios::beg);
//3. 写入数据
fs.write(body, len);
if (fs.good() == false) {
ELOG("%s 文件写入数据失败!!", _filename.c_str());
fs.close();
return false;
}
//4. 关闭文件
fs.close();
return true;
}
bool write(const std::string &body) {
return write(body.c_str(), 0, body.size());
}
bool rename(const std::string &nname) {//重命名
return (::rename(_filename.c_str(), nname.c_str()) == 0);
}
static std::string parentDirectory(const std::string &filename) {//获取父文件目录
// /aaa/bb/ccc/ddd/test.txt
size_t pos = filename.find_last_of("/");
if (pos == std::string::npos) {
// test.txt
return "./";
}
std::string path = filename.substr(0, pos);
return path;
}
static bool createFile(const std::string &filename) {//创建文件
std::fstream ofs(filename, std::ios::binary | std::ios::out);
if (ofs.is_open() == false) {
ELOG("%s 文件打开失败!", filename.c_str());
return false;
}
ofs.close();
return true;
}
static bool removeFile(const std::string &filename) {//删除文件
return (::remove(filename.c_str()) == 0);
}
static bool createDirectory(const std::string &path) {//创建父文件目录
// aaa/bbb/ccc cccc
// 在多级路径创建中,我们需要从第一个父级目录开始创建
size_t pos, idx = 0;
while(idx < path.size()) {
pos = path.find("/", idx);
if (pos == std::string::npos) {
return (mkdir(path.c_str(), 0775) == 0);
}
std::string subpath = path.substr(0, pos);
int ret = mkdir(subpath.c_str(), 0775);
if (ret != 0 && errno != EEXIST) {
ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));
return false;
}
idx = pos + 1;
}
return true;
}
static bool removeDirectory(const std::string &path) {//删除父文件目录
// rm -rf path
// system()
std::string cmd = "rm -rf " + path;
return (system(cmd.c_str()) != -1);
}
private:
std::string _filename;
};
threadpool.hpp
class threadpool {
public:
using ptr = std::shared_ptr<threadpool>;
using Functor = std::function<void(void)>;
threadpool(int thr_count = 1) : _stop(false){
for (int i = 0; i < thr_count; i++) {
_threads.emplace_back(&threadpool::entry, this);
}
}
~threadpool() {
stop();
}
void stop() {
if (_stop == true) return;
_stop = true;
_cv.notify_all();
for (auto &thread : _threads) {
thread.join();
}
}
//push传入的是首先有一个函数--用户要执行的函数, 接下来是不定参,表示要处理的数据也就是要传入到函数中的参数
//push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),
//使用lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中,由工作线程取出进行执行
template<typename F, typename ...Args>
auto push(F &&func, Args&& ...args) -> std::future<decltype(func(args...))> {
//1. 将传入的函数封装成一个packaged_task任务
using return_type = decltype(func(args...));
auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
std::future<return_type> fu = task->get_future();
//2. 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象
{
std::unique_lock<std::mutex> lock(_mutex);
//3. 将构造出来的匿名函数对象,抛入到任务池中
_taskpool.push_back( [task](){ (*task)(); } );
_cv.notify_one();
}
return fu;
}
private:
//线程入口函数---内部不断的从任务池中取出任务进行执行。
void entry() {
while(!_stop){
std::vector<Functor> tmp_taskpool;
{
//加锁
std::unique_lock<std::mutex> lock(_mutex);
//等待任务池不为空,或者_stop被置位返回,
_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });
//取出任务进行执行
tmp_taskpool.swap(_taskpool);
}
for (auto &task : tmp_taskpool) {
task();
}
}
}
private:
std::atomic<bool> _stop;
std::vector<Functor> _taskpool;//任务池
std::mutex _mutex;
std::condition_variable _cv;
std::vector<std::thread> _threads;
};
核心概念
- ⽣产者(Producer)
- 消费者(Consumer)
- 中间⼈(Broker)
- 发布(Publish)
- 订阅Subscribe)
- ⼀个⽣产者,一个消费者
- N个⽣产者,N个消费者
其中,Broker Server是最核⼼的部分, 负责消息的存储和转发。
上述数据结构,既需要在内存中存储,也需要在硬盘中存储
- 内存存储: ⽅便使⽤
- 硬盘存储:重启数据不丢失
核心API
对于Broker来说,要实现以下核⼼API,通过这些API来实现消息队列的基本功能
另⼀⽅⾯,Producer和Consumer则通过⽹络的⽅式,远程调⽤这些API,实现⽣产者消费者模型
交换机类型
对于RabbitMQ来说,主要⽀持三种交换机类型:
- Direct : ⽣产者发送消息时,直接指定被该交换机绑定的队列
- Fanout : ⽣产者发送的消息会被复制到该交换机的所有队列中
- Topic : 绑定队列到交换机上时,指定⼀个字符串为bindingKey。发送消息指定⼀个字符串为routingKey。当routingKey和bindingKey满⾜⼀定的匹配条件的时候,则把消息投递到指定队列。
持久化
Exchange,Queue,Binding,Message等数据都有持久化需求
当程序重启/主机重启保证 , 上述内容不丢失。
⽹络通信
⽣产者和消费者都是客⼾端程序,Broker则是作为服务器,通过⽹络进⾏通信。
在⽹络通信的过程中,客⼾端部分要提供对应的api,来实现对服务器的操作。
可以看到,在Broker的基础上,客⼾端还要增加Connection操作和Channel操作
- Connection对应⼀个TCP连接
- Channel则是Connection中的逻辑通道
⼀个Connection中可以包含多个Channel。Channel和Channel之间的数据是独⽴的,不会相互⼲扰。这样做主要是为了能够更好的复⽤TCP连接,达到⻓连接的效果,避免频繁的创建关闭TCP连接。
消息应答
被消费的消息,需要进⾏应答。应答模式分成两种:
持久化数据管理中心模块
虚拟机管理模块
因为交换机/队列/绑定都是基于虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块的整合模块。
交换路由模块
当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。
消费者管理模块
消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)
信道管理模块
本质上,在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。
连接管理模块
本质上,咱们仿照实现的服务器是通过muduo库来实现底层通信的,⽽这⾥的连接管理,更多的是对muduo库中的Connection进⾏⼆次封装管理,并额外提供项⽬所需操作。
Broker服务器模块
整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务。
消费者管理
消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在。
信道请求模块
与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。
通信连接模块
向⽤⼾提供⼀个⽤于实现⽹络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与服务端进⾏⽹络通信。