目录
一、POSIX信号量
二、基于环形队列和信号量的生产消费模型
三、线程池
一、POSIX信号量
POSIX信号量(POSIX Semaphores)是一种进程间或线程间同步机制,它允许进程或线程以协调的方式访问共享资源或进行其他形式的同步。与System V信号量相比,POSIX信号量提供了更简单的接口和更好的可移植性,并且可以用于线程间同步。下面是对POSIX信号量主要操作的详细解释:
信号量的本质是一把计数器,申请信号的本质就是预定资源。信号量有P操作和V操作,PV来源于荷兰语中的"Passeren"(等待/通过)和"Verhogen"(增加),但在英文文献中,它们通常被简单地称为P(或Wait)和V(或Signal)。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
- sem:指向sem_t类型变量的指针,该变量将被初始化为信号量。sem_t 代表一个信号量。
- pshared:控制信号量的共享属性。如果pshared的值为0,则信号量用于同一进程的线程间的同步;如果pshared的值非0,则信号量用于不同进程间的同步。
- value:信号量的初始值。通常,这个值是非负的,表示可用的资源数量或可以同时进入临界区的线程数。
销毁信号量
int sem_destroy(sem_t *sem);
参数:
- sem:指向要销毁的信号量的指针。
这个函数会释放信号量所占用的资源。在调用此函数之前,所有使用该信号量的线程都应该已经退出或完成了对该信号量的等待。
等待信号量(等同于 P操作)
int sem_wait(sem_t *sem);
参数:
- sem:指向要等待的信号量的指针。
当信号量的值大于0时,sem_wait会原子地将信号量的值减1并立即返回;如果信号量的值为0,调用线程将被阻塞,直到信号量的值变为正数,此时将其值减1,然后sem_wait返回。通常用于保护临界区或等待资源变得可用。
发布信号量(等同于 V操作)
int sem_post(sem_t *sem);
参数:
- sem:指向要发布的信号量的指针。
这个函数会原子地将信号量的值加1。如果这导致信号量的值大于0,且有一个或多个线程在sem_wait调用中阻塞,那么这些线程中的一个将被唤醒以继续执行。这通常用于表示资源已被释放或临界区已退出。
注意:
- 使用POSIX信号量时,必须确保在进程或线程结束前销毁所有已初始化的信号量,以避免资源泄漏。
- 在使用信号量时,必须确保没有死锁或竞态条件的发生。
- 在多线程程序中,如果信号量被多个线程共享,则需要确保对这些信号量的访问是原子的,但幸运的是,sem_init、sem_destroy、sem_wait和sem_post等函数本身就是线程安全的。
- 当使用进程间共享的信号量时(即pshared非0),可能需要考虑额外的同步机制来确保进程能够正确访问和修改信号量的值。这通常涉及到使用进程间通信(IPC)机制,如UNIX域套接字、命名管道或共享内存等。然而,对于大多数应用而言,线程间同步是更常见的用例。
举例演示POSIX信号量的用法:
#include <iostream>
#include <semaphore.h>
#include <thread>
#include <unistd.h>
#include <chrono>
sem_t sem;
void Producer()
{
for (int i = 0; i < 5; i++)
{
sem_wait(&sem); // 等待信号量
std::cout << "Producer produced item : " << i << std::endl;
sem_post(&sem);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
}
}
void Consumer()
{
for (int i = 0; i < 5; i++)
{
sem_wait(&sem);
std::cout << "Consumer consumed item " << i << std::endl;
sem_post(&sem);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟消费耗时
}
}
int main()
{
sem_init(&sem, 0, 1); // 初始化信号量为1,0表示信号量用于在同一进程内的同步
std::thread p_td(Producer);
std::thread c_td(Consumer);
p_td.join();
c_td.join();
sem_destroy(&sem);
return 0;
}
二、基于环形队列和信号量的生产消费模型
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,但是使用锁都是把公共资源当作一个整体使用。如果公共资源不当作整体,多线程不访问临界资源的同一个区域,来同时进入临界区。
现在基于固定大小的环形队列重写这个程序(POSIX信号量)
- 环形队列采用数组模拟,用模运算来模拟环状特性。
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
RingQueue.hpp如下:
#pragma once
#include <iostream>
#include <semaphore.h>
#include <thread>
#include <unistd.h>
#include <chrono>
#include <mutex>
#include <vector>
const int defaultSize = 3;
template <class T>
class RingQueue
{
public:
RingQueue(int size = defaultSize)
: _rq(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, size); // 生产者一开始的资源个数为size
sem_init(&_data_sem, 0, 0);
}
void Push(const T &in)
{
// 生产,先申请信号量再加锁
sem_wait(&_space_sem);
{
std::lock_guard<std::mutex> lock(_p_mutex);
_rq[_p_step] = in;
_p_step++;
_p_step %= _size;
}
sem_post(&_data_sem); // 通知消费者
}
void Pop(T *out)
{
// 生产,先申请信号量再加锁
sem_wait(&_data_sem);
{
std::lock_guard<std::mutex> lock(_c_mutex);
*out = _rq[_c_step];
_c_step++;
_c_step %= _size;
}
sem_post(&_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
}
private:
std::vector<T> _rq;
int _size;
int _p_step; // 生产者的生产位置
int _c_step; // 消费者的消费位置
sem_t _space_sem; // 生产者需要的资源是空间
sem_t _data_sem; // 消费者需要的资源是数据
std::mutex _p_mutex;
std::mutex _c_mutex;
};
Task.hpp如下:
#include <iostream>
#include <string>
#include <unistd.h>
const int defaultRet = 0;
const std::string opers = "+-*/%";
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op), _flag(ok), _ret(defaultRet)
{
}
void Run()
{
switch (_op)
{
case '+':
_ret = _x + _y;
break;
case '-':
_ret = _x - _y;
break;
case '*':
_ret = _x * _y;
break;
case '/':
{
if (_y == 0)
_flag = div_zero;
else
_ret = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_flag = mod_zero;
else
_ret = _x % _y;
}
break;
default:
_flag = unknow;
break;
}
}
void operator()()
{
Run();
}
std::string PrintTask()
{
std::string s;
s = std::to_string(_x);
s += _op;
s += std::to_string(_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(_x);
s += _op;
s += std::to_string(_y);
s += "=";
s += std::to_string(_ret);
s += " [";
s += std::to_string(_flag);
s += "]";
return s;
}
~Task()
{}
private:
int _x;
int _y;
char _op;
int _ret;
int _flag;//为0可信,其余不可信
};
Main.cc如下:
#include "RingQueue.hpp"
#include "Task.hpp"
void *Productor(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
int data1 = rand() % 10;
usleep(rand() % 200); // 防止速度过快数据相同
int data2 = rand() % 10;
usleep(rand() % 200);
char op = opers[rand() % (opers.size())];
Task t(data1, data2, op);
std::cout << "productor task: " << t.PrintTask() << std::endl;
rq->Push(t);
}
}
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
Task t;
rq->Pop(&t);
t();
std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ pthread_self());
pthread_t c[3], p[2];
RingQueue<Task> *rq = new RingQueue<Task>();
pthread_create(&p[0], nullptr, Productor, rq);
pthread_create(&p[1], nullptr, Productor, rq);
pthread_create(&c[0], nullptr, Consumer, rq);
pthread_create(&c[1], nullptr, Consumer, rq);
pthread_create(&c[2], nullptr, Consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
return 0;
}
三、线程池
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
ThreadPool.hpp如下:
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <pthread.h>
#include "Log.hpp"
#include "Task.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
const int defaultnum = 5; // 线程池默认线程数量
class ThreadData
{
public:
ThreadData(const std::string &threadname)
: _threadname(threadname)
{
}
~ThreadData()
{
}
public:
std::string _threadname;
};
template <class T>
class ThreadPool
{
public:
ThreadPool(int thread_num = defaultnum)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
// _threads.emplace_back(threadname,
// std::bind(&ThreadPool<T>::ThreadRun, this,
// std::placeholders::_1),
// td);
Thread<ThreadData> t(threadname,
std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
_threads.push_back(t);
// ThreadRun是成员函数,第一个参数为this,bind的作用是将this绑定到函数的第一个变量
lg.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;
void ThreadWait(const ThreadData &td)
{
lg.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
pthread_cond_wait(&_cond, &_mtx);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadRun(ThreadData &td)
{
while (true)
{
T t; // 取任务
{
LockGuard lock(&_mtx);
while (_q.empty())
{
ThreadWait(td);
lg.LogMessage(Debug, "thread %s is wakeup\n", td._threadname.c_str());
}
t = _q.front();
_q.pop();
}
// 处理任务
t();
lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
bool Start()
{
for (auto &thread : _threads)
{
thread.Start();
lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
void Push(T &in)
{
lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
LockGuard lock(&_mtx);
_q.push(in);
ThreadWakeup();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
for (auto &thread : _threads)
{
thread.Join();
}
}
private:
std::queue<T> _q; // 任务队列
std::vector<Thread<ThreadData>> _threads; // 线程集合
int _thread_num; // 线程池中线程的数量
pthread_mutex_t _mtx; // 互斥锁,同步对任务队列的访问
pthread_cond_t _cond; // 条件变量,和互斥锁一起使用
};
main.cc如下:
#include "ThreadPool.hpp"
#include <unistd.h>
int main()
{
lg.ChangeStyle(ClassFile);
ThreadPool<Task> tp;
tp.Start();
srand((uint64_t)time(nullptr) ^ getpid());
while (true)
{
int x = rand() % 100 + 1;
usleep(1234);
int y = rand() % 200;
usleep(1234);
char oper = opers[rand() % opers.size()];
Task t(x,y,oper);
tp.Push(t);
sleep(1);
}
return 0;
}
具体代码:模拟实现线程池