线程池
- Lock.hpp
- 说明
- Task.hpp
- 代码
- 代码说明
- Threadpool.hpp
- 代码说明
- Threadpool.cc
- 代码说明
- Log.hpp
- 代码说明
- Makefile
- 运行结果
Lock.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&lock_, nullptr);
}
void lock()
{
pthread_mutex_lock(&lock_);
}
void unlock()
{
pthread_mutex_unlock(&lock_);
}
~Mutex()
{
pthread_mutex_destroy(&lock_);
}
private:
pthread_mutex_t lock_;
};
class LockGuard
{
public:
LockGuard(Mutex *mutex) : mutex_(mutex)
{
mutex_->lock();
std::cout << "加锁成功..." << std::endl;
}
~LockGuard()
{
mutex_->unlock();
std::cout << "解锁成功...." << std::endl;
}
private:
Mutex *mutex_;
};
说明
-这段代码定义了两个C++类:Mutex和LockGuard,这两个类用于处理多线程中的同步问题。
Mutex 类:这个类封装了互斥量(mutex)的初始化、锁定、解锁和销毁操作。使用这个类的对象可以确保在多线程环境中,某一时刻只有一个线程能访问某些特定的代码区域(即临界区)。具体解释如下:
Mutex():构造函数中,调用了pthread_mutex_init来初始化互斥量lock_。
lock():此函数调用pthread_mutex_lock来锁定互斥量,从而确保调用该函数的线程独占临界区,直到它调用unlock释放互斥量。
unlock():此函数调用pthread_mutex_unlock来解锁互斥量,允许其他线程进入临界区。
~Mutex():析构函数中,调用pthread_mutex_destroy来销毁互斥量。
LockGuard 类:这个类是一个典型的RAII(Resource Acquisition Is Initialization,资源获取即初始化)实现,用于自动管理Mutex的锁定和解锁。对象在创建时获取资源(在这种情况下是锁定一个互斥量),在对象生命周期结束时释放资源(解锁互斥量)。这样可以确保即使出现异常,也能正确释放资源。具体解释如下:
LockGuard(Mutex *mutex):构造函数中,接受一个Mutex对象的指针,并对其进行锁定。然后输出"加锁成功…“。
~LockGuard():析构函数中,解锁之前在构造函数中锁定的Mutex对象。然后输出"解锁成功…”。
使用这两个类,可以方便地在多线程环境中保护临界区的代码,确保其线程安全。例如:
Mutex mutex;
//...
{
LockGuard lock(&mutex);
// 临界区的代码
// ...
} // 在这里,lock对象离开其作用域,触发析构函数,互斥量自动解锁。
这样可以确保,即使临界区的代码出现异常,析构函数仍会被调用,从而解锁互斥量。这就避免了因为异常而导致的死锁情况。
Task.hpp
代码
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0')
{
}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
代码说明
构造函数:有两个构造函数,一个默认构造函数,一个带参数的构造函数,用于初始化类的成员变量 elemOne_、elemTwo_ 和 operator_。
operator() 函数:这是一个函数调用运算符重载,它使得对象可以像函数一样被调用。在本例中,operator() 调用了 run() 函数并返回其结果。
run() 函数:这是类的一个成员函数,根据成员变量 operator_ 的值执行相应的数学运算,并返回运算结果。
get() 函数:这个函数用于获取 Task 对象中的元素和运算符。它通过指针参数返回 elemOne_、elemTwo_ 和 operator_ 的值。
Task 类可以用于表示简单的数学任务,并通过调用 run() 函数来计算结果。它还提供了获取元素和运算符的功能。如果运算符无效,run() 函数会输出相应的错误消息。
总的来说,Task 类是一个用于处理数学任务的简单类,并且提供了一些实用的方法。
Threadpool.hpp
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Log.hpp"
#include "Lock.hpp"
using namespace std;
int gThreadNum = 5;
template <class T>
class ThreadPool
{
private:
ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T>&) = delete;
public:
static ThreadPool<T> *getInstance()
{
static Mutex mutex;
if (nullptr == instance) //仅仅是过滤重复的判断
{
LockGuard lockguard(&mutex); //进入代码块,加锁。退出代码块,自动解锁
if (nullptr == instance)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
//类内成员, 成员函数,都有默认参数this
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
prctl(PR_SET_NAME, "follower");
while (1)
{
tp->lockQueue();
while (!tp->haveTask())
{
tp->waitForTask();
}
//这个任务就被拿到了线程的上下文中
T t = tp->pop();
tp->unlockQueue();
// for debug
int one, two;
char oper;
t.get(&one, &two, &oper);
//规定,所有的任务都必须有一个run方法
Log() << "新线程完成计算任务: " << one << oper << two << "=" << t.run() << "\n";
}
}
void start()
{
assert(!isStart_);
for (int i = 0; i < threadNum_; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
void push(const T &in)
{
lockQueue();
taskQueue_.push(in);
choiceThreadForHandler();
unlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
bool haveTask() { return !taskQueue_.empty(); }
void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
private:
bool isStart_;
int threadNum_;
queue<T> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *instance;
// const static int a = 100;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
代码说明
这是一个简单的线程池实现。以下是代码的功能和要点:
头文件包含:代码包含了一些头文件,用于支持线程、互斥锁、条件变量、队列、日志等功能。
ThreadPool 类模板:这是一个模板类,用于创建线程池。模板参数 T 用于表示任务的类型。
单例模式:getInstance() 函数实现了线程池的单例模式,确保程序中只有一个线程池实例。
成员变量:isStart_ 用于标记线程池是否已经启动,threadNum_ 表示线程池中线程的数量,taskQueue_ 是任务队列用于保存任务,mutex_ 和 cond_ 是互斥锁和条件变量用于对任务队列进行同步操作。
构造函数:私有化了构造函数,使得用户不能直接创建 ThreadPool 对象,只能通过 getInstance() 函数获取实例。
hreadRoutine 函数:这是线程执行的函数,每个线程将在该函数中循环等待任务,获取任务,执行任务,然后再次等待。
start 函数:用于启动线程池,创建指定数量的线程。
push 函数:用于向线程池中添加任务。
同步机制:lockQueue() 和 unlockQueue() 函数用于对任务队列进行加锁和解锁,haveTask() 函数用于判断任务队列是否为空,waitForTask() 函数用于等待任务,choiceThreadForHandler() 函数用于通知线程有任务可执行。
pop 函数:用于从任务队列中取出任务。
该线程池实现了简单的任务调度和线程池管理功能,可以用于处理需要多线程执行的任务,提高程序的并发性和效率。
Threadpool.cc
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <thread>
// 如何对一个线程进行封装, 线程需要一个回调函数,支持lambda
// class tread{
// };
int main()
{
prctl(PR_SET_NAME, "master");
const string operators = "+/*/%";
// unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());
unique_ptr<ThreadPool<Task> > tp(ThreadPool<Task>::getInstance());
tp->start();
srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());
// 派发任务的线程
while(true)
{
int one = rand()%50;
int two = rand()%10;
char oper = operators[rand()%operators.size()];
Log() << "主线程派发计算任务: " << one << oper << two << "=?" << "\n";
Task t(one, two, oper);
tp->push(t);
sleep(1);
}
}
代码说明
设置主线程名称:通过 prctl(PR_SET_NAME, “master”) 设置主线程的名称为 “master”,方便识别线程。
定义运算符:operators 字符串保存了支持的运算符 +、-、*、/ 和 %。
创建线程池:通过 ThreadPool::getInstance() 获取 Task 类型的线程池实例,并调用 start() 函数启动线程池。
派发任务的循环:while (true) 循环中,随机生成两个操作数 one 和 two,随机选择一个运算符 oper,然后创建一个 Task 对象 t,并将其添加到线程池中执行。每次派发任务后,主线程休眠 1 秒。
该代码模拟了主线程动态地生成计算任务,并将任务交给线程池中的线程进行处理。线程池的实现将有效地管理线程的生命周期,提供任务调度和线程复用的功能,提高程序的并发处理能力
Log.hpp
#pragma once
#include <iostream>
#include <ctime>
#include <pthread.h>
std::ostream &Log()
{
std::cout << "Fot Debug |" << " timestamp: " << (uint64_t)time(nullptr) << " | " << " Thread[" << pthread_self() << "] | ";
return std::cout;
}
代码说明
这个 Log() 函数可以用于在程序中输出调试信息,方便程序员在调试阶段查看程序的运行状态、变量值等信息。由于在输出信息时获取了当前线程的ID,可以帮助区分不同线程的调试信息。
Makefile
CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=threadpool
src=ThreadPoolTest.cc
$(bin):$(src)
$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:
rm -f $(bin)