W...Y的主页 😊
代码仓库分享 💕
前言:在之前的进程间通信时我们就讲到过信号量,他的本质就是一个计数器,用来描述临界资源的一个计数器。我们当时使用电影院的例子来说明信号量。电影院的座位被我们称为临界资源,只有买到票才能有座位去看电影,申请信号量就是预定买票,申请成功才可以继续往下走下去。而我们看完电影就会释放临界资源,这些资源就可以被别人申请了。
目录
POSIX信号量
基于环形队列的生产消费模型
线程池
STL,智能指针和线程安全
线程安全的单例模式
什么是单例模式
什么是设计模式
单例模式的特点
饿汉实现方式和懒汉实现方式
饿汉方式实现单例模式
懒汉方式实现单例模式
其他常见的各种锁
自旋锁
读者写者问题
读写锁接口
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上述就是我信号量的基本操作,现在我们需要实现一个基于环形队列一个生产消费者模型,我们为什么不用上篇博客中的阻塞队列呢。因为阻塞队列我们将其看作一个整体只能进行首尾操作,不能访问队列中间内容。而环形队列我们使用的是vecotor数组进行模拟,可以使用下标进行访问中间内容。
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
当hend与tail指向同一块位置时,数组可能为满也可能为空,其余的时候可以进行并发访问数组。
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
ringqueue.hpp
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
using namespace std;
template<typename T>
class RingQueueu
{
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueueu(int cap)
:_cap(cap),_ring_queue(cap)
{
sem_init(&_room_sem,0,_cap);
sem_init(&_data_sem,0,0);
pthread_mutex_init(&_productor_mutex,nullptr);
pthread_mutex_init(&_consumer_mutex,nullptr);
}
void Enqueue(const T& in)
{
Lock(_productor_mutex);
P(_room_sem);
_ring_queue[_productor_step++] = in;
_productor_step %= _cap;
V(_data_sem);
Unlock(_productor_mutex);
}
void Pop(T* out)
{
Lock(_consumer_mutex);
P(_data_sem);
*out = _ring_queue[_consumer_step++];
_consumer_step %= _cap;
V(_room_sem);
Unlock(_consumer_mutex);
}
~RingQueueu()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
vector<T> _ring_queue;
int _cap;
int _productor_step = 0;
int _consumer_step = 0;
sem_t _room_sem;
sem_t _data_sem;
//加锁,维护多生产多消费
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
单生产单消费时可以不用加锁,而多生产多消费时就需要加锁防止同时访问临界资源。
而加锁解锁应该放在申请信号量的后面进行才是比较好的,为什么呢?因为申请信号量是预定机制是原子的,不会出现线程安全问题, 这样可以先预定再等锁,锁来了之后直接运行后面代码,可以提高效率。(等也是等,不如再等的时候申请信号量)
所以这样写更好:
void Enqueue(const T &in)
{
P(_room_sem);
Lock(_productor_mutex);
_ring_queue[_productor_step++] = in;
_productor_step %= _cap;
Unlock(_productor_mutex);
V(_data_sem);
}
void Pop(T *out)
{
P(_data_sem);
Lock(_consumer_mutex);
*out = _ring_queue[_consumer_step++];
_consumer_step %= _cap;
Unlock(_consumer_mutex);
V(_room_sem);
}
main.cc
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>
int a = 10;
using namespace ThreadModule;
using namespace std;
using ringqueue_t = RingQueueu<Task>;
void PrintHello()
{
cout << "hello" << endl;
}
void Consumer(ringqueue_t &rq)
{
while (true)
{
Task t;
rq.Pop(&t);
t();
}
}
void Productor(ringqueue_t &rq)
{
while (true)
{
sleep(1);
rq.Enqueue(Download);
}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads->emplace_back(func, rq, name);
//(*threads)[threads->size()-1].Start();
//threads->back().Start();
}
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Consumer);
}
void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Productor);
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> threads)
{
for(auto thread : threads)
{
thread.Join();
}
}
void StartAll(vector<Thread<ringqueue_t>>& threads)
{
for(auto& thread : threads)
{
thread.Start();
}
}
int main()
{
ringqueue_t *bq = new ringqueue_t(10);
std::vector<Thread<ringqueue_t>> threads;
InitConsumer(&threads, 1, *bq);
InitProductor(&threads, 1, *bq);
StartAll(threads);
WaitAllThread(threads);
return 0;
}
这里使用的也不是原生线程库,而是我们自己封装的thread。上述代码都是完整无误的,但是这里我们要说一个非常隐蔽的问题。我们先将Thread对象放入vector中,再使用StartAll()函数遍历了vector创建了线程,但是为什么我们要分开写,不直接使用threads->back().Start();这个写法呢?
因为线程的创建是跳转到Start函数中去,但是主线程还会继续循环进行,可能下一个Thread即将放入vector中导致vector最后一个内容发生变化,最终某个线程创建失败。这就是一个非常隐讳的并发问题。
线程池
线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
线程池:
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include <vector>
#include"Log.hpp"
using namespace std;
using namespace ThreadModule;
const static int defaultthreadnum = 3;
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeAll()
{
pthread_cond_broadcast(&_cond);
}
public:
ThreadPool(int threadnum = defaultthreadnum)
: _threadnum(threadnum)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()");
}
void InitThreadPool()
{
for (int num = 0; num < _threadnum; num++)
{
string name = "thread-" + to_string(num + 1);
// _threads.emplace_back(Print, name);
_threads.emplace_back(bind(&ThreadPool::HandlerTask, this, placeholders::_1), name);
LOG(INFO, "init thread %s done", name.c_str());
}
_isrunning = true;
}
void HandlerTask(string name)
{
LOG(INFO, " %s is running ...", name.c_str());
while (true)
{
LockQueue();
while (_task_queue.empty() && _isrunning)
{
waitnum++;
ThreadSleep();
waitnum--;
}
if(_task_queue.empty() && !_isrunning)
{
UnlockQueue();
break;
}
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
LOG(DEBUG, "%s get a task", name.c_str());
t();
LOG(DEBUG, "%s hander a task result: %s", name.c_str(), t.ResultToString().c_str());
}
}
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if (_isrunning)
{
_task_queue.push(t);
if (waitnum > 0)
{
ThreadWakeup();
}
LOG(DEBUG, "enqueue task success");
ret = true;
}
UnlockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeAll();
UnlockQueue();
}
void Start()
{
for (auto &thread : _threads)
{
thread.Start();
}
}
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
LOG(INFO, "%s is quit", thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
vector<Thread> _threads;
queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int waitnum = 0;
bool _isrunning = false;
};
我们创建一个日志Log.hpp
#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"
bool gIsSave = false;
const std::string logname = "log.txt";
// 1. 日志是由等级的
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
void SaveFile(const std::string &filename, const std::string &message)
{
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
{
return;
}
out << message;
out.close();
}
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time);
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 2. 日志是由格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
std::string levelstr = LevelToString(level);
std::string timestr = GetTimeString();
pid_t selfid = getpid();
char buffer[1024];
va_list arg;
va_start(arg, format);
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg);
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
LockGuard lockguard(&lock);
// pthread_mutex_lock(&lock);
if (!issave)
{
std::cout << message;
}
else
{
SaveFile(logname, message);
}
// pthread_mutex_lock(&lock); // bug??
// std::cout << levelstr << " : " << timestr << " : " << filename << " : " << line << ":" << buffer << std::endl;
}
// C99新特性__VA_ARGS__
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
} while (0)
#define EnableFile() \
do \
{ \
gIsSave = true; \
} while (0)
#define EnableScreen() \
do \
{ \
gIsSave = false; \
} while (0)
其中日志中的锁是我们使用RALL思想管理的锁。
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex); // 构造加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#endif
同样我们使用的Thread是分装后的接口。
Main.cc
#include "ThreadPool.hpp"
#include <iostream>
#include <vector>
#include "Task.hpp"
#include <string>
#include "Log.hpp"
#include <unistd.h>
#include <memory>
#include <ctime>
using namespace std;
int main()
{
srand(time(nullptr) ^ getpid() ^ pthread_self());
EnableScreen();
// EnableFile();
unique_ptr<ThreadPool<Task>> tp = make_unique<ThreadPool<Task>>(5);
tp->InitThreadPool();
tp->Start();
int tasknum = 10;
while (tasknum)
{
int a = rand() % 10 + 1;
usleep(1234);
int b = rand() % 5 + 1;
Task t(a, b);
LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());
tp->Enqueue(t);
sleep(1);
tasknum--;
}
tp->Stop();
tp->Wait();
// LogMessage(DEBUG, "helloworld");
return 0;
}
以上是线程池的部分代码,线程池的思想与生产消费者模型相似,需要用vector来存放线程,使用queue来存放任务。使用智能指针来控制线程池。
STL,智能指针和线程安全
STL中的容器是否是线程安全的?
不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
线程安全的单例模式
什么是单例模式
单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
什么是设计模式
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
饿汉实现方式和懒汉实现方式
[洗完的例子]
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.
这两种方法在结果是相同的,但是在最开始使用时,饿汉模式是空间换时间,懒汉模式是时间换空间。
我们对上述的线程池代码进行修改让其也可以实现懒汉模式下的单例模式。
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include <vector>
#include "Log.hpp"
using namespace std;
using namespace ThreadModule;
const static int defaultthreadnum = 3;
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeAll()
{
pthread_cond_broadcast(&_cond);
}
ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()");
}
void InitThreadPool()
{
// 指向构建出所有的线程,并不启动
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
LOG(INFO, "init thread %s done", name.c_str());
}
_isrunning = true;
}
void Start()
{
for (auto &thread : _threads)
{
thread.Start();
}
}
void HandlerTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!
{
LOG(INFO, "%s is running...", name.c_str());
while (true)
{
// 1. 保证队列安全
LockQueue();
// 2. 队列中不一定有数据
while (_task_queue.empty() && _isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
// 2.1 如果线程池已经退出了 && 任务队列是空的
if (_task_queue.empty() && !_isrunning)
{
UnlockQueue();
break;
}
// 2.2 如果线程池不退出 && 任务队列不是空的
// 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出
// 3. 一定有任务, 处理任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
LOG(DEBUG, "%s get a task", name.c_str());
// 4. 处理任务,这个任务属于线程独占的任务
t();
LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
}
}
// 复制拷贝禁用
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *GetInstance()
{
// 如果是多线程获取线程池对象下面的代码就有问题了!!
// 只有第一次会创建对象,后续都是获取
// 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
if (nullptr == _instance) // 保证第二次之后,所有线程,不用在加锁,直接返回_instance单例对象
{
LockGuard lockguard(&_lock);
if (nullptr == _instance)
{
_instance = new ThreadPool<T>();
_instance->InitThreadPool();
_instance->Start();
LOG(DEBUG, "创建线程池单例");
return _instance;
}
}
LOG(DEBUG, "获取线程池单例");
return _instance;
}
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if (_isrunning)
{
_task_queue.push(t);
if (_waitnum > 0)
{
ThreadWakeup();
}
LOG(DEBUG, "enqueue task success");
ret = true;
}
UnlockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeAll();
UnlockQueue();
}
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
LOG(INFO, "%s is quit", thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
vector<Thread> _threads;
queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitnum;
bool _isrunning = false;
static ThreadPool<T> *_instance;
static pthread_mutex_t _lock;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
我们一定要注意懒汉模式的线程安全问题,因为单例是在不创建对象的前提通过调用函数来实现的,函数是不可重入的所以在多线程并发访问时可能出现执行多次函数导致创建多个单例而违背单例初衷。所以我们要在函数中加锁来保护。
其他常见的各种锁
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
自旋锁:自旋锁(Spinlock)是一种用于多线程同步的锁机制,它与互斥锁(mutex)在某些方面相似,但有一个关键的区别:当一个线程尝试获取一个已经被其他线程持有的自旋锁时,该线程不会立即阻塞(即不会进入睡眠状态),而是在当前位置“自旋”,也就是循环等待,直到锁被释放。
自旋锁
我们从自旋锁的定义就可以看出自旋锁与互斥锁mutex唯一区别是线程是否需要阻塞等待,而这就取决于申请到锁的线程在临界区执行时长的问题。如果时间比较就我们就要使用mutex挂起等待,反之可以使用自旋锁spinlock一直去申请访问。
自旋锁的接口与互斥锁大相径庭,我们来学习一下:
使用时我们只需要定义一个pthread_spinlock_t对象即可。
无论是自旋锁还是互斥锁都需要我们程序员去判断然后使用,正确使用可以提高效率。反之无条件使用自旋锁可能会将CPU打满死机!!!
读者写者问题
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
什么是读者写者问题,这个与生产消费者模型有极大相似性。举个例子,我们写CSDN文章,出黑板报等待都是读者写者问题。写者将内容发送到一个临界区,读者只需要读即可。所以本质也是321原则。一个交易场所、两个角色:读者、写者,三种关系:读者VS读者、写者VS写者、读者VS写者。
其中读者与写者一定有互斥和同步的关系,写者与写者之间有互斥的关系。但是读者与读者之间却没有关系,这与生产消费者模型就有不同之处。因为消费者是需要将临界区的数据拿走,而读者只需要拷贝数据,不会讲数据拿走,这就导致读者之间没有关系。
这种关系自己使用加锁解锁逻辑是没问题的,下面有一段伪代码就是整体思路:
int reader_count = 0; //读者计数器
pthread_mutex_t wlock; //写者锁
pthread_mutex_t rlock; //读者锁
//读者
lock(&rlock);
if(reader_count == 0)
lock(&wlock);
++reader_count;
unlock(&rlock);
//读者读操作
lock(&rlock);
--reader_count;
if(reader_count == 0)
unlock(&wlock);
unlock(&rlock);
//写者
lock(&wlock);
//写者写操作
unlock(&wlock);
写者思路非常简单,只需要维护只有一个写者进入写即可。因为读者计数器也属于临界资源我们要加锁保护。当我们判断读者为0时证明是第一个读者进入,所以我们要申请写者锁防止写者进入,如果我们申请不到锁证明写者持有锁读者进入不了,这就做到了互斥与同步。后面当最后一个--后计数器为0证明是最后一个走的读者,里面没有读者了就可以释放写者锁了。
通过上述伪代码我们可以看出,其读者写者问题都是围绕读者优先实现的,所以当读者基数过大时肯定会导致写者饥饿问题。所以会出现读者优先(上述伪代码逻辑)与写者优先,写者优先的思路就是当写者到来时,我们会阻挡还未进入的读者,当已进入的读者全部出来时写者先进入!!!但是实现就有点复杂,这里我们不给予实现。
为了不这么使用,pthread库提供了读写锁,而上述问题就会在库中得到解决!
读写锁接口
设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写加锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//读写解锁都可以使用
下面代码是一段读写锁模型实例,可以参考怎么使用:
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void * reader(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void * writer(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const& vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend(); ++it) {
pthread_t const& tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
以上就是本次全部内容,感谢大家观看。