ZLMediaKit是国人开发的开源C++流媒体服务器,同SRS一样是主流的流媒体服务器。
ZLToolKit是基于C++11的高性能服务器框架,和ZLMediaKit是同一个作者,ZLMediaKit正是使用该框架开发的。
ZLMediaKit开源地址:https://github.com/ZLMediaKit/ZLMediaKit
ZLToolKit开源地址:https://github.com/ZLMediaKit/ZLToolKit
推荐ZLToolKit的理由
1、基于C++11,大量使用C++11新特性,如智能指针、lambda表达式等,安全性高,是高度运用C++特性的框架。
2、ZLMediaKit是应用ZLToolKit开发的,可以看到框架的使用实例,且ZLMediaKit流媒体服务器被全世界开发者使用,相当于是在测试ZLToolKit框架,因此框架的实用性和稳定性很高。
3、看过一些github上star数量很高的C++服务器框架,功能模块大同小异,但ZLT是把C++发挥到极致的框架。
目录
- ZLToolKit源码框架
- Thread
- Poller
- Network
- Util
- ZLToolKit源码测试
- EventPollerPool事件循环线程池测试
- WorkThreadPool工作线程池
- Timer定时器测试
- TcpClient测试
- ThreadPool任务线程池测试
- ResourcePool内存池测试
- stampthread时间戳线程测试
- NoticeCenter广播中心测试
- onceToken测试
- Any数据结构测试
- function_traits测试
- ObjectStatistic类统计测试
ZLToolKit源码框架
主要分为Thread、Poller、Network、Util四大部分。
Thread
semaphore.h(自定义信号量,封装类,由条件变量实现)
class semaphore,接口:post、wait。
TaskExecutor.h(cpu负载计算,Task函数指针模板,任务执行器管理,管理任务执行线程池)
class ThreadLoadCounter,cpu负载计算器,基类,统计线程每一次的睡眠时长和工作时长,并记录样本,调用load计算cpu负载=工作时长/总时长。
class TaskCancelable : public noncopyable,抽象类,可取消任务基类。
class TaskCancelableImp<R(ArgTypes…)> : public TaskCancelable,函数指针模板,event poller async 任务、DelayTask 任务等均使用该类型,任务可取消,重载()运算符执行任务,根据返回值类型,返回默认返回值。
class TaskExecutorInterface,抽象类,提供任务执行接口:async、async_first、sync、sync_first。
class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface,任务执行器,抽象类,无新增接口。
class TaskExecutorGetter,获得任务执行器,抽象类,接口:getExecutor、getExecutorSize。
class TaskExecutorGetterImp : public TaskExecutorGetter,实现抽象类接口,提供接口:getExecutorLoad(cpu负载)、for_each(遍历所有线程)、addPoller(创建 EventPoller 线程池)。
TaskQueue.h(由信号量控制的任务队列,加了线程锁,线程安全)
class TaskQueue,接口:push_task、push_exit、get_task、size。
threadgroup.h(线程组管理,创建线程,移除线程)
class thread_group,成员:_threads(umap存储线程组),接口:create_thread、remove_thread、is_thread_in、join_all、size。
ThreadPool.h(线程池任务管理,管理线程组执行任务队列)
class ThreadPool : public TaskExecutor,成员:thread_group、TaskQueueTask::Ptr,接口:start(启动线程池)、async(异步加入任务到队列)。
WorkThreadPool.h(创建一个工作线程池,可以加入线程负载均衡分配算法,类似EventPollerPool)
class WorkThreadPool : public TaskExecutorGetterImp,接口:getPoller、getFirstPoller、setPoolSize。
Poller
Pipe.h(管道对象封装)
class Pipe,成员:std::shared_ptr、EventPoller::Ptr _poller。
PipeWrap.h(管道的封装,windows下由socket模拟)
class PipeWrap,成员:int _pipe_fd[2],接口:write、read。
SelectWrap.h(select 模型的简单封装)
class FdSet
Timer.h(定时器对象)
class Timer,成员:EventPoller::Ptr(引用),构造函数传参超时时长和超时回调函数,EventPoller 选传或自动获取,超时回调在 EventPoller 线程执行。
EventPoller.h(基于epoll事件轮询模块)
class EventPoller : public TaskExecutor, public AnyStorage,基于epoll,可监听fd网络事件,async管道触发执行异步任务,doDelayTask定时器回调任务,runLoop执行事件循环体,添加/删除/修改监听事件,_event_map<网络fd和管道fd,回调>,_delay_task_map<延迟触发时间,回调>,_list_task<异步任务列表Task>。
class EventPollerPool :public TaskExecutorGetterImp,管理 EventPoller 线程池,可创建多个 EventPoller 线程,使用cpu负载均衡算法均匀分配线程,getPoller-》getExecutor 获得线程池内cpu负载最低的 EventPoller 线程。
Network
Buffer.h
class Buffer : public noncopyable,缓存抽象类,纯虚函数:data、size、toString、getCapacity,成员:ObjectStatistic对象个数统计。
class BufferOffset : public Buffer,成员:typename _data,构造函数传参offset,data获取+offset偏移的buffer。
class BufferRaw : public Buffer,成员:char *_data,接口:setCapacity分配,assign赋值,指针式缓存,根据分配内存大小自动扩减容。
class BufferLikeString : public Buffer,成员:std::string _str,接口:erase、append、push_back、insert、assign、clear、capacity、reserve、resize、empty、substr等,字符串操作缓存。
BufferSock.h
class BufferSock : public Buffer,成员:Buffer::Ptr _buffer、sockaddr_storage,管理_buffer指向的缓存。
class BufferList : public noncopyable,抽象类,接口:create、empty、count、send。
内部类
class BufferCallBack,成员:BufferList::SendResult回调函数,List<std::pair<Buffer::Ptr, bool> > 缓存列表,接口:sendFrontSuccess、sendCompleted,发送结果回调。
class BufferSendMsg final : public BufferList, public BufferCallBack,成员:_remain_size(剩余字节数)、_iovec(data和len组成的vector)、_iovec_off(_iovec当前发送下标),接口:send、send_l(执行系统调用sendmsg),socket发送数据时的buffer封装,用于tcp发送。
class BufferSendTo final: public BufferList, public BufferCallBack,接口:send(执行系统调用::sendto和::send)。
class BufferSendMMsg : public BufferList, public BufferCallBack,和 BufferSendMsg 类似,用于udp发送。
Server.h
class SessionMap,成员:std::unordered_map<std::string, std::weak_ptr >,管理Session,add、del、get。
class SessionHelper,成员:Session::Ptr、SessionMap::Ptr、Server,记录session至全局的map,方便后面管理。
class Server : public mINI,成员:EventPoller::Ptr,初始化设置EventPoller线程。
Session.h
class TcpSession : public Session
class UdpSession : public Session
class Session : public SocketHelper,成员:std::unique_ptr<toolkit::ObjectStatistictoolkit::TcpSession >、std::unique_ptr<toolkit::ObjectStatistictoolkit::UdpSession >,用于存储一对客户端与服务端间的关系。
Socket.h
typedef enum ErrCode,自定义socket错误枚举。
class SockException : public std::exception,成员:ErrCode,错误信息类,用于抛出系统和自定义异常,接口:what、getErrCode、getCustomCode、reset。
typedef enum SockType,socket类型,udp、tcp、tcpserver。
class SockNum,成员:int _fd、SockType _type,析构时关闭socket。
class SockFD : public noncopyable,成员:SockNum、EventPoller,文件描述符fd的封装,析构时停止事件监听,关闭socket。
class MutexWrapper,接口:lock、unlock,线程锁的封装,默认使用递归锁recursive_mutex。
class SockInfo,抽象类,接口:get_local_ip、get_local_port、get_peer_ip、get_peer_port、getIdentifier。
class Socket : public noncopyable, public SockInfo,成员:SockFD、EventPoller(网络事件触发和异步执行在此线程),异步IO Socket对象,包括tcp客户端、服务器和udp套接字,包含:错误回调、接收数据回调、tcp服务监听进入回调,connect、listen、send等接口的封装。
class SockSender,抽象类,接口:send、shutdown,重载运算符<<发送数据,定义socket发送接口。
class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface,抽象类,成员:Socket、EventPoller,主要是对Socket类的二次封装,自定义类继承该类,实现纯虚接口即可创建一个完整的socket类,比如tcpclient。
class SockUtil,套接字工具类,封装了socket、网络的一些基本操作,提供静态全局接口,比如connect、listen等。
class TcpClient : public SocketHelper,抽象类,Tcp客户端,自定义类继承与该类,实现onConnect、onManager回调即可创建一个可运行的tcp客户端。
class TcpServer : public Server,可配置的TCP服务器。
class UdpServer : public Server,可配置的UDP服务器。
Util
NoticeCenter.h(通知中心)
class EventDispatcher,成员:std::unordered_multimap<void *, Any>(first指针,多个对象监听相同事件传的指针必须不同,second是监听该事件的回调),recursive_mutex,事件分发器,监听同一个事件的回调。
class NoticeCenter,成员:std::unordered_map<std::string, EventDispatcher::Ptr>(first事件名,second分发器),recursive_mutex,接口:emitEvent,addListener,delListener,广播中心,全局单例。
ResourcePool.h(资源池)
class shared_ptr_imp : public std::shared_ptr,对智能指针封装,增加接口:quit,放弃或回收到资源池。
class ResourcePool_l,成员:std::vector<C *> _objs(C对象指针内存数组),std::atomic_flag _busy(原子锁,线程安全),接口:obtain、obtain2,内存池功能实现。
class ResourcePool,成员:std::shared_ptr<ResourcePool_l> pool,接口:obtain、obtain2,封装内存池对外接口。
mini.h(读写配置文件)
class mINI_basic : public std::map<key, variant>,接口:parseFile(解析配置文件)、dumpFile(保存配置文件),实际上是个map,保存的是配置文件键值对。
struct variant : public std::string,把任何配置项按 std::string 字符串处理。
using mINI = mINI_basic<std::string, variant>,mINI::Instance() 是全局单例对象,管理<key,value>配置项。
CMD.h(命令行参数解析)
class Option,选项类,成员:_short_opt(短选项名)、_long_opt(长选项名)、_des(描述)、_default_value(默认值)、_cb(回调)、_type(参数类型)。
class OptionParser,选项解析类,成员:Option _helper(初始化帮助选项)、std::map<char, int> _map_char_index(短选项名映射)、std::map<int, Option> _map_options(选项映射),接口:重载 operator<< 增加选项,delOption 删除选项。
class CMD : public mINI,成员:std::shared_ptr _parser,接口:重载 operator() 解析命令行参数,hasKey(是否存在key),splitedVal(按分隔符分隔字符串)。
class CMDRegister,全局单例对象,成员:std::map<std::string, std::shared_ptr > _cmd_map,接口:registCMD,宏:GET_CMD、CMD_DO、REGIST_CMD。
ZLToolKit源码测试
在tests/文件夹中有作者写的测试程序,这里记录我对框架关键模块的测试。
EventPollerPool事件循环线程池测试
框架的核心是 EventPoller 事件循环线程,由 EventPollerPool 管理多个 EventPoller 组成的线程池。
EventPollerPool,获取一个可用的线程池。
EventPollerPool 是全局单例对象,用来管理多个 EventPoller 组成的线程池,后者是基于 epoll 实现的线程。
EventPoller 对象 只能在 EventPollerPool 中构造,EventPollerPool 管理 EventPoller 线程池。
EventPollerPool 负责创建和管理 EventPoller 对象, 可获取当前EventPoller线程,最低负荷EventPoller线程,第一个EventPoller线程等,也可以自定义规则获取EventPoller线程对象。
EventPoller 线程主要处理:定时器(Timer)、异步任务(async)、网络事件(socket),epoll 监听管道fd和 socket fd 事件,加入 _event_map<fd,CB>。
1、定时器(Timer):可由任意线程调用,线程安全,异步加入延时任务队列 _delay_task_map<触发时间(ms),Task> ,距离最近触发定时器时间传入 epoll_wait 的超时时间,每次循环检测定时器队列,触发回调。
2、异步任务(async):可由任意线程调用,任务加入 _list_task 队列,有锁线程安全,并通过写管道 _pipe 唤醒epoll线程执行任务。
3、网络事件(socket):EventPoller 智能指针可以和 socket 绑定,监听处理fd接收/断开/错误等网络事件;socket 数据发送可以在单线程或任意线程进行(enable_mutex),线程锁 _mtx_sock_fd 。
#ifndef TEST_EVENTPOLLERPOOL_H
#define TEST_EVENTPOLLERPOOL_H
#include <csignal>
#include <iostream>
#include "Util/logger.h"
#include "Network/TcpClient.h"
using namespace std;
using namespace toolkit;
void test_EventPollerPool() {
//全局单例,获取实例即执行构造,addPoller 创建线程池,线程保存在其基类成员: std::vector<TaskExecutor::Ptr> _threads;
//默认创建线程个数=CPU个数,也可以 setPoolSize 设置线程个数
EventPollerPool::Instance();
//从线程池返回一个 EventPoller 线程的智能指针,增加其引用计数
//可以选择优先返回当前线程,或返回最低负荷线程
std::shared_ptr<EventPoller> poller1 = EventPollerPool::Instance().getPoller();
std::shared_ptr<EventPoller> poller2 = EventPollerPool::Instance().getPoller();
printf("use_count=%ld\n",poller1.use_count());//use_count=3
printf("main threadid=%ld\n",pthread_self());
//异步执行,可以在任意线程调用,lambda 表达式在 EventPoller 线程异步执行
//通过 lambda 表达式传参,把 lambda 这个匿名函数加入 EventPoller 线程的 List<Task::Ptr> _list_task 任务队列,Task 无参无返回值。
int num = 15;
poller1->async([num](){
printf("poller1 threadid=%ld,num=%d\n",pthread_self(),num);
});
//定时器(Timer)参考: test_timer.h
//网络事件(socket)参考: TestClient.h
/**
* 打印:
* use_count=3
* main threadid=139907182602176
* poller1 threadid=139907174205184,num=15
*/
//退出程序事件处理
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
}
#endif // TEST_EVENTPOLLERPOOL_H
WorkThreadPool工作线程池
WorkThreadPool,获取一个可用的线程池。
WorkThreadPool 是全局单例对象,和 EventPollerPool 功能几乎一致,都是管理 EventPoller 所组成的线程池。
EventPollerPool 为了线程安全,支持优先返回当前线程,也可以选择返回最低负荷线程; WorkThreadPool 只返回最低负荷线程。
EventPollerPool 通常用于实时性较高的业务,比如定时器、fd网络事件等,该线程不应该被耗时业务阻塞。
ZLM 使用 WorkThreadPool 主要用于文件读写、DNS解析、mp4关闭等耗时的工作,完成后再通过 EventPollerPool::async 切回自己的线程。
Timer定时器测试
测试定时器
Ticker类:可以统计代码执行时间,一般的计时统计。
Timer类:定时器类,构造函数传参超时时长、回调函数等,根据回调函数判断是否重复下次任务,回调函数在event poller线程异步执行。
程序运行5个线程:stamp thread、QT_ZLToolKit、async log、event poller 0、event poller 1。
调用栈
1、添加定时器(任意线程):Timer::Timer-》EventPoller::doDelayTask-》async_first(异步执行,把定时器任务添加到 _delay_task_map)-》EventPoller::async_l(_list_task.emplace_front)。
2、定时器超时:EventPoller::runLoop-》EventPoller::getMinDelay-》EventPoller::flushDelayTask-》(*(it->second))(),在这里执行 Timer 构造函数第二个参数传的回调函数。
实现原理
创建定时器时把延迟触发时间和回调函数传参加入 _delay_task_map,时间会加上当前时间(now+delay_ms),event poller 线程轮询所有定时器,比较当前时间now与上述 _delay_task_map 里的时间,超时则执行回调函数。
实现技巧
1、先看 event poller 线程的唤醒,如果是网络事件(比如接收tcp数据)可以直接唤醒 epoll_wait ,新加入异步任务是通过管道唤醒;
2、Timer 定时器任务则是依赖 event poller 线程轮询检测是否超时,那多久检测一次(也就是 epoll_wait 超时时间)?这里在每次执行 getMinDelay 时会把最近定时器超时时长作为返回值,传参给 epoll_wait,下次线程唤醒时正好最近的定时器超时,执行任务;
3、这样既保证线程不会过度轮询浪费cpu资源,也可以保证定时器任务能尽快执行。
4、EventPoller::doDelayTask 加入 _delay_task_map 是在 event poller 线程异步执行,通过写管道唤醒 event poller 线程,可以刷新 minDelay=getMinDelay 时间,也就是下次唤醒 epoll_wait 的时间。
#ifndef TEST_TIMER_H
#define TEST_TIMER_H
#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
using namespace std;
using namespace toolkit;
void test_timer() {
//设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
Ticker ticker0;
Timer::Ptr timer0 = std::make_shared<Timer>(0.5f,[&](){
TraceL << "timer0重复:" << ticker0.elapsedTime();
ticker0.resetTime();
return true;
}, nullptr);
Timer::Ptr timer1 = std::make_shared<Timer>(1.0f,[](){
DebugL << "timer1不再重复";
return false;
},nullptr);
Ticker ticker2;
Timer::Ptr timer2 = std::make_shared<Timer>(2.0f,[&]() -> bool {
InfoL << "timer2,测试任务中抛异常" << ticker2.elapsedTime();
ticker2.resetTime();
throw std::runtime_error("timer2,测试任务中抛异常");
},nullptr);
//退出程序事件处理
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
}
#endif // TEST_TIMER_H
TcpClient测试
自定义类,继承于TcpClient,并重写onConnect、onRecv等虚函数,根据需求实现相应功能
程序运行5个线程:stamp thread、QT_ZLToolKit、async log、event poller 0、event poller 1
调用栈:
发起连接(主线程或其他线程):test_TcpClient->TcpClient::startConnect->Socket::connect,_poller->async加入 event poller 线程异步执行连接任务。
异步执行连接(event poller 线程):EventPoller::runLoop->EventPoller::onPipeEvent->Socket::connect->Socket::connect_l->async_con_cb(SockUtil::connect)->::connect,开始连接。
这里使用的是非阻塞连接,connect返回EINPROGRESS则表示正在连接,async_con_cb->_poller->addEvent给fd添加可写事件,添加成功则说明连接成功。
连接结果回调(eventpoller线程):async_con_cb-》Socket::onConnected(Socket::attachEvent 添加epoll监听读事件,监听接收数据)-》con_cb-》con_cb_in-》TcpClient::onSockConnect-》TestClient::onConnect。
数据接收回调:TcpClient::onSockConnect-》Socket::setOnRead-》_on_read=TestClient::onRecv
数据接收(eventpoller线程):EventPoller::runLoop-》epoll_wait监听到事件-》Socket::onRead-》_on_read-》TestClient::onRecv。
数据发送:demo里 TcpClient::startConnect 会创建定时器:_timer,每隔2秒回调一次 TestClient::onManager ,执行数据发送,定时器超时回调是在 event poller 线程,socket跨线程安全,线程锁:Socket::_mtx_sock_fd。
EventPoller::runLoop->EventPoller::getMinDelay->EventPoller::flushDelayTask(_timer 定时器超时)->TestClient::onManager->SockSender::<< ->SockSender::send->SocketHelper::send->Socket::send->Socket::send_l->Socket::flushAll->Socket::flushData->BufferSendMsg::send->BufferSendMsg::send_l->sendmsg(系统调用)。
小结
发起tcp连接可以在任意线程,非阻塞连接任务是在 event poller 线程异步执行,连接成功会添加 epoll 事件监听数据接收,发送数据可以在任意线程,使用线程锁保证线程安全。
#ifndef TESTCLIENT_H
#define TESTCLIENT_H
#include <csignal>
#include <iostream>
#include "Util/logger.h"
#include "Network/TcpClient.h"
using namespace std;
using namespace toolkit;
class TestClient: public TcpClient {
public:
using Ptr = std::shared_ptr<TestClient>;
TestClient():TcpClient() {
DebugL;
}
~TestClient(){
DebugL;
}
protected:
virtual void onConnect(const SockException &ex) override{
//连接结果事件
InfoL << (ex ? ex.what() : "success");
}
virtual void onRecv(const Buffer::Ptr &pBuf) override{
//接收数据事件
DebugL << pBuf->data() << " from port:" << get_peer_port();
}
virtual void onFlush() override{
//发送阻塞后,缓存清空事件
DebugL;
}
virtual void onError(const SockException &ex) override{
//断开连接事件,一般是EOF
WarnL << ex.what();
}
virtual void onManager() override{
//定时发送数据到服务器
auto buf = BufferRaw::create();
if(buf){
buf->assign("[BufferRaw]\0");
(*this) << _nTick++ << " "
<< 3.14 << " "
<< string("string") << " "
<<(Buffer::Ptr &)buf;
}
}
private:
int _nTick = 0;
};
int test_TcpClient() {
// 设置日志系统
Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
TestClient::Ptr client(new TestClient());//必须使用智能指针
client->startConnect("192.168.3.64",9090);//连接服务器
// TcpClientWithSSL<TestClient>::Ptr clientSSL(new TcpClientWithSSL<TestClient>());//必须使用智能指针
// clientSSL->startConnect("192.168.3.64",9090);//连接服务器
//退出程序事件处理
static semaphore sem;
///SIGINT:Ctrl+C发送信号,结束程序
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
return 0;
}
#endif // TESTCLIENT_H
ThreadPool任务线程池测试
线程池,可以输入functional任务至后台线程执行。
构造函数中创建线程组 thread_group _thread_group,从任务队列 TaskQueueTask::Ptr _queue 获取任务在线程池执行。
线程池中所有线程共用一个任务队列 _queue,ThreadPool 无锁,但 _queue 中有锁,目前ZLM中没有 ThreadPool 应用。
#ifndef TEST_THREADPOOL_H
#define TEST_THREADPOOL_H
#include <chrono>
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/TimeTicker.h"
#include "Thread/ThreadPool.h"
using namespace std;
using namespace toolkit;
/**
* @brief thread_group :线程组,移植自boost。
* create_thread 快速创建一组线程,并指定参数和线程处理函数。
*/
int test_ThreadPool() {
//初始化日志系统
Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
ThreadPool pool(thread::hardware_concurrency(), ThreadPool::PRIORITY_HIGHEST, true);
//每个任务耗时3秒
auto task_second = 3;
//每个线程平均执行4次任务,总耗时应该为12秒
auto task_count = thread::hardware_concurrency() * 4;
semaphore sem;
vector<int> vec;
vec.resize(task_count);
Ticker ticker;
{
//放在作用域中确保token引用次数减1
auto token = std::make_shared<onceToken>(nullptr, [&]() {
sem.post();
});
for (auto i = 0; i < task_count; ++i) {
pool.async([token, i, task_second, &vec]() {
setThreadName(("thread pool " + to_string(i)).data());
std::this_thread::sleep_for(std::chrono::seconds(task_second)); //休眠三秒
InfoL << "task " << i << " done!";
vec[i] = i;
});
}
}
sem.wait();
InfoL << "all task done, used milliseconds:" << ticker.elapsedTime();
//打印执行结果
for (auto i = 0; i < task_count; ++i) {
InfoL << vec[i];
}
return 0;
}
#endif // TEST_THREADPOOL_H
ResourcePool内存池测试
ResourcePool :基于智能指针实现的一个循环池,不需要手动回收对象。
ResourcePool 是个类模板,可传入自定义数据类型C,内存池中存放的是该数据类型C的指针数组 std::vector<C > _objs。
setSize(_pool_size) 可设置内存池最大容量,当超出最大容量,recycle 不再回收该资源,而是直接释放。
obtain 获取内存,返回的是指向内存对象C的自定义智能指针 shared_ptr_imp,特点是提供接口quit,当内存对象使用完后,可以选择不再回收到内存池,此时可以自定义回收或直接释放。
obtain2 获取内存,返回指向内存对象C的智能指针 std::shared_ptr,当智能指针离开作用域时自动回收内存对象C到内存池。
私有接口:
getPtr 获取C原始指针,当 _objs 为空时分配一个新的C返回,当 _objs 不为空则从 _objs 尾部取已一个C*,并从 _objs 中删除。
recycle 回收C*,如果 _objs 已满(>=_pool_size),直接释放C*,否则回收到 _objs 尾部。
总结
1、内存池中存放的是任意类型数据指针C*,C大小固定或可动态扩容,刚开始内存池是空的,使用时分配内存,用完后回收到内存池,下次再使用时就不用重新分配了,直接用上次分配并回收的C*;
2、不用担心高并发内存池不够用,因为当内存池为空时总会立即分配内存,如果分配的太多,回收时超出内存池大小后会直接释放,合理的内存池大小在高并发时会减少分配和释放的次数。
#ifndef TEST_RESOURCEPOOL_H
#define TEST_RESOURCEPOOL_H
#include <csignal>
#include <iostream>
#include <random>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/ResourcePool.h"
#include "Thread/threadgroup.h"
#include <list>
using namespace std;
using namespace toolkit;
//程序退出标志
bool g_bExitFlag = false;
class string_imp : public string{
public:
template<typename ...ArgTypes>
string_imp(ArgTypes &&...args) : string(std::forward<ArgTypes>(args)...){
DebugL << "创建string对象:" << this << " " << *this;
};
~string_imp(){
WarnL << "销毁string对象:" << this << " " << *this;
}
};
//后台线程任务
void onRun(ResourcePool<string_imp> &pool,int threadNum){
std::random_device rd;
while(!g_bExitFlag){
//从循环池获取一个可用的对象
auto obj_ptr = pool.obtain();
if(obj_ptr->empty()){
//这个对象是全新未使用的
InfoL << "后台线程 " << threadNum << ":" << "obtain a emptry object!";
}else{
//这个对象是循环使用的
InfoL << "后台线程 " << threadNum << ":" << *obj_ptr;
}
//标记该对象被本线程使用
obj_ptr->assign(StrPrinter << "keeped by thread:" << threadNum );
//随机休眠,打乱循环使用顺序
usleep( 1000 * (rd()% 10));
obj_ptr.reset();//手动释放,也可以注释这句代码。根据RAII的原理,该对象会被自动释放并重新进入循环列队
usleep( 1000 * (rd()% 1000));
}
}
int test_ResourcePool() {
//初始化日志
Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
//大小为50的循环池
ResourcePool<string_imp> pool;
pool.setSize(50);
//获取一个对象,该对象将被主线程持有,并且不会被后台线程获取并赋值
auto reservedObj = pool.obtain();
//在主线程赋值该对象
reservedObj->assign("This is a reserved object , and will never be used!");
thread_group group;
//创建4个后台线程,该4个线程模拟循环池的使用场景,
//理论上4个线程在同一时间最多同时总共占用4个对象
WarnL << "主线程打印:" << "开始测试,主线程已经获取到的对象应该不会被后台线程获取到:" << *reservedObj;
for(int i = 0 ;i < 4 ; ++i){
group.create_thread([i,&pool](){
onRun(pool,i);
});
}
//等待3秒钟,此时循环池里面可用的对象基本上最少都被使用过一遍了
sleep(3);
//但是由于reservedObj早已被主线程持有,后台线程是获取不到该对象的
//所以其值应该尚未被覆盖
WarnL << "主线程打印: 该对象还在被主线程持有,其值应该保持不变:" << *reservedObj;
//获取该对象的引用
auto &objref = *reservedObj;
//显式释放对象,让对象重新进入循环列队,这时该对象应该会被后台线程持有并赋值
reservedObj.reset();
WarnL << "主线程打印: 已经释放该对象,它应该会被后台线程获取到并被覆盖值";
//再休眠3秒,让reservedObj被后台线程循环使用
sleep(3);
//这时,reservedObj还在循环池内,引用应该还是有效的,但是值应该被覆盖了
WarnL << "主线程打印:对象已被后台线程赋值为:" << objref << endl;
{
WarnL << "主线程打印:开始测试主动放弃循环使用功能";
List<decltype(pool)::ValuePtr> objlist;
for (int i = 0; i < 8; ++i) {
reservedObj = pool.obtain();
string str = StrPrinter << i << " " << (i % 2 == 0 ? "此对象将脱离循环池管理" : "此对象将回到循环池");
reservedObj->assign(str);
reservedObj.quit(i % 2 == 0);
objlist.emplace_back(reservedObj);
}
}
sleep(3);
//通知后台线程退出
g_bExitFlag = true;
//等待后台线程退出
group.join_all();
return 0;
}
#endif // TEST_RESOURCEPOOL_H
stampthread时间戳线程测试
测试时间戳线程
只要执行了静态方法initMillisecondThread,就会创建时间戳线程,最多创建1个,线程名称:stamp thread。
提供getCurrentMillisecond和getCurrentMicrosecond接口,获取程序启动到当前时间的毫秒数和微秒数,或从1970年开始到当前时间的毫秒数和微秒数。
程序启动后有两个线程:QT_ZLToolKit(主线程)和stamp thread。
#ifndef TEST_STAMPTHREAD_H
#define TEST_STAMPTHREAD_H
#include "Util/util.h"
#include <sys/time.h>
using namespace std;
using namespace toolkit;
void test_stampthread() {
uint64_t cur_ms = getCurrentMillisecond(true);
printf("cur_ms = %lu\n",cur_ms);
usleep(100*1000);
cur_ms = getCurrentMillisecond(true);
printf("cur_ms = %lu\n",cur_ms);
}
#endif // TEST_STAMPTHREAD_H
NoticeCenter广播中心测试
广播中心,可以在程序的任意线程添加监听事件并定义回调;可以在任意线程发出一个事件,通知所有监听了该事件的地方执行回调。
每个事件创建一个分发器 EventDispatcher ,分发器存放监听该事件的key和回调,加线程锁,多线程安全。
NoticeCenter::Instance() 定义对外接口,是全局单例对象,加线程锁,添加/删除事件、发出事件均是多线程安全。
addListener(指针key,事件名,回调) 可以在任意线程添加监听,emitEvent(事件名,参数列表) 可以在任意线程发出一个事件,注意:监听回调是在 emitEvent 所在线程执行的。
class EventDispatcher,成员:std::unordered_multimap<void *, Any>(first指针,多个对象监听相同事件传的指针必须不同,second是监听该事件的回调),recursive_mutex,事件分发器,监听同一个事件的回调。
class NoticeCenter,成员:std::unordered_map<std::string, EventDispatcher::Ptr>(first事件名,second分发器),recursive_mutex,接口:emitEvent,addListener,delListener,全局单例。
下面实验:多线程监听相同事件,线程安全。
#ifndef TEST_NOTICECENTER_H
#define TEST_NOTICECENTER_H
#include <csignal>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/NoticeCenter.h"
using namespace std;
using namespace toolkit;
//定义两个事件,事件是字符串类型
//广播名称1
#define NOTICE_NAME1 "NOTICE_NAME1"
//广播名称2
#define NOTICE_NAME2 "NOTICE_NAME2"
//程序退出标记
bool g_bExitFlag = false;
static void *tag1;
static void *tag2;
void* func0(void*) {
//addListener方法第一个参数是标签,用来删除监听时使用
//需要注意的是监听回调的参数列表个数类型需要与emitEvent广播时的完全一致,否则会有无法预知的错误
NoticeCenter::Instance().addListener(tag1,NOTICE_NAME1,
[](int &a,const char * &b,double &c,string &d){
printf("func0=%d\n",a);
});
return nullptr;
}
void* func1(void*) {
NoticeCenter::Instance().addListener(tag2,NOTICE_NAME1,
[](int &a,const char * &b,double &c,string &d){
printf("func1=%d\n",a);
});
return nullptr;
}
void* func2(void*) {
//监听NOTICE_NAME2事件
NoticeCenter::Instance().addListener(0,NOTICE_NAME2,
[](string &d,double &c,const char *&b,int &a){
printf("func2=%d\n",a);
});
return nullptr;
}
int test_NoticeCenter() {
//设置程序退出信号处理函数
signal(SIGINT, [](int){g_bExitFlag = true;});
//设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>());
pthread_t tid[5];
pthread_create(&tid[0],nullptr,func0,nullptr);
pthread_create(&tid[1],nullptr,func1,nullptr);
pthread_create(&tid[2],nullptr,func2,nullptr);
int a = 0;
while(!g_bExitFlag){
const char *b = "b";
double c = 3.14;
string d("d");
//每隔1秒广播一次事件,如果无法确定参数类型,可加强制转换
NoticeCenter::Instance().emitEvent(NOTICE_NAME1,++a,(const char *)"b",c,d);
NoticeCenter::Instance().emitEvent(NOTICE_NAME2,d,c,b,a);
sleep(1); // sleep 1 second
}
return 0;
}
#endif // TEST_NOTICECENTER_H
onceToken测试
RAII [1] (Resource Acquisition Is Initialization)
也称为“资源获取就是初始化”,是C++语言的一种管理资源、避免泄漏的惯用法。
RAII的思想:构造时获取资源,在对象生命周期内保持资源有效,最后对象析构时释放资源。
onceToken
使用RAII模式实现,可以在对象构造和析构时执行一段代码。
也就是在构造时执行一段代码(传nullptr则什么都不执行),在离开作用域时执行一段代码。
在ZLM中,onceToken 主要用于防止在程序抛出异常时提前返回,没有执行接下来的代码。
把一定要执行的代码放在 onceToken 析构时执行,防止程序抛出异常提前返回。
如果要等待异步执行后再析构,在执行 async 时把 onceToken 智能指针作为行参传递给Lambda表达式。
#ifndef TEST_ONCETOKEN_H
#define TEST_ONCETOKEN_H
#include <csignal>
#include "Util/onceToken.h"
#include "Poller/EventPoller.h"
using namespace std;
using namespace toolkit;
void token_start() {
printf("token start\n");
}
void test_onceToken() {
EventPoller::Ptr poller = EventPollerPool::Instance().getPoller();
//异步执行时传递 onceToken 智能指针行参,引用计数加1,等所有的异步执行结束后引用计数变为0,执行析构
/**
* 打印:
* token start
* async=0
* async=1
* async=2
* token destruct
*/
{
auto token = std::make_shared<onceToken>(token_start, []() {
printf("token destruct\n");
});
for (auto i = 0; i < 3; ++i) {
poller->async([token, i]() {//EventPoller 线程异步执行
printf("async=%d\n",i);
std::this_thread::sleep_for(std::chrono::seconds(1)); //休眠1秒
});
}
}
//退出程序事件处理
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
}
#endif // TEST_ONCETOKEN_H
Any数据结构测试
Any可以保存任意类型的数据。
#ifndef TEST_ANY_H
#define TEST_ANY_H
#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
using namespace std;
using namespace toolkit;
class Student{
public:
Student(int age):age(age) {
printf("Student\n");
}
~Student() {
printf("~Student\n");
}
Student(Student &s) {
printf("Student copy\n");
age = s.age;
}
int age;
};
template <typename FUNC>
void test_func(FUNC &&func) {
}
/**
* @brief Any :可以保存任意的对象
*/
void test_Any() {
//1.Any保存类的对象
{
Any aa;
//创建 Student 的智能指针,(17)是构造函数的参数
aa.set<Student>(17);
//拷贝构造,get<Student>(bool可选参数)返回智能指针所管理的原始指针的对象引用
Student S1 = aa.get<Student>();
// aa.reset();//如果天提前释放aa,则捕获异常,打印: ex=Any is empty
try{
printf("aa age=%d\n",aa.get<Student>().age);
}catch(std::exception& e) {
printf("ex=%s\n",e.what());
}
printf("s1 age=%d\n",S1.age);
//离开作用域打印:
// Student
// Student copy
// aa age=17
// s1 age=17
// ~Student
// ~Student
}
//2.Any保存 function 函数指针模板
Any bb;
//set<function函数指针模板的数据类型>(function 的实例化,lambda表达式)
bb.set<std::function<void(int)>>([](int a){
printf("a=%d\n",a);
});
//获取bb所管理的对象并调用方法,(bool可选参数)(10:调用对象所传的参数)
bb.get<std::function<void(int)>>()(10);//调用lambda表达式,打印:a=10
//退出程序事件处理
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
}
#endif // TEST_ANY_H
function_traits测试
源码类似于: 《深入应用C++11 代码优化与工程级应用》3.3.6 function_traits
function_traits 用来获取所有函数语义类型的信息(函数类型、返回类型、参数个数和参数的具体类型),通过 stl_function_type 把任意函数转换成 std::function。
函数类型包括:普通函数、函数指针、function/lambda、成员函数、函数对象。
实现function_traits的关键技术:
要通过模板特化和可变参数模板来获取函数类型和返回类型。
先定义一个基本的function_traits的模板类:
template
struct function_traits;
再通过特化,将返回类型和可变参数模板作为模板参数,就可以获取函数类型、函数返回值和参数的个数了。
如:
int func(int a, string b);
1## 获取函数类型
function_traits<decltype(func)>::function_type; // int __cdecl(int, string)
2# 获取函数返回值
function_traits<decltype(func)>::return_type; // int
3# 获取函数的参数个数
function_traits<decltype(func)>::arity; // 2
4# 获取函数第一个入参类型
function_traits<decltype(func)>::args<0>::type; // int
5# 获取函数第二个入参类型
function_traits<decltype(func)>::args<1>::type; // string
6# 将函数转换为 std::function
stl_function_type
#ifndef TEST_FUNCTION_TRAITS_H
#define TEST_FUNCTION_TRAITS_H
#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
#include "Util/function_traits.h"
using namespace std;
using namespace toolkit;
//打印数据类型
template<typename T>
void printType()
{
printf("%s\n",demangle(typeid(T).name()).c_str());
}
//自定义类
class Student2{
};
//函数指针
float(*cast_func)(int, int, int, int);
//普通函数
int func(int a, Student2 b)
{
printf("a=%d\n",a);
return 0;
}
struct AA
{
int f(int a, int b)volatile { return a + b; }//成员函数
int operator()(int)const { return 0; }//函数对象
};
//function 函数包装模板,指向 lambda 表达式
std::function<int(int)> func_lam = [](int a) {return a; };
template <typename FUNC>
void to_function(FUNC &&func) {
using stl_func = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
stl_func f = func;
f(10,Student2());//调用func,打印: a=10
}
void test_function_traits() {
//1.获取函数信息
printf("func:%s\n",demangle(typeid(function_traits<decltype(func)>::function_type).name()).c_str());//打印函数类型
printf("func ret:%s\n",demangle(typeid(function_traits<decltype(func)>::return_type).name()).c_str());//打印返回值类型
printf("fucn arg num:%d\n",function_traits<decltype(func)>::arity);//打印参数个数
printf("fucn arg[0]:%s\n",demangle(typeid(function_traits<decltype(func)>::args<0>::type).name()).c_str());//打印第一个参数的类型
printType<function_traits<std::function<int(int)>>::function_type>();
printType<function_traits<std::function<int(int)>>::args<0>::type>();
printType<function_traits<decltype(func_lam)>::function_type>();
printType<function_traits<decltype(cast_func)>::function_type>();
printType<function_traits<AA>::function_type>();
using T = decltype(&AA::f);
printType<T>();
printType<function_traits<decltype(&AA::f)>::function_type>();
static_assert(std::is_same<function_traits<decltype(func_lam)>::return_type, int>::value, "");
//2.使用 stl_function_type 把任意函数转换为 std::function
to_function(func);
/**
* 打印:
* func:int (int, Student2)
* func ret:int
* fucn arg num:2
* fucn arg[0]:int
* int (int)
* int
* int (int)
* float (int, int, int, int)
* int (int)
* int (AA::*)(int, int) volatile
* int (int, int)
* a=10
*/
//退出程序事件处理
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
}
#endif // TEST_FUNCTION_TRAITS_H
ObjectStatistic类统计测试
ObjectStatistic :统计类所实例化对象的个数。
创建私有成员: ObjectStatistic<Test_Obj> _statistic;并使用宏声明: StatisticImp(Test_Obj)。
在任意实例化的对象里均可调用接口 _statistic.count 查询实例化对象的总数。
#ifndef TEST_OBJECTSTATISTIC_H
#define TEST_OBJECTSTATISTIC_H
#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
using namespace std;
using namespace toolkit;
class Test_CLS {
public:
int get_count() {
return _statistic.count();
}
private:
ObjectStatistic<Test_CLS> _statistic;
};
StatisticImp(Test_CLS)
void test_ObjectStatistic() {
Test_CLS tTest_CLS1;
Test_CLS tTest_CLS2;
printf("count=%d\n",tTest_CLS1.get_count());//count=2
{
Test_CLS tTest_CLS3;
printf("count=%d\n",tTest_CLS1.get_count());//count=3
}
printf("count=%d\n",tTest_CLS1.get_count());//count=2
//退出程序事件处理
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait();
}
#endif // TEST_OBJECTSTATISTIC_H