工具模块及项目整体模块框架

news2024/11/27 12:37:37

文章目录

  • 工具模块
    • logger.hpp
    • helper.hpp
    • threadpool.hpp
  • 核心概念
    • 核心API
    • 交换机类型
    • 持久化
    • ⽹络通信
    • 消息应答
    • 持久化数据管理中心模块
    • 虚拟机管理模块
    • 交换路由模块
    • 消费者管理模块
    • 信道管理模块
    • 连接管理模块
    • Broker服务器模块
    • 消费者管理
    • 信道请求模块
    • 通信连接模块
    • 项⽬模块关系图

工具模块

这个模块没有什么好说的,就是一些工具的编写,方便进行使用

logger.hpp

#ifndef M_LOG_H__
#define M_LOG_H__
#include <iostream>
#include <ctime>

//封装一个日志宏,通过日志宏进行日志的打印,在打印的信息前带有系统时间以及文件名和行号
//     [17:26:24] [log.cpp:12] 打开文件失败! 

#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\
    if (level >= DEFAULT_LEVEL) {\
        time_t t = time(nullptr);\
        struct tm* ptm = localtime(&t);\
        char time_str[32];\
        strftime(time_str, 31, "%H:%M:%S", ptm);\
        printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\
    }\
}

#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)

#endif

helper.hpp

sqlite3

class SqliteHelper {
    public:
        typedef int(*SqliteCallback)(void*,int,char**,char**);
        SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
        
        bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX) {
            //int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
            int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);
            if (ret != SQLITE_OK) {
                ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));
                return false;
            }
            return true;
        }
        bool exec(const std::string &sql, SqliteCallback cb, void *arg) {
            //int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)
            int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
            if (ret != SQLITE_OK) {
                ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));
                return false;
            }
            return true;
        }
        void close() {
            if (_handler) sqlite3_close_v2(_handler);
        }
    private:
        std::string _dbfile;
        sqlite3 *_handler;
};

字符串分割

class StrHelper{
    public:
        static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result) {
            size_t pos, idx = 0;
            while(idx < str.size()) {
                pos = str.find(sep, idx);
                if (pos == std::string::npos) {
                    result.push_back(str.substr(idx));
                    return result.size();
                }
                if (pos == idx) {
                    idx = pos + sep.size();
                    continue;
                }
                result.push_back(str.substr(idx, pos - idx));
                idx = pos + sep.size();
            }
            return result.size();
        }
};

生成随机数

class UUIDHelper {
    public:
        static std::string uuid() {
            std::random_device rd;
            std::mt19937_64 gernator(rd());
            std::uniform_int_distribution<int> distribution(0, 255);
            std::stringstream ss;
            for (int i = 0; i < 8; i++) {
                ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator) ;
                if (i == 3 || i == 5 || i == 7) {
                    ss << "-";
                }
            }
            static std::atomic<size_t> seq(1);
            size_t num = seq.fetch_add(1);
            for (int i = 7; i >= 0; i--) {
                ss << std::setw(2) << std::setfill('0') << std::hex << ((num>>(i*8)) & 0xff);
                if (i == 6) ss << "-";
            }
            return ss.str();
        }
};

文件常用操作

