【Linux多线程】信号量以及线程池
目录
- 【Linux多线程】信号量以及线程池
- POSIX信号量
- 基于环形队列的生产者消费者模型
- 线程池
作者:爱写代码的刚子
时间:2024.4.2
前言:本篇博客将会介绍Linux线程中的信号量以及线程池,完善生产者消费者模型
POSIX(可移植操作系统接口)信号量和 System V 信号量是两种不同的 IPC(进程间通信)机制,用于在多个进程之间同步和共享资源。它们有一些区别,下面是它们的主要区别:
-
API 和标准:
- POSIX 信号量:基于 POSIX 标准,通常在 Unix 系统上实现。
- System V 信号量:System V IPC(Inter-Process Communication)是 Unix 系统上的一种 IPC 机制,其中包括信号量、消息队列和共享内存。
-
命名:
- POSIX 信号量可以命名,因此可以在不相关的进程之间共享。
- System V 信号量通常是匿名的,只能由相关的进程共享。
-
使用方式:
- POSIX 信号量使用
sem_init()
、sem_wait()
、sem_post()
等函数进行操作。
- System V 信号量使用
semget()
、semop()
、semctl()
等函数进行操作。
- POSIX 信号量使用
-
错误处理:
- POSIX 信号量的错误处理更为简单,通常通过返回值或者设置全局变量
errno
来表示错误。 - System V 信号量的错误处理相对复杂,需要通过检查返回值和设置
errno
来判断错误。
- POSIX 信号量的错误处理更为简单,通常通过返回值或者设置全局变量
-
删除:
- POSIX 信号量在系统关闭时会自动删除,不需要显式删除。
- System V 信号量需要显式调用
semctl()
来删除。
-
可移植性:
- POSIX 信号量具有更好的可移植性,因为 POSIX 是一个标准,因此可以在不同的系统上使用相同的 API。
- System V 信号量在不同的 Unix 系统之间的行为可能会有所不同。
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
信号量的本质是一把计数器(临界资源的数量),类似不等同于
int cnt = n
,描述临界资源中资源数量的多少。
- 申请计数器成功,就表示具有访问资源的权限了
- 申请了计数器资源,不代表当前访问了资源,申请了计数器资源是对资源的预定机制
- 计数器可以有效保证进入共享资源的执行流的数量
信号量只是限定进入共享资源执行流的数量,但是需要我们程序员来保证每个执行流访问自己的资源
信号量用来描述资源数目,把资源是否就绪放在了临界区之外,申请信号量时其实就间接的已经在做判断了
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一篇生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序 (POSIX信号量):
基于环形队列的生产者消费者模型
- 环形队列采用数组模拟,用模运算来模拟环形特征
- 环形结构起始状态和结束状态都是一样的(指向的都是同一个位置),无法判断为空或者为满,
- 所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们有信号量这个计数器,就很简单的进行多线程间的同步过程
- 指向同一个位置,只能一个人访问:
空:生产者访问(P(生产者)关注还剩余多少空间)
满:消费者访问(C(消费者)关注还剩余多少数据)
所以我们需要两个信号量
- 小demo实验
Main.cc
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
const static int defaultcap = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)//申请信号量
{
sem_wait(&sem);
}
void V(sem_t &sem)//释放信号量
{
sem_post(&sem);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap),cap_(cap),c_step_(0),p_step_(0)
{
sem_init(&cdata_sem_,0,0);//第二个参数为0表示线程间共享,非0表示进程间共享
sem_init(&cspace_sem_,0,cap);
}
//在环形队列为空或为满时能表现出局部的互斥特征
void Push(const T&in)
{
P(cspace_sem_);
ringqueue_[p_step_] = in;
V(cdata_sem_);
//位置后移,维持环形操作
p_step_++;
p_step_ %= cap_;
}
void Pop(T*out)
{
P(cdata_sem_);
*out = ringqueue_[c_step_];
V(cspace_sem_);
c_step_++;
c_step_%=cap_;
}
~RingQueue(){
sem_destroy(&cdata_sem_);
sem_destroy(&cspace_sem_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_;//消费者下标
int p_step_;//生产者下标
sem_t cdata_sem_;//消费者关心数据资源
sem_t cspace_sem_;//生产者关心空间资源
};
RingQueue.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
const static int defaultcap = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)//申请信号量
{
sem_wait(&sem);
}
void V(sem_t &sem)//释放信号量
{
sem_post(&sem);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap),cap_(cap),c_step_(0),p_step_(0)
{
sem_init(&cdata_sem_,0,0);//第二个参数为0表示线程间共享,非0表示进程间共享
sem_init(&cspace_sem_,0,cap);
}
//在环形队列为空或为满时能表现出局部的互斥特征
void Push(const T&in)
{
P(cspace_sem_);
ringqueue_[p_step_] = in;
V(cdata_sem_);
//位置后移,维持环形操作
p_step_++;
p_step_ %= cap_;
}
void Pop(T*out)
{
P(cdata_sem_);
*out = ringqueue_[c_step_];
V(cspace_sem_);
c_step_++;
c_step_%=cap_;
}
~RingQueue(){
sem_destroy(&cdata_sem_);
sem_destroy(&cspace_sem_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_;//消费者下标
int p_step_;//生产者下标
sem_t cdata_sem_;//消费者关心数据资源
sem_t cspace_sem_;//生产者关心空间资源
};
注意,这个是在单生产者单消费者的情况下能够保证线程安全,因为生产者和消费者的下标表示是不同的,同时信号量在环形队列为空或为满时能表现出局部的互斥特征,但是在多生产者和多消费者的场景中可能会对同一个下标产生竞争,不能保证线程安全
改进代码使其满足多生产多消费的场景:
生产者与消费者的互斥关系已经由信号量来维护了,但是生产者与生产者,消费者与消费者之间的互斥关系还是需要我们自己来维护!
我们采用先申请信号量再加锁(理由:1. 信号量资源是不需要被保护的,它本身就是原子的 2. 如果先申请锁,那么申请锁和申请信号量就会变成串行,反之先申请信号量能一定程度上变成并行(提高并发量))
采用了环形队列,当队列不为空不为满时,生产者和消费者不会互相干扰,因为采用的是不同的下标,但是队列为空为满时,由信号量来维护生产者和消费者之间的关系(重点)
改进后的完整代码:
RingQueue.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
const static int defaultcap = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)//申请信号量
{
sem_wait(&sem);
}
void V(sem_t &sem)//释放信号量
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap),cap_(cap),c_step_(0),p_step_(0)
{
sem_init(&cdata_sem_,0,0);//第二个参数为0表示线程间共享,非0表示进程间共享
sem_init(&cspace_sem_,0,cap);
pthread_mutex_init(&c_mutex_,nullptr);
pthread_mutex_init(&p_mutex_,nullptr);
}
//在环形队列为空或为满时能表现出局部的互斥特征
void Push(const T&in)
{
P(cspace_sem_);
Lock(p_mutex_);
ringqueue_[p_step_] = in;
//位置后移,维持环形操作
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T*out)
{
P(cdata_sem_);
Lock(c_mutex_);
*out = ringqueue_[c_step_];
c_step_++;
c_step_%=cap_;
Unlock(c_mutex_);
V(cspace_sem_);
}
~RingQueue(){
sem_destroy(&cdata_sem_);
sem_destroy(&cspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_;//消费者下标
int p_step_;//生产者下标
sem_t cdata_sem_;//消费者关心数据资源
sem_t cspace_sem_;//生产者关心空间资源
pthread_mutex_t c_mutex_;
pthread_mutex_t p_mutex_;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
std::string opers = "+-*/%";
enum
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
Task(){}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
}
break;
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r+=oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
Main.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"
using namespace std;
struct ThreadData
{
RingQueue<Task> *rq;
std::string threadname;
};
void *Productor(void *args)
{
// sleep(3);
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 1. 获取数据
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 2. 生产数据
rq->Push(t);
cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
sleep(1);
}
return nullptr;
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
// 1. 消费数据
Task t;
rq->Pop(&t);
// 2. 处理数据
t();
cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
// sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(50);
pthread_t c[5], p[3];
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, td);
}
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, td);
}
for (int i = 0; i < 1; i++)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 1; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}
线程池
/*threadpool.h*/
/* 线程池:
* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着 监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利 用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:
* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技 术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误.
* 线程池的种类: * 线程池示例:
* 1. 创建固定数量线程池,循环从任务队列中获取任务对象,
* 2. 获取到任务对象后,执行任务对象中的任务接口
*/
Main.cc
#include <iostream>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"
pthread_spinlock_t slock;
int main()
{
// pthread_spin_init(&slock, 0);
// pthread_spin_destroy(&slock);
std::cout << "process runn..." << std::endl;
sleep(3);
// ThreadPool<Task> *tp = new ThreadPool<Task>(5);
ThreadPool<Task>::GetInstance()->Start();
srand(time(nullptr) ^ getpid());
while(true)
{
//构建任务
int x = rand() % 10 + 1;
usleep(10);
int y = rand() % 5;
char op = opers[rand()%opers.size()];
Task t(x, y, op);
ThreadPool<Task>::GetInstance()->Push(t);
//交给线程池处理
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
}
ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
std::cout << name << " run, "
<< "result: " << t.GetResult() << std::endl;
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
static ThreadPool<T> *GetInstance()
{
if (nullptr == tp_) // ???
{
pthread_mutex_lock(&lock_);
if (nullptr == tp_)
{
std::cout << "log: singleton create done first!" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
std::vector<ThreadInfo> threads_;
std::queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;