前言
大家好呀,欢迎来到我的Linux学习笔记~
本篇承上Linux多线程创建,线程互斥(互斥锁),线程同步(条件变量),继下接着学习线程同步的另一个信号量,以及后序的线程池,线程的懒汉单例模式和其他锁相关知识。(注意本篇博客代码居多)
Linux多线程笔记上篇:【Linux】Linux多线程(上)_柒海啦的博客-CSDN博客
话不多说,我们直接开始吧~
目录
一、线程同步-信号量
1.何为信号量?
2.信号量相关接口
-sem_init-
-sem_destroy-
p操作:-sem_wait-
v操作:-sem_post-
3.基于环形队列的生产消费者模型
结构设计:
代码设计:
编码:
二、线程池
大致思路:
Thread.hpp 实现线程对象的文件
Mutex.hpp 实现锁的对象管理 RAII进行上锁和解锁操作
log.hpp 日志文件
Task.hpp 任务文件
ThreadPool.hpp 线程池实现文件
TestMain.cpp 源文件-测试文件派发任务
三、线程安全与其他的锁
1.线程安全的单例模式
2.其他的常见锁
读者写者问题:
读写锁接口介绍:
一、线程同步-信号量
关于线程同步,另外还有一个POSIX信号量方案,与进程间通信的信号量作用相同,用于同步操作,达到访问共享资源的目的。
首先,我们需要了解的是信号量究竟是什么,为什么能够起到线程同步的效果呢?
1.何为信号量?
在电影院里我们看电影,是不是需要事先的去购票,购得座位后在特定时间便可占此座位观看,事后此座位供给下一场购得此座位的人观看电影。
那么我们不妨将上述例子中的座位视为资源,我们人去买票如图多个执行流去并发的争取座位。观看完电影后在将此座位供给下一人观看就如同资源释放。
我们知道,此时视为资源之物就是临界资源,因为必须互斥,即一场电影一个座位只能有一人拥有。但是整个电影场便可视为一整个资源,里面的座位便可分区分块,便于并发执行。
如果多个线程访问的是资源的不同区域的话,是互斥去访问整个资源的话(整场电影),结果是对的,但是效率太低了,所以此时可以让多线程并发的去访问。 但是:1.你怎么知道一共多少个资源,还剩多少个?(信号量的值)2.你怎么保证这个资源就是给你的呢(程序员编码)?我一定可以具有一个共享资源呢?(信号量本质就是一个计数器,是预定的机制)
如上的电影院例子,买票就是确定座位,而买票的本质就是资源(电影院中的座位)的预定机制。那么信号量的本质就是一个计数器,访问临界资源的时候必须先申请信号量资源(sem--)预定资源(p),使用完毕对应的信号量资源(sem++)释放资源(v)。
所以,针对当临界资源存在多个区域,每次执行流只需要访问其中一块的话,就可以使用信号量的策略实现线程同步,即如果一个执行流申请到了一个信号量:当前执行流一定具有一个资源可以被其使用,至于是哪一个资源需要程序员结合场景,自定义编码完成。
2.信号量相关接口
基于Linux原生线程库,线程同步的信号量有如下接口:
首先是初始化信号量变量(sem_t)。
-sem_init-
man 3 sem_init
头文件:
#include <semaphore.h>
函数原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_init()将未命名信号量初始化为sem指向的地址,参数value指定了的初始值信号量。线程间同步/进程间同步。
函数参数:
sem:信号量变量
pshared:0表示线程间共享,线程可见,非零表示进程间共享。
value:设置此信号量初始值
返回值:
成功返回0,错误返回-1,并设置errno来指出错误。
信号量变量的释放(摧毁)。
-sem_destroy-
man 3 sem_destroy
头文件:
#include <semaphore.h>
函数原型:
int sem_destroy(sem_t *sem);
sem_destroy()会销毁sem指向的地址处的未命名信号量。
函数参数:
sem:信号量变量
返回值:
成功返回0,错误返回-1,并设置errno来指出错误。
*信号量的pv操作:
p操作:-sem_wait-
man 3 sem_wait
头文件:
#include <semaphore.h>
函数原型:
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
sem_wait()减少(锁定)sem指向的信号量。如果信号量的值大于零,则递减继续执行,函数立即返回。如果信号量当前值为0,则调用将阻塞,直到其中一个为0为止可以执行递减(即信号量值上升到零以上),或者信号处理程序中断调用。sem_trywait()与sem_wait()相同,不同之处是如果不能立即执行递减操作,则调用将返回错误(errno set to EAGAIN)而不是阻塞。sem_timedwait()与sem_wait()相同,不同之处是abs_timeout指定了调用应该阻塞的时间量的限制。
函数参数:
sem:信号量变量
返回值:
成功返回0,错误返回-1,并设置errno来指出错误。
v操作:-sem_post-
man 3 sem_post
头文件:
#include <semaphore.h>
函数原型:
int sem_post(sem_t *sem);
sem_post()增加(解锁)sem指向的信号量。如果信号量的值最终大于零,然后,在sem_wait(3)调用中阻塞的其中一个进程或线程将被唤醒,并继续锁定信号量。
函数参数:
sem:信号量变量
返回值:
成功返回0,错误返回-1,并设置errno来指出错误。
3.基于环形队列的生产消费者模型
介绍完上述接口后,我们不妨借助此线程同步,来实现一个不同于阻塞队列的生产消费者模型。
结构设计:
在之前的阻塞队列中,我们的中间环节即321原则中的1个交易场所就是一个整体,每次只能由一个消费者线程或者生产者线程进行访问。但是如果此交易场所分为几块,并且能够环形的进行流动控制的话,不妨并发执行,生产者生产放它的资源,消费者拿走它的资源效率更高。
那么能够胜任此结构的就是环形队列了,如下图所示:
此环状结构可以使用数组进行模拟。但是由于环状结构的特殊性,我们需要注意的就是:1.判空2.判满。
在正常情况下,判断环形空和满可以用如下两计:a、加计数器 b、专门浪费一个格子 -- 镂空设计。由于是多线程设计,我们此处利用信号量当做计数器使用即可。
代码设计:
设计一个生产者消费模型,此时生产者操作临界资源就是写端,消费者就是读端。
环形结构中,为空消费者不可进行,为满生产者不可进行.此时两者是处于同一位置的.如果生产和消费指向了环状结构的同一个位置(为空或者为满):生产者和消费要有互斥和同步问题.(无论消费者还是生产者,只能有一个再跑)但是此事件是小概率事件,大部分都是生产和消费指向的是不同的位置.-->*想让当生产者和消费指向同一个位置,具有互斥同步关系即可.而当不指向同一个位置的时候,让他们并发执行!!
如上图,左侧就可判空,右侧就可判满。
期望:
生产者不可将消费者套圈.-数据覆盖
消费者不能超过生产者.
为空:一定要生产者先运行.
为满:一定要消费者先运行.
其他情况并发执行为了为空和为满条件,根据信号量为0进行p操作阻塞的条件可以分别设计两个信号量:
生产者:空间资源 ->spaceSem-> N
消费者:数据资源 ->dataSem->0生产:p(spaceSem--)(大于0--,否则会阻塞挂起)->信号量申请成功:特定位置生产->v(dataSem++)注意此时生产者生产完了退出,此时当前信号量不可被释放,因为此资源依旧是被占用的.但是数据资源需要增加.
消费:p(dataSem--)(大于0--,否则会阻塞挂起)->信号量申请成功,特定位置消费->v(spaceSem++)数据被消费了,此空间释放.
除了生产者和消费者线程之间完成同步互斥外,生产者之间以及消费者之间别忘了需要进行互斥(三二一原则中的三种关系)。互斥的话可以利用锁进行保护push和pop操作临界资源。临界资源自然是访问环形队列的时候。但是注意,我们访问环形队列前是需要先申请信号量的,那么信号量是否也要加入呢?
首先,信号量本身就是原子操作,自然不必保护。并且效率是体现在快速的派发任务,申请信号量在申请互斥锁前面的话,可以快速并发争取位置,所以效率自然会有所提高。(形象的例子:先上锁-> 电影院排队买票入座 先申请信号量->先购票(网上购票),凭票入座)
编码:
设计的代码如下:
Mutex.hpp
#ifndef _MUTEX_
#define _MUTEX_
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mtx, nullptr);
}
void lock()
{
pthread_mutex_lock(&_mtx);
}
void unlock()
{
pthread_mutex_unlock(&_mtx);
}
~Mutex()
{
pthread_mutex_destroy(&_mtx);
}
private:
pthread_mutex_t _mtx;
};
// 互斥锁的RAII模式
class MutexGuardian
{
public:
MutexGuardian(Mutex& mtx)
:_mtx(mtx)
{
_mtx.lock();
}
~MutexGuardian()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
#endif
Sem.hpp
#ifndef _SEM_
#define _SEM_
#include <semaphore.h>
class SemObject
{
public:
SemObject(unsigned int num)
{
sem_init(&_sem, 0, num); // 初始化信号量 线程间共享
}
void p()
{
//信号量--操作
sem_wait(&_sem);
}
void v()
{
// 信号量++操作
sem_post(&_sem);
}
~SemObject()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};
#endif
RingQueue.hpp
#ifndef _RING_QUEUE_ // 如果没有定义宏_RING_QUEUE_就执行下面步骤
#define _RING_QUEUE_
// 防止头文件重复定义,和微软#pragma once同理
#include <iostream>
#include <vector>
#include "Mutex.hpp"
#include "Sem.hpp"
static const int defaultCapacity = 6; // 默认给6个空间
template<class T>
class RingQueue
{
public:
RingQueue(int capacity = defaultCapacity)
:_rq(capacity), _capacity(capacity)
,_PSem(capacity), _CSem(0)
,_pindex(0), _cindex(0)
{}
// 生产者
void push(const T& val)
{
_PSem.p(); // 剩余空间数量--
MutexGuardian mtx(_mtx); // 上锁 生产者生产者之间保持互斥
_rq[_pindex++] = val;
_pindex %= _capacity; // 环形队列
_CSem.v(); // 资源空间++
}
// 消费者
void pop(T* val)
{
_CSem.p(); // 资源空间--
MutexGuardian mtx(_mtx);
*val = _rq[_cindex++];
_cindex %= _capacity;
_PSem.v(); // 剩余空间++
}
void debug()
{
// 测试函数
}
private:
std::vector<T> _rq;
size_t _capacity;
SemObject _PSem; // 生产者信号量,初始化为申请空间个数,为剩余空间个数
SemObject _CSem; // 消费者信号量,初始化为0个,为生产者生产还未消费的资源个数
Mutex _mtx; // 互斥锁对象
int _pindex; // 生产者下标
int _cindex; // 消费者下标
};
#endif // 预处理结束
TestMain.cpp
#include <iostream>
#include <pthread.h>
#include <cstdlib>
#include <time.h>
#include <unistd.h>
#include "RingQueue.hpp"
// 生产者
void* producer(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*) args;
while (true)
{
int num = rand() % 99 + 1;
rq->push(num);
std::cout << "生产者" << pthread_self() << " 生产数据:" << num << std::endl;
}
return nullptr;
}
// 消费者
void* consumer(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*) args;
while (true)
{
sleep(1);
int num;
rq->pop(&num);
std::cout << "消费者" << pthread_self() << " 取得数据:" << num << std::endl;
}
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid());
RingQueue<int>* rq = new RingQueue<int>;
pthread_t ptid[4];
pthread_t ctid[2];
pthread_create(ptid, nullptr, producer, (void*)rq);
pthread_create(ptid + 1, nullptr, producer, (void*)rq);
pthread_create(ptid + 2, nullptr, producer, (void*)rq);
pthread_create(ptid + 3, nullptr, producer, (void*)rq);
pthread_create(ctid, nullptr, consumer, (void*)rq);
pthread_create(ctid + 1, nullptr, consumer, (void*)rq);
for (int i = 0; i < 4; ++i) pthread_join(ptid[i], nullptr);
for (int j = 0; j < 2; ++j) pthread_join(ctid[j], nullptr);
delete rq;
return 0;
}
测试代码为多线程环境,由于并发执行打印问题,可以自行测试:
因为消费者线程执行慢,所以先生产者生产一堆出来,然后消费者消费一点,生产者生产一点执行下去。
二、线程池
线程池的存在和空间适配器类似。比如申请空间,如果需要才向系统申请空间,每次系统申请空间会非常麻烦,耗时间,导致整体的效率降低... 所以一般就会预先多申请一部分.提高空间的使用效率 空间换时间的做法。
类似的,预先创建一批线程,每次需要创建线程执行任务提供其中的一个线程进行执行即可.而不是每次需要创建线程就去系统去创建.更何况线程的创建可不单单是申请空间那么简单.
上述利用的就是池化技术,实际上就是资源的预分配机制,以空间换取时间便于提高效率。
但是我们可以对此池简单一想,我们向线程池派发任务,线程池派发线程执行,中间存储任务,这实际上就是一个生产者与消费者模型呀。那么编码实现起来就非常方便了。
如下代码为我简单实现的一个线程池,多加了一点日志文件。
大致思路:
首先ThreadPool.hpp文件存放实现线程池的代码。因为需要预先创建一批线程,创建一批线程的话就需要pthread_create,pthread_join,pthread_destroy一系列的接口,并且一开始还可以不用启动,只是先预备好,那么实际上我们可以设计一个类对线程的创建、等待、摧毁一系列接口进行封装。
所以用文件Thread.hpp对线程进行封装,这样每次创建一个线程对象,先把回调函数以及参数传递进去。并且为了区分线程之间的不同,可以设置线程的名字(利用编号)。在编写启动接口创建线程加载函数,join接口等待此线程。
创建一批线程,那么一批线程对象需要容器保存,可以使用vector进行保存,然后在线程池创建之初new好即可。提供接口run让当前线程池的所有线程跑起来,准备接收任务。提供接口pushtask传入特定类型的仿函数对象,方便每个线程接收后调用。由于存在消费者和生产者(消费者-线程池的线程,生产者:派发任务),那么利用缓冲区(这里利用队列)内的个数对消费者和生产者之间进行互斥和同步操作-即条件变量,同样的互斥也要做好(3种关系的互斥)
可以通过日志选择打印消息到屏幕或者文件内。
Thread.hpp 实现线程对象的文件
#ifndef _THREAD_
#define _THREAD_
#include <pthread.h>
#include <string>
#include <iostream>
#include "log.hpp"
typedef void* (*func_t) (void*); // 定义函数指针
class ThreadData
{
public:
std::string _name;
void* _args;
};
class Thread
{
public:
Thread(int num, func_t func, void* args)
:_func(func)
{
_data._args = args;
// _data._name = "thread" + std::to_string(num);
char buffer[64];
snprintf(buffer, sizeof buffer, "%s-%d", "thread", num);
_data._name = buffer;
}
void start()
{
// 启动线程
pthread_create(&_tid, nullptr, _func, (void*)&_data);
// std::cout << _data._name << "启动成功!" << std::endl;
logMessage(WARNING, "%s:%s", _data._name.c_str(), "启动成功");
}
void join()
{
pthread_join(_tid, nullptr);
}
~Thread()
{}
private:
pthread_t _tid;
func_t _func;
ThreadData _data;
};
#endif
Mutex.hpp 实现锁的对象管理 RAII进行上锁和解锁操作
#ifndef _MUTEX_
#define _MUTEX_
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mtx, nullptr);
}
void lock()
{
pthread_mutex_lock(&_mtx);
}
void unlock()
{
pthread_mutex_unlock(&_mtx);
}
~Mutex()
{
pthread_mutex_destroy(&_mtx);
}
pthread_mutex_t _mtx;
};
// 互斥锁的RAII模式
class MutexGuardian
{
public:
MutexGuardian(Mutex* mtx)
:_mtx(mtx)
{
_mtx->lock();
}
~MutexGuardian()
{
_mtx->unlock();
}
private:
Mutex* _mtx;
};
#endif
log.hpp 日志文件
#ifndef _LOG_
#define _LOG_
#include <time.h>
#include <iostream>
#include <stdarg.h>
#include <cstring>
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define FATAL 3
const char* levels[] = {
"DEBUG",
"NORMAL", // 正常的
"WARNING", // 警告
"FATAL" // 致命的错误
};
const char* logPath = "./ThreadPool.log";
// 日志包含的必要元素:错误等级 时间 支持用户自定义等...
static void logMessage(int level, const char* format, ...) // 可变参数
{
#ifndef _DEBUG_
if (level == DEBUG) return;
#endif
char headar[1024];
time_t timeStamp = time(nullptr);// 获取时间戳 默认时区
struct tm* tm = localtime(&timeStamp); // 获取相关结构体
char* timeString = asctime(tm);
timeString[strlen(timeString) - 1] = '\0';
snprintf(headar, sizeof headar, "[%s] [%s]", levels[level], timeString);
// std::cout << timeString << std::endl;
char endar[1024];
va_list ap;
va_start(ap, format);
vsprintf(endar, format, ap); // 自定义内容于格式化于endar
va_end(ap);
#ifdef MYFILE
FILE* fp = fopen(logPath, "a");
fprintf(fp, "%s:%s\n", headar, endar);
fclose(fp);
#else
printf("%s:%s\n", headar, endar);
#endif
}
#endif
Task.hpp 任务文件
#ifndef _TASK_
#define _TASK_
#include <functional>
typedef std::function<int (int, int, char)> func_;
// 任务类型 - 仿函数
class Task
{
public:
Task() = default;
Task(int x, int y, char type, func_ func):_x(x), _y(y), _type(type), _func(func)
{}
int operator()() // 重载() - 仿函数
{
return _func(_x, _y, _type);
}
int _x;
int _y;
char _type;
func_ _func;
};
#endif
ThreadPool.hpp 线程池实现文件
#ifndef _THREAD_POOL_
#define _THREAD_POOL_
// 线程池
#include "Thread.hpp"
#include "Mutex.hpp"
#include "log.hpp"
#include <unistd.h>
#include <queue>
#include <iostream>
const int threadDefaultNum = 4; // 默认线程池的线程个数
template<class T>
class ThreadPool
{
public:
T popTask()
{
T task = _task.front();
_task.pop();
return task;
}
Mutex* outMutxObj()
{
return &_mtx;
}
bool taskEmpty()
{
return _task.empty();
}
void CondWait()
{
// 条件变量等待
pthread_cond_wait(&_cond, &_mtx._mtx);
}
public:
ThreadPool(int threadNum = threadDefaultNum):_threadNum(threadNum)
{
pthread_cond_init(&_cond, nullptr); // 初始化条件变量
for (int i = 1; i <= threadNum; ++i)
{
_thread.push_back(new Thread(i, routine, this));
}
}
void run()
{
// 线程池启动函数 全体线程启动起来,准备分配任务进行执行
for (int i = 0; i < _threadNum; ++i) _thread[i]->start();
}
static void* routine(void* args) // 静态方法,这样参数里才可以不出现this指针
{
ThreadData* td = (ThreadData*)args;
ThreadPool<T>* tp = (ThreadPool<T>*)(td->_args);
T task;
while (true)
{
{
MutexGuardian mtx(tp->outMutxObj()); // 上锁
while (tp->taskEmpty()) tp->CondWait(); // 等待
task = tp->popTask();
}
// std::cout << td->_name << "执行任务:" << task._x << task._type << task._y << "=" << task() << std::endl; // 执行任务
logMessage(NORMAL, "%s 执行任务: %d%c%d=%d", td->_name.c_str(), task._x, task._type, task._y, task());
}
return nullptr;
}
void pushTask(T task)
{
MutexGuardian mtx(&_mtx); // 上锁
_task.push(task);
// std::cout << "生产任务:" << task._x << task._type << task._y << "=?" << std::endl;
logMessage(NORMAL, "生产任务: %d%c%d=?", task._x, task._type, task._y);
pthread_cond_signal(&_cond); // 发信号
}
~ThreadPool()
{
pthread_cond_destroy(&_cond);
for (int i = 0; i < _threadNum; ++i)
{
_thread[i]->join();
delete _thread[i]; // new空间自然delete掉
}
}
private:
std::vector<Thread*> _thread; // 线程池 封装线程类型,便于创建线程 321中2个角色中的消费者
std::queue<T> _task; // 任务缓冲区 321中的1个场景
int _threadNum; // 线程池中的线程个数
Mutex _mtx; // 一把互斥锁即可 自定义的互斥锁对象
pthread_cond_t _cond; // 条件变量,用户同步线程和派发任务
};
#endif
TestMain.cpp 源文件-测试文件派发任务
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "log.hpp"
#include <iostream>
#include <time.h>
#include <unistd.h>
int main()
{
// logMessage(DEBUG, "%s-%d", "test", 6);
ThreadPool<Task> tp;
tp.run();
char types[] = {'+', '-', '*', '/'};
auto func = [](int x, int y, int type){
switch(type)
{
case '+':
return x + y;
break;
case '-':
return x - y;
break;
case '/':
return x / y;
break;
case '*':
return x * y;
break;
}
};
srand(time(nullptr) ^ getpid());
while (true)
{
// std::cout << "1" << std::endl;
int x = rand() % 10 + 1;
int y = rand() % 10 + 1;
Task task(x, y, types[rand() % 4], func);
usleep(rand() % 1000);
tp.pushTask(task);
sleep(1);
}
return 0;
}
禁止-DMYFILE操作:
允许-DMYFILE操作:
注意上述线程池编写存在优化选项。
三、线程安全与其他的锁
1.线程安全的单例模式
当然,像线程池这种一般程序里只能存在一个,所以我们一般按照单例模式进行设计。
单例模式分为饿汉和懒汉模式。博客【C++】特殊类相关设计_柒海啦的博客-CSDN博客对两种模式在单执行流情况下进行了详细介绍。饿汉不存在线程安全问题,但是存在初始化顺序以及初始化过多造成的程序启动满等问题。懒汉虽然可以有效避免饿汉的问题,但是存在线程安全,需要将此问题处理好。
我们这里线程池采用懒汉的单例模式。由于多线程环境下,所以要切实考虑到线程安全问题。
单例模式就是当前程序只能存在一个此类型的对象,所以自然就不可将创建此类型的构造函数公开,由于是懒汉模式,所以一般使用调出接口的时候如果自身对象指针为空就进行创建,否则就直接输出即可。
但是如果一开始就是多个线程并发的访问此单例对象,那么可能出现并发初始化的问题,即可能创建出多个new对象出来 - 自身对象指针就是一个临界资源。所以我们就需要进行互斥操作。但是由于是没有创建对象就要发生的事情,所以我们的互斥锁自然需要一个全局的或者静态的进行操作。
但是需要注意的是,如果加上互斥锁,那么在第一次初始化后进行多次并发访问的话压根就不会访问临界区,但是还是要竞争锁,互斥访问,这样的话效率就十分低下,所以我们可以在互斥锁外面在多一层条件判断提升效率。
template<class>
class ThreadPool
{
...
private:
// 单例模式,构造函数私有化
ThreadPool(int threadNum):_threadNum(threadNum)
{
pthread_cond_init(&_cond, nullptr); // 初始化条件变量
for (int i = 1; i <= threadNum; ++i)
{
_thread.push_back(new Thread(i, routine, this));
}
}
// 禁止拷贝构造和赋值
ThreadPool(const ThreadPool<T>&) = delete; // 删除拷贝构造
ThreadPool<T>& operator= (const ThreadPool<T>&) = delete; // 删除赋值
public:
static ThreadPool<T>* getThreadPool(int threadNum = threadDefaultNum)
{
// 设置为静态方法 通过类名调用静态方法 存在线程安全问题
if (_tp == nullptr)
{
MutexGuardian mtx(mtx_); // 互斥锁 RAII
if (_tp == nullptr) _tp = new ThreadPool(threadNum);
}
return _tp;
}
...
static ThreadPool<T>* _tp; // 指向本身类型的静态成员变量 - 类变量
static Mutex* mtx_; // 静态的互斥锁对象,用来保护单例模式
};
template<class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr; // 加上const就可以在类里进行初始化了c++特性
template<class T>
Mutex* ThreadPool<T>::mtx_ = new Mutex; // 定义
当然,上述的互斥锁可以利用原生的接口或者C++的接口,我这里使用的是上面自己封装的Mutex接口哦。
关于线程安全,除此之外STL容器为了实现方便以及使用方便并没有考虑线程安全问题,所以我们一般使用C++的STL库的时候需要自己注意线程安全问题;智能指针的话 - unique_ptr 只在当前代码块范围内生效 -不涉及 shared_ptr 多个对象需要引用计数 -存在线程安全问题。
2.其他的常见锁
上述我们介绍的互斥锁实际上是属于悲观锁,在上层的划分锁大致如下:
锁 | 介绍 |
---|---|
悲观锁 | 在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。 |
乐观锁 | 每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。 |
自旋锁 | 通过不断的检测锁的状态,来进行资源是否就绪的方案。(互斥锁会直接阻塞挂起,但是自旋不会,会不断的去等) |
关于自旋锁和悲观锁(互斥锁),什么时候使用自旋锁呢?这就要根据我们的业务需求等待事件的容忍度来决定了。如果时间短的话就可以进行申请。另外自旋锁的接口类似,锁变量类型为pthread_spinlock_t,接口为pthread_spin_xxx 使用起来和互斥锁类似,这里不再过多介绍。
另外,在多线程的环境下,有可能一种数据修改的机会非常少,但是读此数据的会非常频繁,并且读取数据的话并不会取走数据,这就造成了读数据的线程可以并发进行,不用互相互斥,是一种共享的状态。如果此时是使用的生产者消费者模型-互斥锁,那么效率会非常低下。
所以针对此种情况,存在一种锁可以解决效率问题,那就是读写锁。
读写锁实际上是一种特殊的自旋锁,在了解读写锁之前,我们不妨先了解一下和生产者消费者类似的一个模型-读者写者模型:
读者写者问题:
首先,在前面我们介绍了生产者与消费者的三二一原则,对于读者写者问题也存在三二一原则。这里我们利用的例子可以是我平时写博客:
首先还是三种关系:读者和读者之间,读者和写者之间,写者和写者之间。
读者和读者之间存在像消费者与消费者之间的互斥关系么?显然不存在。对于一种资源,消费者自然要互斥的进行竞争,但是读者却是共享的拥有此资源。比如我这篇博客,你现在正在看,是读者,但是其余地方也存在读者正在观看,两者观看存在影响吗?自然不存在,因为是共享的,所以不存在互斥的关系。(这也是和生产者与消费者模型最大的区别)
读者和写者之间:这个就类似于消费者和生产者之间了。首先存在互斥关系:我写博客的时候自然不存在读者去读我这篇文章。并且也存在同步关系,比如我的文章没有读了,我可能会继续修改或者重新写文章让有读者去读(这里例子比较牵强,用黑板报的例子更加具体:读者在看黑板报的时候写者总不会去擦掉读者正在看的部分,等读者看完在进行修改不迟)
写者和写者之间:这个自然是互斥的,因为博客是不允许抄袭的,别人写的就是别人的,不可将别人的东西拿过来当做自己的。
二个角色自然就是写者和读者,一个场所就比如现在这个csdn平台。读者和写者共享的平台上的文章(一个读文章,一个写文章)
读写锁接口介绍:
为了举出一个实例,这里简单介绍一下读写锁的相关接口:
类型/函数 描述 pthread_rwlock_t 读写锁变量类型,类似于pthread_mutex_t 互斥锁变量类型。
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref); 设置读写锁优先级:(当读者和写者同时申请锁的时候,默认读锁优先申请,但是可以进行设置,只不过写锁优先目前存在bug) int pthread_rwlock_init (pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);读写锁初始化,读写锁变量和读写锁的属性。 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); 读者锁加锁。(读者共享,和写者互斥) int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); 写者锁加锁。(读者和写者均互斥) int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); 读写锁解锁(读者和写者均是) int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 释放读写锁空间 另外,对于第二个设置属性里的pref公有三种选择(pthread_rwlockattr_t 属于一种属性类型,注意首先需要使用pthread_rwlockattr_init对其进行初始化)
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
我们利用上面的读写锁实现一个简单的代码:存在总数1000,多个读者线程对齐读取,检测每次总数个数,少量写线程对其进行--。代码如下:
// 读写锁测试代码 -20230211-
#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include <unistd.h>
#include "log.hpp" // 日志文件
typedef void* (*func_t)(void*); // 设置函数指针
static const int RTHREAD = 0; // 读线程
static const int WTHREAD = 1; // 写线程
volatile int nums = 1000; // volatile表示此变量不可优化到寄存器,存在对其修改的操作
pthread_rwlock_t rwlock; // 读写锁对象
// 简单封装一下线程对象
class Thread
{
public:
Thread(int flag, int num, func_t func) // 什么线程 编号 回调函数
:_func(func)
{
if (flag == RTHREAD) _id = "ReadThread-";
else if (flag == WTHREAD) _id = "WriteThread-";
else
{
std::cerr << "flag is RTHREAD and WTHREAD" << std::endl;
exit(-1); // 终止程序
}
_id += std::to_string(num);
}
void start()
{
// 启动线程
pthread_create(&_tid, nullptr, _func, (void*)_id.c_str());
}
void join()
{
// 等待回收线程
pthread_join(_tid, nullptr);
}
private:
pthread_t _tid;
std::string _id;
func_t _func;
};
// 对读写锁初始化函数
void initRwlock(int flag = RTHREAD) // flag为上面的RTHREAD-读优先 WTHRERAD-写优先 默认读优先或者其余错误选项
{
if (flag == WTHREAD)
{
pthread_rwlockattr_t attr; // 属性类型,首先需要进行初始化
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NP); // 设置写者优先 -存在bug
pthread_rwlock_init(&rwlock, &attr);
}
else pthread_rwlock_init(&rwlock, nullptr); // 默认读者优先
}
// 读者回调函数
void* readThread(void* args)
{
// 注意读写锁需要进行初始化以及确定优先级
// 读者反复读取数据即可
char* str = (char*) args;
while (true)
{
pthread_rwlock_rdlock(&rwlock); // 读者锁 读者同步 写者互斥
// 临界资源区
if (nums > 0) logMessage(NORMAL, "%s:%d", str, nums);
else
{
pthread_rwlock_unlock(&rwlock);
break;
}
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
// 写者回调函数
void* writeThread(void* args)
{
// 注意读写锁需要进行初始化以及确定优先级
// 写者需要进行--操作
char* str = (char*) args;
while (true)
{
pthread_rwlock_wrlock(&rwlock); // 写者锁 写者-读者互斥
// 临界资源区
if (nums > 0)
{
--nums;
logMessage(WARNING, "%s进行了写操作!", str);
}
else
{
pthread_rwlock_unlock(&rwlock);
break;
}
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
// 对读者线程进行创建 - 启动函数函数
void readThreadCreate(std::vector<Thread*>& reads, int nums)
{
// 创建 + 启动
for (int i = 1; i <= nums; ++i)
{
reads.push_back(new Thread(RTHREAD, i, readThread));
reads[i - 1]->start();
}
}
// 对写者线程进行创建 - 启动函数函数
void writeThreadCreate(std::vector<Thread*>& writes, int nums)
{
// 创建 + 启动
for (int i = 1; i <= nums; ++i)
{
writes.push_back(new Thread(WTHREAD, i, writeThread));
writes[i - 1]->start();
}
}
// 对读线程进行等待
void readThreadJoin(std::vector<Thread*>& reads)
{
for (int i = 0; i < reads.size(); ++i)
{
reads[i]->join();
delete reads[i]; // 别忘了new的需要释放
}
}
// 对写线程进行等待
void writeThreadJoin(std::vector<Thread*>& writes)
{
for (int i = 0; i < writes.size(); ++i)
{
writes[i]->join();
delete writes[i]; // 别忘了new的需要释放
}
}
int main()
{
// 主线程
// 决定读读线程个数和写线程个数
int readNums = 10;
int writeNums = 2;
// 初始化读写锁
initRwlock(WTHREAD); // 默认读优先
// 创建多线程环境
std::vector<Thread*> reads;
std::vector<Thread*> writes;
readThreadCreate(reads, readNums);
writeThreadCreate(writes, writeNums);
// 运行多线程后进行线程等待
readThreadJoin(reads);
writeThreadJoin(writes);
// 别忘了最后的读写锁变量释放
pthread_rwlock_destroy(&rwlock);
return 0;
}
上述代码分模块编写,经测试打印到文件内效果好点,引入了上面编写的log日志文件(实际效果感觉不出来),也可以自行设计,上述代码仅供参考,错误可以指出!