class FileHelper {
    public:
        FileHelper(const std::string &filename):_filename(filename){}
        bool exists() //判断文件是否存在
        {
            struct stat st;
            return (stat(_filename.c_str(), &st) == 0);
        }
        size_t size() {  //文件大小
            struct stat st;
            int ret = stat(_filename.c_str(), &st);
            if (ret < 0) {
                return 0;
            }
            return st.st_size;
        }
        bool read(char *body, size_t offset, size_t len) //读取文件,从offset位置读取len长度
        {
            //1. 打开文件
            std::ifstream ifs(_filename, std::ios::binary | std::ios::in); 
            if (ifs.is_open() == false) {
                ELOG("%s 文件打开失败!", _filename.c_str());
                return false;
            }
            //2. 跳转文件读写位置
            ifs.seekg(offset, std::ios::beg);
            //3. 读取文件数据
            ifs.read(body, len);
            if (ifs.good() == false) {
                ELOG("%s 文件读取数据失败!!", _filename.c_str());
                ifs.close();
                return false;
            }
            //4. 关闭文件
            ifs.close();
            return true;
        }
        bool read(std::string &body) {
            //获取文件大小,根据文件大小调整body的空间
            size_t fsize = this->size();
            body.resize(fsize);
            return read(&body[0], 0, fsize);
        }
        bool write(const char *body, size_t offset, size_t len) {//向文件写入
            //1. 打开文件
            std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out); 
            if (fs.is_open() == false) {
                ELOG("%s 文件打开失败!", _filename.c_str());
                return false;
            }
            //2. 跳转到文件指定位置
            fs.seekp(offset, std::ios::beg);
            //3. 写入数据
            fs.write(body, len);
            if (fs.good() == false) {
                ELOG("%s 文件写入数据失败!!", _filename.c_str());
                fs.close();
                return false;
            }
            //4. 关闭文件
            fs.close();
            return true;
        }
        bool write(const std::string &body) {
            return write(body.c_str(), 0, body.size());
        }
        bool rename(const std::string &nname) {//重命名
            return (::rename(_filename.c_str(), nname.c_str()) == 0);
        }
        static std::string parentDirectory(const std::string &filename) {//获取父文件目录
            // /aaa/bb/ccc/ddd/test.txt
            size_t pos = filename.find_last_of("/");
            if (pos == std::string::npos) {
                // test.txt
                return "./";
            }
            std::string path = filename.substr(0, pos);
            return path;
        }
        static bool createFile(const std::string &filename) {//创建文件
            std::fstream ofs(filename, std::ios::binary | std::ios::out); 
            if (ofs.is_open() == false) {
                ELOG("%s 文件打开失败!", filename.c_str());
                return false;
            }
            ofs.close();
            return true;
        }
        static bool removeFile(const std::string &filename) {//删除文件
            return (::remove(filename.c_str()) == 0);
        }
        static bool createDirectory(const std::string &path) {//创建父文件目录
            //  aaa/bbb/ccc    cccc
            // 在多级路径创建中,我们需要从第一个父级目录开始创建
            size_t pos, idx = 0;
            while(idx < path.size()) {
                pos = path.find("/", idx);
                if (pos == std::string::npos) {
                    return (mkdir(path.c_str(), 0775) == 0);
                }
                std::string subpath = path.substr(0, pos);
                int ret = mkdir(subpath.c_str(), 0775);
                if (ret != 0 && errno != EEXIST) {
                    ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));
                    return false;
                }
                idx = pos + 1;
            }
            return true;
        }
        static bool removeDirectory(const std::string &path) {//删除父文件目录
            // rm -rf path
            // system()
            std::string cmd = "rm -rf " + path;
            return (system(cmd.c_str()) != -1);
        }
    private:
        std::string _filename;
};

threadpool.hpp

class threadpool {
    public:
        using ptr = std::shared_ptr<threadpool>;
        using Functor = std::function<void(void)>;
        threadpool(int thr_count = 1) : _stop(false){
            for (int i = 0; i < thr_count; i++) {
                _threads.emplace_back(&threadpool::entry, this);
            }
        }
        ~threadpool() {
            stop();
        }
        void stop() {
            if (_stop == true) return;
            _stop = true;
            _cv.notify_all();
            for (auto &thread : _threads) {
                thread.join();
            }
        }
        //push传入的是首先有一个函数--用户要执行的函数, 接下来是不定参,表示要处理的数据也就是要传入到函数中的参数
        //push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),
        //使用lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中,由工作线程取出进行执行
        template<typename F, typename ...Args>
        auto push(F &&func, Args&& ...args) -> std::future<decltype(func(args...))> {
            //1. 将传入的函数封装成一个packaged_task任务
            using return_type = decltype(func(args...));
            auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
            auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
            std::future<return_type> fu = task->get_future();
            //2. 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象
            {
                std::unique_lock<std::mutex> lock(_mutex);
                //3. 将构造出来的匿名函数对象,抛入到任务池中
                _taskpool.push_back(   [task](){ (*task)(); }  );
                _cv.notify_one();
            }
            return fu;
        }

