目录
1、线程池的实现逻辑
2、创建多线程
3、对线程池分配任务
3.1 任务类
3.2 发送与接收任务
结语
前言:
在Linux下实现一个线程池,线程池就是创建多个线程,然后对这些线程进行管理,并且可以发放任务给到线程池,让线程池完成任务。
1、线程池的实现逻辑
首先可以把线程池封装成一个类,既然用到了线程,那么必离不开三个关键要素:1、线程id,2、锁,3、条件变量。因此该线程类中必须有这三个成员变量,为了更好的区分线程彼此,可以对每个线程加上对应的名称,比如线程1,线程2,可以用string变量记录线程的名称,因此可以把线程id和string类进行封装成一个线程信息类,该类如下:
struct ThreadInfo
{
pthread_t tid;//线程id
std::string name;//线程名称
};
其次线程池里肯定存在多个线程,即多个线程id,为了管理他们,可以把这些线程id都放入一个容器中,因此可以在线程池类中定义一个vector<ThreadInfo>的成员变量,维护ThreadInfo类,实际上就是维护线程id。然后,给线程池发配任务并不是直接给线程本体发送,而是将任务发送到一块线程池能看到的区域内,线程池只需要在该区域内获取任务即可,发送方并不与线程池进行直接沟通,这就是一个简单的生产消费者模型,而这个区域可以采用容器(队列)的形式实现,即发送方往队列里push数据,线程池从队列中pop数据。
生产消费者模型如下:
再者,锁和条件变量是必须有的,因为多线程在访问同一块区域时需要对他们进行约束,即一次只能一个线程访问同一块区域,条件变量的目的是当区域达到极值(空或满)时,可以使用条件变量对该线程进行阻塞等待,等待区域不为满或者不为空时,就可以唤醒阻塞的线程。也就是说,当区域内为空时,线程池会阻塞在条件变量的等待队列中,当区域内为满时,发送方会阻塞在条件变量的等待队列中,可以理解为条件变量让线程访问共同资源时有了顺序性。
有了上述的逻辑,就可以定义出线程池的类了,线程池类如下:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5;//控制线程池的大小
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
vector<ThreadInfo> threads_;
queue<T> tasks_;//共同区域
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
2、创建多线程
有了线程池类的框架,下一步就是在该类里面使用容器threads_来创建多线程,因为在该类构造的时候,就已经创建了5个结构体ThreadInfo,然后这5个结构体ThreadInfo里面有各有一个tid,也就是说构造的时候就已经创建了5个tid了,我们直接使用这5个tid创建线程即可。
完善线程池中创建线程的代码:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5;//控制线程池的大小
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
//注意该函数要加static,因为类内函数默认有this指针,而执行函数只能有一个形参
static void *HandlerTask(void *args)
{
while (true)
{
cout<<"新的线程准备就绪"<<endl;
sleep(1);
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, nullptr);
}
}
void join()//等待线程
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
cout<<"开始等待新线程"<<endl;
pthread_join(threads_[i].tid,nullptr);
}
}
private:
vector<ThreadInfo> threads_;
queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
有了上述代码,我们就可以在主函数中先进行测试,主函数代码如下:
#include "threadpool.hpp"
int main()
{
ThreadPool<int> tp;
tp.Start();
tp.join();
return 0;
}
运行结果:
从结果可以看到,程序是正常运行的,下面就可以对线程进行任务分配了。
3、对线程池分配任务
这里使用队列作为共享区域的容器,因此发送任务和接收任务对应的操作是入队和出队,即线程池读取任务用pop,给线程池分配任务用push。
3.1 任务类
在实现pop和push前,先定义一个任务类,用于计算数据并得到结果,该任务类如下:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x),
data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
//oper_表示+-*/%操作符
//data1_ oper_ data2_ = ?
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
3.2 发送与接收任务
不管是发送任务还是接收任务,都需要对其进行上锁,因为生产消费者模型不允许发送方和接收方同时对共同区域访问,并且当共享区域里没有数据时线程池就不能在进行pop了,所以共享区域为空要将当前线程放入等待队列。(也可以设置一个阈值作为共享区域的最大容量,那样做的话则push数据时,若达到该阈值也会将发送方线程放入等待队列,但是这里就不设置阈值了)
发送与接收任务的整体代码如下:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5; // 控制线程池的大小
template <class T>
class ThreadPool
{
public:
// 复用函数
void Lock() // 申请锁
{
pthread_mutex_lock(&mutex_);
}
void Unlock() // 释放锁
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup() // 唤醒线程
{
pthread_cond_signal(&cond_);
}
void ThreadSleep() // 将线程放入等待队列中
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty() // 判断队列是否为空
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid) // 拿到线程的名称
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
// 注意该函数要加static,因为类内函数默认有this指针,而执行函数只能有一个形参
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
T t = tp->Pop();
t(); // 调用任务类里的仿函数
std::cout << name << " 开始计算, "
<< "结果: " << t.GetResult() << std::endl;
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "线程-" + std::to_string(i + 1);
//传的是this指针
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
void join()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
cout << "开始等待新线程" << endl;
pthread_join(threads_[i].tid, nullptr);
}
}
T Pop() // 复用STL队列的pop
{
Lock();
while (tasks_.empty())
{
ThreadSleep();
}
T t = tasks_.front();
tasks_.pop();
Unlock();
return t;
}
void Push(const T &t) // push是给发送方使用的
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
private:
vector<ThreadInfo> threads_;
queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
将功能都封装进入线程池类中,现在就只需要在主函数中调用这些功能即可,主函数代码如下:
#include "threadpool.hpp"
#include "Task.hpp"
#include <ctime>
int main()
{
ThreadPool<Task> tp;
tp.Start();
srand(time(0));
while(true)
{
//1. 构建任务
int x = rand() % 10 + 1;
usleep(10);
int y = rand() % 5;
char op = opers[rand()%opers.size()];
Task t(x, y, op);
//2. 交给线程池处理
tp.Push(t);
cout << "main thread make task: " << t.GetTask() << endl;
sleep(1);
}
return 0;
}
运行结果:
从结果可以看到,线程池里的每个线程都拿到并执行了分发的任务。
结语
以上就是关于线程池的实现和运用,实现线程池的核心在于对线程的基本运用以及对锁和条件变量的理解和运用。
最后如果本文有遗漏或者有误的地方欢迎大家在评论区补充,谢谢大家!!