目录
一、什么是线程
1、线程的基本认识
2、Linux线程与接口关系的认识
3、创建线程
4、线程等待
5、线程终止
6、线程分离
二、线程的优点
三、线程的缺点
四、线程与进程的关系
1、线程安全与重入
2、不可重入情况
3、可重入情况
4、可重入与线程安全的联系
五、互斥锁
1、为什么要有互斥锁?
2、互斥锁的基本操作
3、互斥锁的底层实现原理
4、死锁
六、Linux线程同步
1、条件变量
2、同步概念与竞态条件
3、条件变量函数
七、生产者消费者模型
1、基于BlockingQueue的生产者消费者模型
2、基于POSIX信号量的循环队列生产消费模型
1、POSIX信号量概念及操作
2、循环队列
3、具体实现
八、线程池
1、什么是线程池?
2、线程池的应用场景
3、线程池的具体实现
4、单例模式实现线程池
九、其它常见锁
1、读写锁
1、读者写者模型
2、读者写者模型优先级
3、读写锁常见接口
2、自旋锁
1、自旋锁VS挂起等待锁
2、自旋锁常见接口
总结
一、什么是线程
1、线程的基本认识
一般而言线程:是在进程内部运行的一个执行分支(执行流),属于进程的一部分,粒度要比进程更加细和轻量化。
所谓的在进程的内部指的是线程在进程的地址空间中运行
执行分支:CPU调度时只看PCB,每一个PCB曾经被指派过指向方法和数据,CPU可以直接调度
Linux与Windows等系统不同,并没有直接的线程控制块,而是通过进程去模拟线程
只创建task_struct,共享同一个地址空间,当前进程的代码和数据被划分为若干份,让每一个PCB使用,这样不用维护复杂的进程和线程的关系,不用单独为线程设计任何算法,直接使用进程的一套相关方法,OS只需要聚焦在线程间的资源分配就可以了
创建进程的成本是非常高的,是从0到1创建进程
创建线程,并不需要创建新的mm_struct,页表,映射内存等等,只需创建task_struct就可以
进程:是承担分配系统资源的基本实体
线程:是CPU调度的基本单位,承担进程资源一部分的基本实体(进程划分资源给线程)
2、Linux线程与接口关系的认识
Linux因为是用进程模拟实现进程的,所以Linux下不会给我们提供直接操纵线程的接口,而是给我们提供了在同一个地址空间创建PCB的方法,分配资源给指定PCB,这种操作非常不友好,所以系统级别工程师,在用户层对Linux轻量级进程接口进行封装,给我们打包成库,让用户直接使用库接口——原生线程库(用户层)
所以我们后面使用的是原生线程库,所以需要我们使用动态库链接方式
3、创建线程
创建线程可以使用pthread_create接口,第一个参数是线程id,它是一个pthread_t类型的指针,pthread_t是无符号整型,第二个参数是线程属性,一般直接给nullptr,我们一般并不会修改线程的属性,第三个参数是函数指针,返回值void* 参数void*,最后一个参数是传入函数的参数
#include <iostream>
#include <unistd.h>
#include <pthread.h>
size_t count = 0;
void *thread_run(void *args)
{
const char *id = reinterpret_cast<const char *>(args);
std::cout << "我是 " << id << " 号线程, 我的pid是 " << getpid() << std::endl;
sleep(5);
return reinterpret_cast<void *>(const_cast<char *>("Hello World"));
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, reinterpret_cast<void *>(const_cast<char *>("thread 1")));
while (true)
{
std::cout << "我是主线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
count++;
if (count == 7)
{
break;
}
}
return 0;
}
在编译时要指定库
thread:test.cpp
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f thread
在进程运行时,我们查看OS中线程
LWP是内核中的线程id,我们仔细观察发现thread这两个线程的PID都是相同的,LWP是不同的,同时主线程的PID == LWP,子线程的PID ≠ LWP
这说明此时依旧是只有一个进程,但是进程内部一定具有两个执行流
4、线程等待
一般而言,线程也是需要被等待的,如果不等待,可能会导致类似于“僵尸进程”的问题
第一个参数是线程id,第二个参数是输出型参数,返回进程等待的结果
注意第二个参数接收的不能是临时变量(对象)等的地址,会出现野指针的问题
类比进程,一个线程跑完有三种现象
1、代码跑完,结果对
2、代码跑完,结果不对
3、代码没有跑完,线程异常
pthread_join不能够处理线程异常也并没有必要,它只需要关心跑完,结果对还是不对,代码异常整个进程就挂了,并且线程等待不能够像waitpid一样可以同时等待所有子进程,只能按照一定的顺序一个一个等待
#include <iostream>
#include <unistd.h>
#include <pthread.h>
size_t count = 0;
void *thread_run(void *args)
{
const char *id = reinterpret_cast<const char *>(args);
std::cout << "我是 " << id << " 号线程, 我的pid是 " << getpid() << std::endl;
sleep(5);
return reinterpret_cast<void *>(const_cast<char *>("Hello World"));
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, reinterpret_cast<void *>(const_cast<char *>("thread 1")));
while (true)
{
std::cout << "我是主线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
count++;
if (count == 7)
{
break;
}
}
void *ret = nullptr;
pthread_join(tid, &ret);
std::cout << reinterpret_cast<char *>(ret) << std::endl;
return 0;
}
5、线程终止
还是类比”进程“,退出进程的方法
1、main函数返回退出码
2、直接调用exit
线程终止也是类似
1、将线程函数执行完
2、调用pthread_exit
#include <iostream>
#include <unistd.h>
#include <pthread.h>
size_t count = 0;
void *thread_run(void *args)
{
const char *id = reinterpret_cast<const char *>(args);
while (true)
{
std::cout << "我是 " << id << " 号线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
if (count == 5)
{
pthread_exit(reinterpret_cast<void *>(const_cast<char *>("World")));
}
}
return reinterpret_cast<void *>(const_cast<char *>("Hello World"));
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, reinterpret_cast<void *>(const_cast<char *>("thread 1")));
while (true)
{
std::cout << "我是主线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
count++;
if (count == 7)
{
break;
}
}
void *ret = nullptr;
pthread_join(tid, &ret);
std::cout << reinterpret_cast<char *>(ret) << std::endl;
return 0;
}
3、调用pthread_cancel
pthread_cancel可以是主线程取消子线程,也可以是子线程取消主线程,但是一般而言都是主线程取消子线程
#include <iostream>
#include <unistd.h>
#include <pthread.h>
size_t count = 0;
void *thread_run(void *args)
{
const char *id = reinterpret_cast<const char *>(args);
while (true)
{
std::cout << "我是 " << id << " 号线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
if (count == 5)
{
pthread_exit(reinterpret_cast<void *>(const_cast<char *>("World")));
}
}
return reinterpret_cast<void *>(const_cast<char *>("Hello World"));
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, reinterpret_cast<void *>(const_cast<char *>("thread 1")));
while (true)
{
std::cout << "我是主线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
count++;
if (count == 4)
{
pthread_cancel(tid);
}
if (count == 7)
{
break;
}
}
return 0;
}
一般情况都不推荐使用pthread_cancel来终止线程,最好是采用pthread_exit
6、线程分离
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
size_t count = 0;
void *thread_run(void *args)
{
const char *id = reinterpret_cast<const char *>(args);
std::cout << "我是 " << id << " 号线程, 我的pid是 " << getpid() << std::endl;
sleep(5);
return reinterpret_cast<void *>(const_cast<char *>("Hello World"));
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, reinterpret_cast<void *>(const_cast<char *>("thread 1")));
pthread_detach(tid);
std::cout << "我是主线程, 我的pid是 " << getpid() << std::endl;
sleep(1);
return 0;
}
要想要创建线程,一定是要先创建进程
二、线程的优点
三、线程的缺点
四、线程与进程的关系
1、线程安全与重入
2、不可重入情况
3、可重入情况
4、可重入与线程安全的联系
五、互斥锁
1、为什么要有互斥锁?
我们先写一个简单的买票的例子,创建5个线程,让这5个线程去抢票
#include <iostream>
#include <unistd.h>
#include <pthread.h>
int tickets = 1000;
#define THREAD_NUM 5
void *route(void *arg)
{
int id = *reinterpret_cast<int *>(arg);
delete arg;
while (true)
{
if (tickets > 0)
{
usleep(1000);
std::cout << id << " sell ticket: " << tickets << std::endl;
tickets--;
}
else
{
break;
}
}
}
int main()
{
pthread_t tid[THREAD_NUM];
for (size_t i = 0; i < THREAD_NUM; i++)
{
int *id = new int(i);
pthread_create(&tid[i], nullptr, route, id);
}
for (size_t i = 0; i < THREAD_NUM; i++)
{
pthread_join(tid[i], nullptr);
}
return 0;
}
这里就会出现没有访问控制而导致的问题
我们发现最后票竟然被抢成了负数,这是不可能的情况,但是现在发生了,这是因为多线程没有访问控制的原因,要想解决这个问题,采用互斥锁
2、互斥锁的基本操作
这是对锁进行操作的一系列函数,初始化,销毁
所谓的互斥锁其实就是一个pthread_mutex_t类型的变量
lock和unlock是用在临界区的,实现了互斥
这里锁的创建和销毁很容易遗忘,可以使用RAII来赋值我们
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#define THREAD_NUM 5
class Ticket
{
private:
int _tickets;
pthread_mutex_t _mtx;
public:
Ticket()
: _tickets(1000)
{
pthread_mutex_init(&_mtx, nullptr);
}
bool GetTickets()
{
bool res = true;
// 在临界区上锁
pthread_mutex_lock(&_mtx);
if (_tickets > 0)
{
usleep(1000);
std::cout << pthread_self() << " sell ticket: " << _tickets << std::endl;
_tickets--;
}
else
{
res = false;
}
pthread_mutex_unlock(&_mtx);
return res;
}
~Ticket()
{
pthread_mutex_destroy(&_mtx);
}
};
void *ThreadRoutine(void *args)
{
Ticket *t = reinterpret_cast<Ticket *>(args);
while (true)
{
if (!t->GetTickets())
{
break;
}
}
}
int main()
{
Ticket *t = new Ticket();
pthread_t tid[THREAD_NUM];
for (size_t i = 0; i < THREAD_NUM; i++)
{
pthread_create(&tid[i], nullptr, ThreadRoutine, reinterpret_cast<void *>(t));
}
for (size_t i = 0; i < THREAD_NUM; i++)
{
pthread_join(tid[i], nullptr);
}
return 0;
}
这样就没有将票减到负数
3、互斥锁的底层实现原理
我要访问临界资源的时候,需要先访问互斥锁,前提是所有的线程都能够看到它,锁本身也是临界资源,保证所本身是安全的原理:lock和unlock是原子的
代码是原子的:编译出的汇编代码只有一行
最一开始内存中的mutex变量初始化为1,假如现在有两个线程A,B
A,B两个线程的上下文中保存CPU寄存器的数据,线程将其置为0
假如A线程先申请锁,B线程后申请锁
因为xchgb命令是完成内存和CPU寄存器数据的交换,它是原子的
一开始A线程寄存器内部是0,和内存中的mutex交换,A进程上下文中的寄存器数据变成1,就申请成功锁了,B线程也来申请,也要交换数据,因为内存中mutex是0交换之后B进程上下文中的寄存器数据还是0,就被操作系统挂起,等待A进程解锁,A进程执行完毕后,会再次跳转回之前的命令,再次执行一遍前面lock的命令,自然就将mutex的1又换了回去,这就叫做解锁,然后B再次尝试申请,成功
4、死锁
六、Linux线程同步
1、条件变量
2、同步概念与竞态条件
3、条件变量函数
首先是创建条件变量,它是pthread_cond_t类型与前面多线程的类型相似
第一个参数是条件变量的id,第二个参数也是属性,设为空
接下来是pthread_cond_wait函数
他主要是用来在某个条件变量下等待
它具有以下特点
1、调用时,首先自动释放锁,然后挂起自己
2、返回的时候,会自动竞争锁,获取到锁之后才会返回
pthread_cond_signal是唤醒在条件变量下等待的下一个线程
所谓的下一个线程是pthread_cond_t里面的队列中的第一个线程
pthread_cond_broadcast是唤醒所有线程
下面是一个简单的例子:
创建一个老板线程以及几个工人线程,让老板线程去唤醒工人线程去工作
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
const int workNum = 3;
pthread_mutex_t mtx;
pthread_cond_t cod;
void* ctrl(void* args)
{
std::string name = reinterpret_cast<char*>(args);
while(true)
{
std::cout << "master say : begin work" << std::endl;
pthread_cond_signal(&cod);
sleep(1);
}
}
void* work(void* args)
{
int id = *reinterpret_cast<int*>(args);
delete reinterpret_cast<int*>(args);
while(true)
{
pthread_cond_wait(&cod, &mtx);
std::cout << "worker: " << id << " is working ……" << std::endl;
}
}
int main()
{
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cod, nullptr);
pthread_t master;
pthread_t worker[workNum];
pthread_create(&master, nullptr, ctrl, reinterpret_cast<void*>(const_cast<char*>("Boss")));
for(size_t i = 0; i < workNum; i++)
{
int *id = new int(i + 1);
pthread_create(&worker[i], nullptr, work, id);
}
pthread_join(master, nullptr);
for(size_t i = 0; i < workNum; i++)
{
pthread_join(worker[i], nullptr);
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cod);
return 0;
}
七、生产者消费者模型
这里的生产者消费者模型,类似于现实生活中的供货商超市消费者之间的关系
超市就相当于一块缓冲区,减少交易成本,提高效率
并且将生产环节和消费环境进行了解耦合
消费者 VS 消费者 竞争 互斥
生产者 VS 生产者 竞争 互斥
消费者 VS 生产者 互斥 同步
这符合所谓的“321”原则
3种关系
2种角色
1个场所
这个模型
只有生产者知道,消费者什么时候可以消费
只有消费者知道,生产者应该什么时候生产
1、基于BlockingQueue的生产者消费者模型
下面的代码是利用阻塞队列来实现生产者消费者模型
其中push是生产函数,是用来生产数据的
pop是消费函数
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <typename T>
class BlockQueue
{
private:
size_t _capacity;//阻塞队列容量
pthread_mutex_t mtx;//互斥锁,避免同时访问阻塞队列
pthread_cond_t isFull;//条件变量,阻塞队列满的时候
pthread_cond_t isEmpty;//空的时候
std::queue<T> _que;//队列
private:
bool IsFull() const
{
return _que.size() == _capacity;
}
bool IsEmpty() const
{
return _que.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&mtx);
}
void UnLockQueue()
{
pthread_mutex_unlock(&mtx);
}
void ProducterWait()
{
pthread_cond_wait(&isEmpty, &mtx);
}
void ConsumerWait()
{
pthread_cond_wait(&isFull, &mtx);
}
void WakeUpConsumer()
{
pthread_cond_signal(&isFull);
}
void WakeUpProducter()
{
pthread_cond_signal(&isEmpty);
}
public:
BlockQueue(const size_t capacity = 3)
: _capacity(capacity)
{
//初始化锁及条件变量
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&isFull, nullptr);
pthread_cond_init(&isEmpty, nullptr);
}
~BlockQueue()
{
//销毁锁及条件变量
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&isFull);
pthread_cond_destroy(&isEmpty);
}
// 生产函数
void Push(const T &data)
{
// 临界区,加锁
LockQueue();
//当阻塞队列满的时候,阻塞停止生产
while (IsFull())
{
ProducterWait();
}
_que.emplace(data);
//解锁,并且唤醒消费者
UnLockQueue();
WakeUpConsumer();
}
// 消费函数
void Pop(T *data)
{
// 临界区
LockQueue();
while (IsEmpty())
{
ConsumerWait();
}
*data = _que.front();
_que.pop();
UnLockQueue();
WakeUpProducter();
}
};
//main.cpp
#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
//消费函数
void *consumer(void *args)
{
BlockQueue<int> *pbq = reinterpret_cast<BlockQueue<int> *>(args);
while (true)
{
//sleep(2);
int data = 0;
pbq->Pop(&data);
std::cout << "消费了一个数据 : " << data << std::endl;
}
}
//生产函数
void *producter(void *args)
{
BlockQueue<int> *pbq = reinterpret_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);
int data = rand() % 20 + 1;
std::cout << "生产者生产数据: " << data << std::endl;
pbq->Push(data);
}
}
int main()
{
//随机数种子
srand((long long)time(nullptr));
pthread_t c;//消费者线程
pthread_t p;//生产者线程
BlockQueue<int> bq;
//为了每一个线程看到同一个阻塞队列
//将阻塞队列发送给每一个线程
pthread_create(&c, nullptr, consumer, reinterpret_cast<void *>(&bq));
pthread_create(&p, nullptr, producter, reinterpret_cast<void *>(&bq));
//回收等待子线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
其中,我们也可以让生产者"生产"“任务”,让消费者去执行
#pragma once
#include <iostream>
class Task
{
public:
Task(int x = 0, int y = 0, char op = '+')
: _x(x), _y(y), _op(op)
{
}
int Run()
{
int ans = 0;
switch (_op)
{
case '+':
ans = _x + _y;
break;
case '-':
ans = _x - _y;
break;
case '*':
ans = _x * _y;
break;
case '/':
ans = _x / _y;
break;
case '%':
ans = _x % _y;
break;
default:
std::cout << "Bug" << std::endl;
break;
}
std::cout << "执行了一个任务: " << _x << " " << _op << " " << _y << " = " << ans << std::endl;
return ans;
}
int operator()()
{
return Run();
}
private:
int _x;
int _y;
char _op;
};
2、基于POSIX信号量的循环队列生产消费模型
1、POSIX信号量概念及操作
信号量的本质是一个计数器,用来描述临界资源中资源数目的大小
申请信号量的本质是,对于临界资源的预订
将临界资源划分为一个一个小资源让多个线程同时访问临界资源的不同区域从而实现并发
初始化信号量,其中pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
等待信号量,会将信号量的值减1,这就是所谓的P操作
发布信号量,表示资源使用完毕,可以归还资源,将信号量值加1,这是所谓的V操作
2、循环队列
循环队列是一种线性数据结构,其操作表现基于 FIFO(先进先出)原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。
循环队列的一个好处是我们可以利用这个队列之前用过的空间。在一个普通队列里,一旦一个队列满了,我们就不能插入下一个元素,即使在队列前面仍有空间。但是使用循环队列,我们能使用这些空间去存储新的值。
什么时候循环队列为空?
开始的时候,头指针和尾指针在同一个位置
什么时候循环队列为满?
有两种实现方法
1、镂空一个位置,当尾指针的下一个是头指针的时候,队列已满
2、使用计数器,分别记录未使用空间及使用空间,这里计数器的思想与下面使用信号量去实现循环队列有异曲同工之妙
622. 设计循环队列 - 力扣(LeetCode)
代码:
typedef struct {
int k;
int* a;
int head;
int tail;
} MyCircularQueue;
MyCircularQueue* myCircularQueueCreate(int k) {
MyCircularQueue* obj = (MyCircularQueue*)malloc(sizeof(MyCircularQueue));
obj->k = k;
obj->head = 0;
obj->tail = 0;
obj->a = (int*)malloc(sizeof(int)*(k+1));
return obj;
}
bool myCircularQueueIsEmpty(MyCircularQueue* obj) {
return obj->head == obj->tail;
}
bool myCircularQueueIsFull(MyCircularQueue* obj) {
int next = obj->tail + 1;
if(next == obj->k + 1)
{
next = 0;
}
return next == obj->head;
}
bool myCircularQueueEnQueue(MyCircularQueue* obj, int value) {
if(myCircularQueueIsFull(obj))
{
return false;
}
obj->a[obj->tail] = value;
obj->tail++;
if(obj->tail ==obj->k + 1)
{
obj->tail = 0;
}
return true;
}
bool myCircularQueueDeQueue(MyCircularQueue* obj) {
if(myCircularQueueIsEmpty(obj))
{
return false;
}
obj->head++;
if(obj->head == obj->k + 1)
{
obj->head = 0;
}
return true;
}
int myCircularQueueFront(MyCircularQueue* obj) {
if(myCircularQueueIsEmpty(obj))
{
return -1;
}
return obj->a[obj->head];
}
int myCircularQueueRear(MyCircularQueue* obj) {
if(myCircularQueueIsEmpty(obj))
{
return -1;
}
int prev = obj->tail-1;
if(prev == -1)
{
prev = obj->k;
}
return obj->a[prev];
}
void myCircularQueueFree(MyCircularQueue* obj) {
free(obj->a);
obj->a = NULL;
obj->k = 0;
obj->head = 0;
obj->tail = 0;
free(obj);
}
3、具体实现
在这里的场景就是,消费者从队列里面拿数据,生产者往队列里面放数据
循环队列就是临界资源,生产者和消费者开始的时候,指向的就是同一个位置,也就是队列为空的时候
生产者和消费者在队列满的时候也指向同一个位置
当队列不为空,不为满的时候,生产者和消费者指向的不是同一个位置,那么就可以并发的执行生产和消费了
生产者最关心的是:循环队列空的位置
消费者最关心的是:循环队列已放数据的位置
那么这里就有一些规则
1、生产者不能把消费者扣一个圈
2、消费者拿数据不能超过生产者
3、当指向同一个位置的时候,要根据空,满的状态,来判断让谁先执行
其他:除此之外,生产和消费可以并发执行
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
template <class T>
class RingQueue
{
private:
pthread_mutex_t _cMux;
pthread_mutex_t _pMux;
sem_t _blank; // 信号量,剩余队列位置
sem_t _data; // 信号量,存放数据位置
size_t _capacity; // 容量
size_t _cStep;
size_t _pStep;
std::vector<T> _que; // 使用数组来模拟循环队列
public:
RingQueue(int capacity)
: _capacity(capacity), _que(capacity, 0), _cStep(0), _pStep(0)
{
sem_init(&_blank, 0, _capacity);
sem_init(&_data, 0, 0);
pthread_mutex_init(&_cMux, nullptr);
pthread_mutex_init(&_pMux, nullptr);
}
~RingQueue()
{
sem_destroy(&_blank);
sem_destroy(&_data);
pthread_mutex_destroy(&_cMux);
pthread_mutex_destroy(&_pMux);
}
void Push(const T &in)
{
sem_wait(&_blank);
pthread_mutex_lock(&_pMux);
_que[_pStep] = in;
_pStep++;
_pStep %= _capacity;
pthread_mutex_unlock(&_pMux);
sem_post(&_data);
}
void Pop(T *out)
{
sem_wait(&_data);
pthread_mutex_lock(&_cMux);
*out = _que[_cStep];
_cStep++;
_cStep %= _capacity;
pthread_mutex_unlock(&_cMux);
sem_post(&_blank);
}
};
//main.cpp
#include "RingQueue.hpp"
#include "task.hpp"
#include <string>
#include <ctime>
#include <unistd.h>
const int ConsumerNum = 3;
void *consumer(void* args)
{
RingQueue<Task>* prq = reinterpret_cast<RingQueue<Task>*>(args);
while (true)
{
Task t;
prq->Pop(&t);
t();
//sleep(1);
}
}
void *producter(void* args)
{
std::string task = "+-*/%";
RingQueue<Task>* prq = reinterpret_cast<RingQueue<Task>*>(args);
while (true)
{
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
char op = task[rand() % 5];
Task t(x, y, op);
prq->Push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
pthread_t p;
pthread_t c[ConsumerNum];
RingQueue<Task> rq(10);
pthread_create(&p, nullptr, producter, reinterpret_cast<void*>(&rq));
for(size_t i = 0; i < ConsumerNum; i++)
{
pthread_create(&c[i], nullptr, consumer, reinterpret_cast<void*>(&rq));
}
pthread_join(p, nullptr);
for(size_t i = 0; i < ConsumerNum; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}
//task.hpp
#pragma once
#include <iostream>
class Task
{
public:
Task(int x = 0, int y = 0, char op = '+')
: _x(x), _y(y), _op(op)
{
}
int Run()
{
int ans = 0;
switch (_op)
{
case '+':
ans = _x + _y;
break;
case '-':
ans = _x - _y;
break;
case '*':
ans = _x * _y;
break;
case '/':
ans = _x / _y;
break;
case '%':
ans = _x % _y;
break;
default:
std::cout << "Bug" << std::endl;
break;
}
std::cout << "执行了一个任务: " << _x << " " << _op << " " << _y << " = " << ans << std::endl;
return ans;
}
int operator()()
{
return Run();
}
private:
int _x;
int _y;
char _op;
};
八、线程池
1、什么是线程池?
2、线程池的应用场景
3、线程池的具体实现
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class ThreadPoll
{
private:
size_t _capacity;//线程池中线程的数量
pthread_mutex_t _mtx;//互斥锁,保护任务
pthread_cond_t _cond;//条件变量,防止在线程池为空的情况下取数据
std::queue<T> _taskPoll;//线程池,使用队列来实现
public:
ThreadPoll(size_t capacity = 3)
: _capacity(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPoll()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
void InitThreadPoll()
{
pthread_t tid;
for (size_t i = 0; i < _capacity; i++)
{
pthread_create(&tid, nullptr, Routine, reinterpret_cast<void *>(this));
}
}
void PushTask(const T &in)
{
Lock();
_taskPoll.emplace(in);
Unlock();
WakeUp();
}
private:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
bool IsEmpty()
{
return _taskPoll.empty();
}
void PopTask(T *out)
{
*out = _taskPoll.front();
_taskPoll.pop();
}
static void *Routine(void *args)
{
pthread_detach(pthread_self());//线程分离
ThreadPoll<T> *ptp = reinterpret_cast<ThreadPoll<T> *>(args);
//因为非静态成员函数this指针的问题,改用static加手动传入this指针的策略
while (true)
{
ptp->Lock();
while (ptp->IsEmpty())
{
ptp->Wait();
}
T t;
ptp->PopTask(&t);
ptp->Unlock();
t(); // 执行任务
}
}
};
测试代码
#include "ThreadPoll.hpp"
#include "task.hpp"
#include <string>
#include <ctime>
#include <unistd.h>
int main()
{
srand((long long)time(nullptr));
const std::string ops("+-*/%");
ThreadPoll<Task> tp(3);
tp.InitThreadPoll();
while (true)
{
//Task t(rand() % 20 + 1, rand() % 10 + 1, "+-*/%"[rand() % 5]);
tp.PushTask(Task(rand() % 20 + 1, rand() % 10 + 1, ops[rand() % ops.size()]));
sleep(1);
}
return 0;
}
4、单例模式实现线程池
我们使用单例模式的懒汉模式来实现,在上述代码的基础上,改写为单例模式线程池
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class ThreadPoll
{
typedef ThreadPoll<T> self;
private:
size_t _capacity;
pthread_mutex_t _mtx;
pthread_cond_t _cond;
std::queue<T> _taskPoll;
static self *ins;
private:
ThreadPoll(size_t capacity = 3)
: _capacity(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPoll(const self &tp) = delete;
self &operator=(const self &tp) = delete;
void InitThreadPoll()
{
pthread_t tid;
for (size_t i = 0; i < _capacity; i++)
{
pthread_create(&tid, nullptr, Routine, reinterpret_cast<void *>(this));
}
}
public:
static self *GetInstance()
{
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
if (ins == nullptr)
{
pthread_mutex_lock(&lock);
if (ins == nullptr)
{
ins = new self();
ins->InitThreadPoll();
std::cout << "线程池首次启动" << std::endl;
}
pthread_mutex_unlock(&lock);
}
return ins;
}
~ThreadPoll()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
void PushTask(const T &in)
{
Lock();
_taskPoll.emplace(in);
Unlock();
WakeUp();
}
private:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
bool IsEmpty()
{
return _taskPoll.empty();
}
void PopTask(T *out)
{
*out = _taskPoll.front();
_taskPoll.pop();
}
static void *Routine(void *args)
{
pthread_detach(pthread_self());
ThreadPoll<T> *ptp = reinterpret_cast<ThreadPoll<T> *>(args);
while (true)
{
ptp->Lock();
while (ptp->IsEmpty())
{
ptp->Wait();
}
T t;
ptp->PopTask(&t);
ptp->Unlock();
t(); // 执行任务
}
}
};
//静态变量在类外初始化
template <class T>
ThreadPoll<T> *ThreadPoll<T>::ins = nullptr;
测试代码
#include "ThreadPoll.hpp"
#include "task.hpp"
#include <ctime>
#include <unistd.h>
int main()
{
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
std::cout << "当前正在运行我的进程其他代码..." << std::endl;
sleep(5);
ThreadPoll<Task>* pt = ThreadPoll<Task>::GetInstance();
srand((long long)time(nullptr));
while (true)
{
sleep(1);
Task t(rand() % 20 + 1, rand() % 10 + 1, "+-*/%"[rand() % 5]);
pt->PushTask(t);
}
return 0;
}
九、其它常见锁
1、读写锁
1、读者写者模型
1、对数据大部分是读取,少部分操作是写入
2、判断的依据是,进行数据读取的一端是否会将数据取走,如果不会取走,那么就适用于读者写者模型
读者写者模型也有所谓的321原则
3种关系
读者 VS 读者
写者 VS 写者
读者 VS 写者
写者和写者之间的关系是互斥的,同一块资源并不允许让不同线程同时写入
读者和写者之间的关系是互斥、同步的,写者没有写完读者读取到的数据是不完整的,可能会出现问题,同时写者写完却没有读者去读取也是不合理的
读者和读者之间的关系是:没有关系,因为在这个模型里面,读者它只是读取数据,并不会将数据取走,所以读者之间是没有关系的,如果读者会取走数据,那么读者之间就是互斥关系,这就变成了生产者消费者模型
读者写者模型也会有一个场所:一段缓冲区或者其它STL容器
读者写者模型的本质:使用锁来维护上述的三种关系
2、读者写者模型优先级
读者优先:当读者和写者同时到来的时候,让读者先进入临界资源访问
写者优先:当读者和写者同时到来的时候,让写者之后的所有的读者都不要进入临界区访问,等临界区中没有读者之后,让写者进入临界资源访问
3、读写锁常见接口
初始化和销毁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
2、自旋锁
1、自旋锁VS挂起等待锁
挂起等待的锁:当锁资源被占用时,线程将会进入挂起状态
自旋锁:当锁资源被占用时,线程不会进入挂起等待状态,不断的循环检测锁的状态
它们两个的唯一区别是,在面对锁资源被占用时的对策不同
线程是不知道自己在临界资源中等待的时间长短
但是我们自己知道,我们可以根据不同的情况自行选择锁的种类
2、自旋锁常见接口
初始化和销毁
加锁
解锁
总结
以上就是今天要讲的内容,本文仅仅简单介绍了多线程的相关概念