    private:
        //线程入口函数---内部不断的从任务池中取出任务进行执行。
        void entry() {
            while(!_stop){
                std::vector<Functor> tmp_taskpool;
                {
                    //加锁
                    std::unique_lock<std::mutex> lock(_mutex);
                    //等待任务池不为空,或者_stop被置位返回,
                    _cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });
                    //取出任务进行执行
                    tmp_taskpool.swap(_taskpool);
                }
                for (auto &task : tmp_taskpool) {
                    task();
                }
            }
        }
    private:
        std::atomic<bool> _stop;
        std::vector<Functor> _taskpool;//任务池
        std::mutex _mutex;
        std::condition_variable _cv;
        std::vector<std::thread> _threads;
};

核心概念

  • ⽣产者(Producer)
  • 消费者(Consumer)
  • 中间⼈(Broker)
  • 发布(Publish)
  • 订阅Subscribe)
  • ⼀个⽣产者,一个消费者
    在这里插入图片描述
  • N个⽣产者,N个消费者
    在这里插入图片描述
    其中,Broker Server是最核⼼的部分, 负责消息的存储和转发。

在这里插入图片描述
在这里插入图片描述
上述数据结构,既需要在内存中存储,也需要在硬盘中存储

  • 内存存储: ⽅便使⽤
  • 硬盘存储:重启数据不丢失

核心API

对于Broker来说,要实现以下核⼼API,通过这些API来实现消息队列的基本功能

在这里插入图片描述

另⼀⽅⾯,Producer和Consumer则通过⽹络的⽅式,远程调⽤这些API,实现⽣产者消费者模型

交换机类型

对于RabbitMQ来说,主要⽀持三种交换机类型:

  • Direct : ⽣产者发送消息时,直接指定被该交换机绑定的队列
  • Fanout : ⽣产者发送的消息会被复制到该交换机的所有队列中
  • Topic : 绑定队列到交换机上时,指定⼀个字符串为bindingKey。发送消息指定⼀个字符串为routingKey。当routingKey和bindingKey满⾜⼀定的匹配条件的时候,则把消息投递到指定队列。

持久化

Exchange,Queue,Binding,Message等数据都有持久化需求
当程序重启/主机重启保证 , 上述内容不丢失。

⽹络通信

⽣产者和消费者都是客⼾端程序,Broker则是作为服务器,通过⽹络进⾏通信。
在⽹络通信的过程中,客⼾端部分要提供对应的api,来实现对服务器的操作。

在这里插入图片描述
可以看到,在Broker的基础上,客⼾端还要增加Connection操作和Channel操作

  • Connection对应⼀个TCP连接
  • Channel则是Connection中的逻辑通道

⼀个Connection中可以包含多个Channel。Channel和Channel之间的数据是独⽴的,不会相互⼲扰。这样做主要是为了能够更好的复⽤TCP连接,达到⻓连接的效果,避免频繁的创建关闭TCP连接。

消息应答

被消费的消息,需要进⾏应答。应答模式分成两种:

在这里插入图片描述

持久化数据管理中心模块

在这里插入图片描述

虚拟机管理模块

因为交换机/队列/绑定都是基于虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块的整合模块。

在这里插入图片描述

交换路由模块

当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。

在这里插入图片描述
在这里插入图片描述

消费者管理模块

消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。

因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)

在这里插入图片描述

信道管理模块

本质上,在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。

在这里插入图片描述

连接管理模块

本质上,咱们仿照实现的服务器是通过muduo库来实现底层通信的,⽽这⾥的连接管理,更多的是对muduo库中的Connection进⾏⼆次封装管理,并额外提供项⽬所需操作。

在这里插入图片描述

Broker服务器模块

整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务。
在这里插入图片描述

消费者管理

消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在。

在这里插入图片描述

信道请求模块

与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。

在这里插入图片描述

通信连接模块

向⽤⼾提供⼀个⽤于实现⽹络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与服务端进⾏⽹络通信。

在这里插入图片描述

项⽬模块关系图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2187121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Oracle SQL语句没有过滤条件,究竟是否会走索引??

