这个模块需要具备那些基础知识。
线程创建相关操作,锁,条件变量。
设置线程数量:
_thread_count 是线程池中,记录线程数量的成员。
创建线程池:
上图就是线程池的创建,将线程与EventLoop对象 通过数组下标一一对应起来。
EventLoop对象的分配:
服务器是靠轮询的策略,来降低,每个线程的运行压力。
在TcpServer模块中调用,为新的客户端连接分配EventLoop对象:
线程创建与EventLoop对象进行绑定:
1,首先创建线程,进入线程函数,将EventLoop对象进行实例化,然后通过锁和条件变量来控制,先进行加锁,然后在条件变量_cond.notify_all(),这里等待,等待被唤醒,然后解锁。解锁这一步是_cond.notify_all()做的。
2,接着获取线程对应的EventLoop对象,先加锁,在_cond.wait()唤醒,等待的条件变量,这时线程会执行loop.Start()死循环。出了作用域,锁就会被销毁。
这就需要对锁和条件变量进行了解。
LoopThreadPool模块整体代码:
class LoopThread {
private:
/*用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化
std::thread _thread; // EventLoop对应的线程
private:
/*实例化 EventLoop 对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/
void ThreadEntry() {
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);//加锁
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
public:
/*创建线程,设定线程入口函数*/
LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
/*返回当前线程关联的EventLoop对象指针*/
EventLoop *GetLoop() {
EventLoop *loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex);//加锁
_cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞
loop = _loop;
}
return loop;
}
};
class LoopThreadPool {
private:
int _thread_count;
int _next_idx;
EventLoop *_baseloop;
std::vector<LoopThread*> _threads;
std::vector<EventLoop *> _loops;
public:
LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}
void SetThreadCount(int count) { _thread_count = count; }
void Create() {
if (_thread_count > 0) {
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++) {
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
return ;
}
EventLoop *NextLoop() {
if (_thread_count == 0) {
return _baseloop;
}
_next_idx = (_next_idx + 1) % _thread_count;
return _loops[_next_idx];
}
};