答案是&#xff1a;可能走索引也可能不走索引&#xff0c;具体要看列的值可不可为null&#xff0c;Oracle不会为所有列的nullable属性都为Y的sql语句走索引。 例子&#xff1a; create table t as select * from dba_objects; CREATE INDEX ix_t_name ON t(object_id, objec…

MySQL 中的 GTID 复制详解

MySQL 中的 GTID 复制详解 在 MySQL 的复制架构中&#xff0c;GTID&#xff08;Global Transaction Identifier&#xff09;复制是一种重要的技术&#xff0c;它为数据库的复制提供了更强大的功能和更高的可靠性。本文将深入探讨 MySQL 中的 GTID 复制是什么&#xff0c;以及它…

OpenCV计算机视觉库

计算机视觉和图像处理 Tensorflow入门深度神经网络图像分类目标检测图像分割OpenCVPytorchNLP自然语言处理 OpenCV 一、OpenCV简介1.1 简介1.2 OpenCV部署1.3 OpenCV模块 二、OpenCV基本操作2.1 图像的基本操作2.1.1 图像的IO操作2.1.2 绘制几何图像2.1.3 获取并修改图像的像素…

时间相关数据的统计分析(笔记更新中)

对事件相关数据的统计思路做一个笔记 可以用作肿瘤生长曲线&#xff08;Tumor Growth Curve&#xff09;/某一个药物处理后不同时间点表型的获取类型的数据。 总体来说合适的有两类&#xff0c;一类是以ANOVA为基础的方差分析&#xff0c;重复测量资料的方差分析&#xff1b;…

D - Connect the Dots Codeforces Round 976 (Div. 2)

原题 D - Connect the Dots 思路 直接去做的话会超时, 因此用差分去优化 代码 #include <bits/stdc.h> using namespace std;int f[200020]; int z; int b[11][200020];// 并查集的 find 函数 int find(int x) {return f[x] ! x ? f[x] find(f[x]) : x; }// 检查是…

食品饮料小程序搭建私域会员管理

食品饮料是商超主要经营类目之一&#xff0c;多样化的品牌/厂商/渠道/经销商&#xff0c;客户在消费方面购物渠道和选择范围广&#xff0c;无论厂商还是线下门店/线上电商都需要围绕流量/会员开展生意获得更多营收。 小程序开店基于微信平台生态分享宣传、用户店铺方便购物及提…

Flutter与原生代码通信

文章目录 1. 知识回顾2. 示例代码3. 经验总结我们在上一章回中介绍了通道相关的内容,本章回中将介绍其中的一种通道:MethodChannnel.闲话休提,让我们一起Talk Flutter吧。 1. 知识回顾 我们在上一章回中介绍了通道的概念和作用,并且提到了通道有不同的类型,本章回将其中一…

【C++】类与对象基础概念解析

恭喜你学习完C语言与数据结构的有关内容&#xff0c;现在让我们开始进行对C的学习吧~ &#x1f49d;&#x1f49d;&#x1f49d;如果你对C语言或数据结构还存在疑惑&#xff0c;欢迎观看我之前的作品 &#x1f449;【数据结构】 &#x1f449;【C语言】 目录 一、引言 二、类…

【2024年最新】基于springboot+mysql就业信息管理系统

技术摘要 技术框架&#xff1a;以springboot作为框架&#xff0c;业务模式&#xff1a;B/S模式数据库&#xff1a;MySql作为后台运行的数据库服务器&#xff1a;使用Tomcat用为系统的服务器 系统展示 系统实现功能 本次实现一个就业信息管理系统&#xff0c;通过这个系统能够…

vscode安装及c++配置编译

1、VScode下载 VS Code官网下载地址&#xff1a;Visual Studio Code - Code Editing. Redefined。 2、安装中文插件 搜索chinese&#xff0c;点击install下载安装中文插件。 3、VS Code配置C/C开发环境 3.1、MinGW-w64下载 VS Code是一个高级的编辑器&#xff0c;只能用来写代…

嵌入式系统中qt开发 Qdebug输出中文的时候变成了问号 ??? bulideroot制作的根文件系统

嵌入式系统中qt开发 Qdebug输出&#xff1f;&#xff1f;&#xff1f; bulideroot制作的根文件系统 这个问题我找了三四天了&#xff0c;因为的字符也配置了 /etc/profile中qt的环境变量我也配置了 我的/usr/share/fonts也是有字库的&#xff0c;但是qt输出的中文全是&#…

windows 11 LTSC 26100.1742 官方简体中文版

系统简介 Windows 11 LTSC&#xff08;长期服务通道&#xff09;是一个专为长期稳定性和可靠性设计的Windows 11变体&#xff0c;适合于需要最小更新和更改的关键任务系统和设备。与常规版本相比&#xff0c;LTSC版本的特点是更新频率较低&#xff0c;目的是为了保持系统的稳定…

从零开始掌握YOLOv11:揭秘三大损失函数的理想值(源码+实战)

相关文章&#xff1a; YOLOv1–v11: 版本演进及其关键技术解析-CSDN博客 YOLOv11&#xff1a;重新定义实时目标检测的未来-CSDN博客 Yolo v11目标检测实战1&#xff1a;对象分割和人流跟踪&#xff08;附源码&#xff09;-CSDN博客 YOLOv11目标检测实战2&#xff1a;人流统计…

win10下cuda12.1 +troch2.4.1+vs2022环境下编译安装flash-attn

步骤一 下载项目 先下载 https://github.com/Dao-AILab/flash-attention&#xff0c;然后在conda环境中进入项目目录 步骤二 安装依赖项 执行以下命令&#xff0c;安装cutlass库&#xff0c;该库为编译flash-attn的必须依赖 conda update --force conda conda install conda…

Linux文件重定向文件缓冲区

目录 一、C文件接口 二、系统文件I/O 2.1认识系统文件I/O 2.2系统文件I/O 2.3系统调用和库函数 2.4open( )的返回值--文件描述符 2.5访问文件的本质 三、文件重定向 3.1认识文件重定向 3.2文件重定向的本质 3.3在shell中添加重定向功能 3.4stdout和stderr 3.5如何理…

Java | Leetcode Java题解之第446题等差数列划分II-子序列

题目&#xff1a; 题解&#xff1a; class Solution {public int numberOfArithmeticSlices(int[] nums) {int ans 0;int n nums.length;Map<Long, Integer>[] f new Map[n];for (int i 0; i < n; i) {f[i] new HashMap<Long, Integer>();}for (int i 0;…

深度学习中的优化方法(Momentum,AdaGrad,RMSProp,Adam)详解及调用

深度学习中常用的优化方法包括啦momentum(动量法),Adagrad(adaptive gradient自适应梯度法),RMSProp(root mean square propagation均方根传播算法),Adam(adaptive moment estimation自适应矩估计法) 指数加权平均算法 所谓指数加权平均算法是上述优化算法的基础,其作用是对历…

定制化CRM如何重塑科技服务领域的生态链?

企业不仅面临着技术创新与知识产权保护的双重挑战&#xff0c;还需在激烈的市场竞争中构建稳固的客户关系与广泛的合作网络。传统的CRM&#xff08;客户关系管理&#xff09;系统&#xff0c;往往局限于企业内部的数据管理与流程优化&#xff0c;难以满足当前复杂多变的业务需求…

初识Linux · 进程替换

目录 前言&#xff1a; 1 直接看代码和现象 2 解释原理 3 将代码改成多进程版本 4 认识所有函数并使用 前言&#xff1a; 由前面的章节学习&#xff0c;我们已经了解了进程状态&#xff0c;进程终止以及进程等待&#xff0c;今天&#xff0c;我们学习进程替换。进程替换我…

【2023工业3D异常检测文献】Shape-Guided: 基于形状引导和双记忆库的异常检测方法

Shape-Guided Dual-Memory Learning for 3D Anomaly Detection 1、Background 提出了一个以形状为指导的专家学习框架&#xff0c;用于解决无监督3D异常检测的问题。 该方法建立在两个专门的专家模型及其协同作用的基础上&#xff0c;以从颜色和形状模态中定位异常区域。 第…