线程
线程的概念
在官方书籍对于线程的概念:
1.在进程内部的执行流
2.线程比进程粒度更细,调度成本更低。
3.线程是CPU调度的最小单位。
线程(tcb):进程(pcb) = n:1
进程和线程在执行流层面是不一样的
fork之后,父子进程共享代码,可以用过选择语句,来让父子进程执行不同的代码块。
不同的执行流就可以完成对特定资源的划分。
Linux认为:没有进程,线程在概念上的区分,只有一个叫做: 执行流!
Linux的线程是由进程模拟出来的。
基本示意图:
CPU看到的所有的task_struct 都是一个进程
CPU看到的所有的task_struct 都是一个执行流(线程)
Linux上没有真正意义上的线程,而是用进程task_struct 模拟实现Linux下的进程 <=其他操作系统上的进程概念
CPU看到的虽说不是PCB但是也比传统意义上的进程要轻量化了。
进程 = 内核数据结构 + 进程对应的代码 和 数据
进程 = 内核视角 :承担分配系统资源的基本实体(进程的基座属性)
进程是向系统资源的基本单位。
内部只有一个执行流的进程 – 单执行流进程
内部有多个执行流的进程 – 多执行流进程
线程是调度的基本单位。
关于Linux的线程库
Linux上有一个已经封装好了的第三方线程库 – POSIX库
使用时要包含头文件 <pthread.h>
由于时第三方动态库,所以要使用 -lpthread 链接动态库
创建一个新的线程 pthread_create
函数:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
函数:pthread_jion
int pthread_join(pthread_t thread, void **retval);
参数:
thread:返回线程的id
retval:默认写nullptr
返回值:成功返回0,失败返回错误码。
示例:
#include <iostream>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
using namespace std;
void *callback1(void* args)
{
string name = (char*) args;
while(true)
{
cout<< name <<": "<<getpid()<<endl;
sleep(1);
}
}
void* callback2(void* args)
{
string name = (char*) args;
while(true)
{
cout<< name <<": "<<getpid()<<endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1,nullptr,callback1,(void *)"thread 1");
pthread_create(&tid2,nullptr,callback2,(void*)"thread 2");
while(true)
{
cout<< "i am a main tcb..." << getpid() << endl;
sleep(1);
}
pthread_join(tid1,nullptr);//加入线程
pthread_join(tid2,nullptr);
return 0;
}
运行截图:
查看线程被的指令是:ps -aL
运行c++提供的thread库
#include <iostream>
// #include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <thread>
using namespace std;
void *callback1(void* args)
{
string name = (char*) args;
while(true)
{
cout<< name <<": "<<getpid()<<endl;
sleep(1);
}
}
void* callback2(void* args)
{
string name = (char*) args;
while(true)
{
cout<< name <<": "<<getpid()<<endl;
sleep(1);
}
}
int main()
{
thread t(()[]{
while(true)
{
cout << "thread is running "<< endl;
sleep(1);
}
});
t.join();
return 0;
}
补:进程与信号
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstdlib>
#include <signal.h>
#include <assert.h>
using namespace std;
void FreeChld(int signo)
{
assert(signo == SIGCHLD);
while(true)
{
//waitpid 没有子进程的时候会调用失败
//-1表示等待任意一个子进程
pid_t id = waitpid(-1,NULL,WNOHANG);
if(id > 0)
{
cout << "父进程等待成功" << endl;
}
else if(id == 0)
{
//还有子进程,但是没有退出
cout << "还有子进程,但是没有退出,父进程要自己去忙了" << endl;
break;
}
else{
cout << "父进程等待所有的子进程结束"<< endl;
break;
}
}
}
int main()
{
//signal(SIGCHLD,FreeChld);
//子进程退出的时候,默认的信号处理是忽略吗?
//调用signal/sigaction SIG_IGN,意义在哪里?
//SIG_ING手动设置,让子进程退出,不要让父进程发送信号了,并且自动释放。
signal(SIGCHLD,SIG_IGN);
for(int i = 0;i<10;i++)
{
pid_t id = fork();
if(id == 0){
//子进程
int cnt = 10;
while(cnt)
{
cout << "我是子进程,pid: "<<getpid() << "当前的cnt: "<<cnt--<<endl;
sleep(1);
}
cout<<"子进程退出,进入僵尸状态" <<endl;
exit(0);
}
sleep(1);
}
while(true)
{
cout<<"我是父进程,我正在运行: "<<getpid()<<endl;
sleep(1);
}
//父进程需要自己主动等待
// if(waitpid(id,nullptr,0) > 0)
// {
// cout << "父进程等待子进程成功" <<endl;
// }
return 0;
}
线程控制
如何证明?
char* msg = "hello word";
*msg = "hello";
//这种代码式错误的,因为字符常量不可被修改
图中的RWX是文件的用户权限,U/K是区分是内核还是用户级。
解释页表问题
如果页表就像之前的概念图一样只有一张,那么有232个条目,内存大概是232 * 8 = 32G.(32位平台下)。这显然是不合理的。
所以,一般是二级页表。
二级页表 = 页目录 + 页表。
OS通过页表链接 虚拟地址和物理地址,虚拟地址在被转化的过程中,不是直接转换的!
举例 :虚拟地址为 : 0101 0101 00 0100 1100 00 1110 0101 1001
xxxx xxxx xx yyyy yyyy yy zzzz zzzz zzzz
页目录左半部分是虚拟地址的前十位 右半部分是 页表 即先找到对应的page
页表:左半部分是虚拟地址的中间10位,右半部分是page的起始地址。
而虚拟地址的后12位是page之后的页内偏移量。
所以只需要210个页表,212就能覆盖页内的所有地址。
IO的基本单位是块(4kb)
虚拟地址编译,也划分好了4kb,物理内存中有也跨国,每一份为页帧。
通过先描述再组织的原则,OS中有 struct page 来对page进行管理
成员:struct page mem[1024*1024] --> 对内存的管理,就变成了对数组的增删查改
页表的创建需不需要创建内存?
页表分离了,可以实现页表的__按需创建__
所需的页表数量 = 232/212;
硬件MMU
软(页表)硬件 (MMU)结合的方式完成页表
这样做的好处
1.进程虚拟地址管理和内存管理,通过页表 + page进行解耦。
2.机制分页 + 按需创建页表 = 节省空间。
线程控制的相关函数
创建一个新的线程
原型:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(start_routine)(void), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
错误检查
1.pthread函数出错时,不会设置全局变量errno。而是将错误代码通过返回值返回。
2.pthread提供了线程内的errno变量,以支持其他使用errno的代码,对于pthread函数的错误,可以通过返回值判定,返回值读取比读取线程内的errno开销更小。
线程id及进程地址空间布局
1.phread_create函数会创建产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面的线程ID不一样。
2.线程ID是属于进程调度的范畴。线程是轻量级进程,是操作系统调度器的最小单位,所以要唯一个数值来表示该线程。
3.pthread_create函数第一个参数指向的是一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。
4.线程库NPTL提供了pthread_self()函数,可以获得线程自身ID
函数原型:pthread_t pthread_self(void);
线程终止
终止某个线程而不是进程,有三种方式:
1.从线程函数return。这种方式不适用于从main函数return相当于exit
2.线程可以调用 pthread_exit()终止自己
3.一个线程可以调用pthread_cancel() 终止统一进程的另一个线程。
pthread_exit()函数
功能:线程终止
原型:void pthread_exit(void *value_ptr);
参数:value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值
需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
pthread_cancel函数
功能:取消一个执行中的线程
原型:int pthread_cancel(pthread_t thread);
参数:thread:线程ID
返回值:返回成功0,失败返回错误码
线程等待
1.为什么要等待?
已经退出的线程,其空间没有被释放,仍在进程的地址空间中。
创建新的线程,不会复用刚才退出的线程的地址空间。
功能:等待线程结束
原型
int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的
终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数
PTHREAD_ CANCELED。 - 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参
数。 - 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
代码示例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int global_value = 100;
static void printTid(const char* name, const pthread_t& tid)
{
printf("%s is running, thread id:0x%x, global_value: %d\n",name,tid,global_value);
}
void* startRoutinue(void* args)
{
const char* name = static_cast<const char*>(args);
int cnt = 5;
printTid(name,pthread_self());
while(true)
{
if(!(cnt--))
{
global_value = 200;
}
}
cout << "thread exit" <<endl;
//1.线程退出的方式,return
return (void*)111;
//2.线程退出的方式,pthread_exit
//pthread_exit((void*)111);
}
int main()
{
pthread_t tid;
//pthread_t --> unsigned long int
int n = pthread_create(&tid,nullptr,startRoutinue,(void*)"thread1");
//线程退出的方式,给线程发送取消请求,如果线程被取消,退出结果是-1
//pthread_cancel(tid); //join拿到PTHREAD_CANCELED宏
(void)n;//避免在release版本下出现warning
sleep(2);
//cout << "new thread been canceled"<<endl;
while(true)
{
printTid("main thread",pthread_self());
sleep(1);
}
//线程推出的时候必须join,不进行join就会发生类似于进程那样的内存泄露的问题。
void* ret = nullptr;
//void* 在64为平台是8字节空间。
pthread_join(tid,&ret);
cout << "main thread join success,*ret = " <<(int*)ret << endl;
delete (int*)ret;
return 0;
}
理解pthread_t
线程是一个地址
1.线程是一个独立的执行流
2.线程一定会在自己的运行过程中,产生临时数据(调用函数,定义局部变量)
3.线程一定要有自己独立的栈结构
线程栈
使用的线程库,用户级的线程库,pthread
所以pthread究竟是什么?
pthread_t:对应的是用户级线程的控制结构体的起始地址。
**线程的全部实现,并没有全部体现在OS中,而是OS提供执行流,具体的线程操作由库来进行管理。**即库可以创建多个线程。
线程分离
默认情况下,新创建的线程是jionable的,线程退出后,需要对pthread_join操作,否则无法释放资源,进而造成资源泄露。
如果不关心现成的返回值,join是一种负担,这个时候就可以使用库中的相关操作,在线程退出的时候,自动释放线程资源。
函数:int pthread_detach(pthread_t thread);
可以是线程组内的其他线程对目标线程进行分离,也可以是线程自己分离:
pthread_detach(pthread_self());
joinable和分离是冲突的,一个线程不能即是joinable又是分离的。
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/syscall.h>//了解
using namespace std;
int global_value = 100;
//__thread int global_value = 100;
void* startRotine(void* args)
{
// pthread_detach(pthread_self());
// cout << "线程分离" << endl;
while(true)
{
cout << "thread "<<pthread_self() << "global_value: "<< global_value << " &global_value " << &global_value
<<" Inc: "<<global_value++ << " lwp: "<<::syscall(SYS_gettid) <<endl;
/*getpid()得到的是进程的pid,在内核中,每个线程都有自己的PID,要得到线程的PID(即TID),必须用syscall(SYS_gettid);
pthread_self()获取的是线程ID,仅在同一个进程中保证唯一*/
sleep(1);
break;
}
//任何一个线程调用exit都表示整个进程退出
// exit(1);
// pthread_exit();
return nullptr;
}
int main()
{
pthread_t tid1,tid2,tid3,tid4;
pthread_create(&tid1,nullptr,startRotine,(void*)"thread 1");
pthread_create(&tid2,nullptr,startRotine,(void*)"thread 2");
pthread_create(&tid3,nullptr,startRotine,(void*)"thread 3");
pthread_create(&tid4,nullptr,startRotine,(void*)"thread 4");
sleep(1);
// //倾向于: 让主线程分离其他线程
// pthread_detach(tid1);
// pthread_detach(tid2);
// pthread_detach(tid3);
// sleep(1);
//1.立即分离,延后分离 -- 线程活着 -- 意味着,这个线程的死活不再管. 4.线程退出的第四种方式:延后退出
//2.新线程分离,但是主线程先退出(进程退出) -- 一般分离线程,对应的main thread 一般不退出(常驻内存的进程)
//3.一般主线程退出,所有的退出,新线程中一个退了,并不影响主线程。
//pthread_join成功返回0,失败返回错误码
int n = pthread_join(tid1,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid2,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid3,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid4,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
return 0;
}
结果:
使用int global_value = 100;得:
使用__thread int global_value = 100;得:
线程互斥
进程线程间的沪指相关背景概念
1.临界资源:多个线程流共享的的资源称为临界资源
2.临界区:每一个线程内部,访问临界资源的代码,就叫做临界区。
3.互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
4.原子性:不被任何调度器机制打断的操作,该操作只有两种状态:完成 or 未完成
互斥量mutex
1.大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈的空间内。这种情况,变量归属单个线程,其他线程无法获取这种变量。
2.但是有时候,很多线程都需要线程共享,这样的变量称为共享变量,可以通过数据的共享,完成之间的交互。
3.多个线程并发的操作共享变量,会带来一些问题。
//抢票代码
int tickets = 1000;
void* getTickets(void* args)
{
const char* name = static_cast<const char*>(args);
while(true)
{
if(tickets > 0)
{
usleep(1000);
cout << " 抢到了票,票的编号是:" << tickets-- << endl;
//可能还有其他业务,暂时忽略
}
else
{
cout << name << "放弃抢票,因为没有余票了..." << endl;
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1,tid2,tid3,tid4;
pthread_create(&tid1,nullptr,getTickets,(void*)"thread 1");
pthread_create(&tid2,nullptr,getTickets,(void*)"thread 2");
pthread_create(&tid3,nullptr,getTickets,(void*)"thread 3");
pthread_create(&tid4,nullptr,getTickets,(void*)"thread 4");
//pthread_join成功返回0,失败返回错误码
int n = pthread_join(tid1,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid2,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid3,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid4,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TkX3dsJ1-1689079601986)(https://typora-1311788518.cos.ap-beijing.myqcloud.com/mytyporaQQ%E6%88%AA%E5%9B%BE20221214004350.png)]
可以看到,tickets出现了负数,这明显是错误的。
出现这种情况的原因是:线程切换 – 时间片到了,线程会在内核返回用户态做检测,创造更多的让线程阻塞的场景。CPU内的寄存器是被所有的执行流所共享的,但是寄存器里面的数据是属于当前执行流的上下文数据。
当线程被切换的时候,需要保存上下文。
当线程被换回的时候,需要恢复上下文。
假定有线程A 和 B两个线程去抢票:
判断语句是要CPU执行的,但是某个时间段线程A的时间片到了,线程A北埔挂起,线程B进抢占资源成功,但是CPU计算的值还未成功写入内存,就会出现,线程B使用最后一次A的场景,等到A再回来时,就可能出现票数变负的情况。
解决问题:
**解决问题就是线程加锁。**pthread_mutex
注意:
1.加锁主要是在临界区加锁,而且加锁的粒度越细越好。
2.加锁的本质是让线程执行临界区代码串行化。
3.枷锁是一套规范,通过临界区对临界资源进行访问的时候,都必须先申请锁,前提是必须先看懂锁。
4.锁本身就是临界资源,但是pthread_mutex_lock:竞争和申请加锁的过程就是原子的。
5.加锁的难度在临界区里面,是否没有线程切换。
互斥量的接口
初始化互斥量两种方法:
1.方法一:静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
2.方法二:动态分配
int pthraed_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t* restrictattr);
参数:mutex:要初始化的互斥量
attr:NULL
销毁互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
注意:
1.使用PTHREAD_MUTEX_INITIALIZER
初始化的互斥量不需要销毁。
2.不要销毁一个已经加锁的互斥量
3.已经销毁的互斥量,要确保后面不会有线程再尝试加锁。
互斥加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
对抢票代码进行加锁:
//创建锁对象
pthread_mutex_t mutex;
int tickets = 1000;
void* getTickets(void* args)
{
const char* name = static_cast<const char*>(args);
while(true)
{
//在临界区加锁
pthread_mutex_lock(&mutex);
if(tickets > 0)
{
usleep(1000);
cout << " 抢到了票,票的编号是:" << tickets-- << endl;
//解锁
pthread_mutex_unlock(&mutex);
//可能还有其他业务,暂时忽略
}
else
{
cout << name << "放弃抢票,因为没有余票了..." << endl;
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1,tid2,tid3,tid4;
//对锁初始化
pthread_mutex_init(&mutex,nullptr);
pthread_create(&tid1,nullptr,getTickets,(void*)"thread 1");
pthread_create(&tid2,nullptr,getTickets,(void*)"thread 2");
pthread_create(&tid3,nullptr,getTickets,(void*)"thread 3");
pthread_create(&tid4,nullptr,getTickets,(void*)"thread 4");
//pthread_join成功返回0,失败返回错误码
int n = pthread_join(tid1,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid2,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid3,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
n = pthread_join(tid4,nullptr);
cout<< n<< ":" << strerror(n)<<endl;
//销毁锁
pthread_mutex_destroy(&mutex);
return 0;
}
加锁以后,就不会出现临界资源不符的问题了。
调用pthread_lock可能遇到以下情况:
1.互斥量处于未锁状态,该函数将会互斥量锁定,同时返回成功。
2.发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但是没有竞争到互斥量,那么pthread_lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
封装锁及实现RAII
#include <pthread.h>
#include <iostream>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_lock,nullptr);
}
void lock()
{
pthread_mutex_lock(&_lock);
}
void unlock()
{
pthread_mutex_unlock(&_lock);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockQuard
{
public:
LockQuard(Mutex* mutex):_mutex(mutex){
_mutex->lock();
std::cout<< "加锁成功" << std::endl;
}
~LockQuard()
{
_mutex->unlock();
std::cout << "解锁成功" << std::endl;
}
private:
Mutex *_mutex;
};
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "log.hpp"
using namespace std;
int tickets = 1000;
Mutex mymutex;//创一个对象
//函数本身是一个代码块,会被多个线程同时调用执行,该函数被重复进入 --- 被重入了
bool getTickets()
{
bool ret = false;
LockQuard lockQuard(&mymutex);//临时对象,可以完成作用域外直接销毁
if(tickets > 0)
{
usleep(1001);//线程切换
cout << "thread: " << pthread_self() << " tickets: " << tickets << endl;
tickets--;
ret = true;
}
return ret;
}
void* startRoutinue(void* args)
{
const char* name = static_cast<const char*>(args);
//抢票
while(true)
{
if(!getTickets())
{
break;
}
cout << name << " get tickets success" <<endl;
//做其他事
usleep(1000);
}
return nullptr;
}
互斥实现原理探究
单纯的i++ 都不是原子的,可能会有数据一致性的问题。
为了实现互斥锁操作,大多数体系结构都提供了swap或者exchange指令,该指令的作用是吧寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多个处理器平台,访问内存的总线程周期也有先后,一个处理器上的交换指令执行时,另一个处理器的交换指令只能等待总线程周期。
关于lock和unlock的伪代码:
%al : 寄存器 – CPU
mutex : 内存的一个变量
凡是在寄存器中的数据,全部都是线程的内部上下文,多个线程看起来同时在访问寄存器,但是互不影响。
上锁:
1.将thread中的0 load进寄存器中
2.将mutex和寄存器中的值交换
3.判断语句执行。
4.如果时间片到了,将寄存器中的值回收,线程B进入步骤1
5.必要时执行goto语句
解锁:
1.将1 load进mutex中
2.唤醒等待的mutex线程。
常见的线程安全的情况
1.每个线程对全局变量或者静态变量只有读取权限,而没有写入权限,一般来说这些线程是安全的。
2.类或者接口对于线程来说都是原子操作。
3.多个线程之间的切换不会导致改接口的执行结果存在二义性。
常见的不可重入的情况
1.调用了malloc/free函数,因为malloc函数是全局链表来管理堆的。
2.调用了标准的I/O库函数,标准的I/O库函数很多都是不可重入的方式使用全局数据结构。
3.可重入函数体内使用了静态数据结构。
常见的可重入的情况
1.不使用全局变量 或 静态变量。
2.不使用malloc/free 或者 new 开辟新空间
3.不调用不可重入函数
4.不返回静态 或者 全局数据,所有数据都有函数调用者提供。
5.使用本地数据,或者 通过制作去那句数据的办呢滴拷贝来保护全局数据
可重入与线程安全联系
1.函数是可重入的,那线程就是安全的。
2.函数是不可重入的,那就不能多个线程使用,有可能引发线程安全问题。
3.如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全的区别
1.可重入函数是线程安全函数的一种。
2.线程安全不一定是可重入的,而可重入函数一定是线程安全的。
3.如果将对临界资源的访问上锁,则这个函数是线程安全的,但如果这个重入函数托锁还未释放,则会产生死锁,因此是不可重入的。
常见的锁概念
死锁
死锁是指在一组进程中的各个线程均占有不会释放的资源,的那一次相互申请被其他进程所占用不会释放的资源而处于一种永久等待的状态。
示例:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "log.hpp"
using namespace std;
int tickets = 1000;
pthread_mutex_t mutexA = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutexB = PTHREAD_MUTEX_INITIALIZER;
void* routinue1(void* args)
{
pthread_mutex_lock(&mutexA);
sleep(1);
pthread_mutex_lock(&mutexB);
const char* name = static_cast<const char*> (args);
cout << "name: "<<name <<" tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexB);
pthread_mutex_unlock(&mutexA);
return nullptr;
}
void* routinue2(void* args)
{
pthread_mutex_lock(&mutexB);
sleep(1);
pthread_mutex_lock(&mutexA);
const char* name = static_cast<const char*> (args);
cout << "name: "<<name <<"tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexB);
pthread_mutex_unlock(&mutexA);
return nullptr;
}
int main()
{
pthread_t t1,t2;
pthread_create(&t1,nullptr,routinue1,(void*)"thread 1");
pthread_create(&t2,nullptr,routinue2,(void*)"thread 2");
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_mutex_destroy(&mutexA);
pthread_mutex_destroy(&mutexB);
return 0;
}
死锁的四个必要状态
1.互斥条件:一个资源每次只能被一个执行流使用
2.请求与报纸条件:一个执行流因请求资源而阻塞时,队以获得的资源保持不放
3.不剥夺条件:一个执行流已获得的资源,围在使用完之前,不能强行剥夺
4.循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
破环死锁的四个必要条件:
1.加锁顺序一致
2.避免锁未释放的场景
3.资源一次分配
4.规避循环等待
避免思索算法
1.死锁检测算法
2.银行家算法
Linux线程同步
条件变量
当一个线程互斥访问某个变量的时候,他可能发现在其他线程改变状态之前,什么都做不了。
同步概念和竞态条件
同步:保证数据安全的情况下,让线程能够安照某种特定的顺序访问临界资源,从而有效避免饥饿问题。
竞态条件:因为时序问题,而导致程序异常。
条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 参数: cond:要初始化的条件变量 attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 参数: cond:要在这个条件变量上等待 mutex:互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);
下面我们用主线程创建三个新线程,让主线程控制这三个新线程活动。这三个新线程创建后都在条件变量下进行等待,直到主线程检测到键盘有输入时才唤醒一个等待线程,如此进行下去
相关代码:
#include <iostream>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <functional>
//一把锁也有可能造成死锁,比如忘记了解锁
using namespace std;
//定义一个条件变量
pthread_cond_t cond;
//定义一把锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
vector<function<void()>> vec;
void show()
{
cout << "show()" << endl;
}
void play()
{
cout << "play()" << endl;
}
//定义一个全局变量
volatile bool quit = false;
void* waitCommd(void* args)
{
while(!quit)
{
//下面代码。证明在某种条件不就绪,就让这个线程等待
pthread_cond_wait(&cond,&mutex);//对应的线程在此处等待被唤醒
//唤醒以后,会给唤醒的线程自动加锁。
for(auto& e: vec)
{
e();
}
cout << "tid: " << pthread_self() <<endl;
}
return nullptr;
}
int main()
{
vec.push_back(show);
vec.push_back(play);
vec.push_back([](){
cout << "完美世界" <<endl;
});
pthread_cond_init(&cond,nullptr);
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,waitCommd,nullptr);
pthread_create(&t2,nullptr,waitCommd,nullptr);
pthread_create(&t3,nullptr,waitCommd,nullptr);
while(true)
{
//sleep(1);
//pthread_cond_signal(&cond); //唤醒一个线程
//pthread_cond_broadcast(&cond);
char n = 'a';
cout << "请输入你的commd" <<endl;
cin >> n;
if(n == 'n')
pthread_cond_signal(&cond);
sleep(1);
}
pthread_cond_destroy(&cond);
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
POSIX 信号量
信号量是一个计数器,描述临界资源数量的计数器。
–P–> 原子的 --> 申请资源
++V --> 原子的 --> 归还资源
1.信号量申请成功,就一定能保证你会获得指定的资源。
2.临界资源就可以被当成整体,可结合场景看成各个小的部分。
资源预定:
申请mutex --只要拿到了锁,当前的临界资源就是我的
可能存在被切换的问题,但是不用担心。
//访问临界资源
释放mutex
信号量: 1
p – 1 – 0 – 加锁
v – 0 – 1 --释放锁
二元信号量 === 互斥锁
为什么pthread_cond_wait 需要互斥量?
1.条件等待是线程间同步的一种手段,如果是一个线程,条件不满足,一直等待下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,是原先不满足的条件变得满足,并且有好的通知等待在条件百年量上的线程。
2.条件不会无缘无故的突然无法满足,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护,没有互斥锁就无法安全的获取和修改共享数据。
条件变量使用规范
等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
//修改条件
pthread_mutex_unlock(&mutex);
给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
生产者消费者模型
321原则
使用生产者消费者模型
通过一个容器来解决生产者和消费者的强耦合状态。生产者和消费者彼此之间不直接通讯,而
通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
模型优点
1.解耦
2.支持并发
3.支持忙闲不均
基于blockqueue的生产者消费者模型
在多线程编程中,阻塞式队列是一种常用的用于实现生产者和消费者模型的数据结构,与普通队列区别在于,当队列为空的时候,从队列获取元素的操作会被阻塞,知道队列中被加入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
C++queue模拟阻塞式队列生产消费模型
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include <string.h>
#include <stdlib.h>
using namespace std;
//默认队列的容量是五个
const uint32_t gDefaultCap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap):_cap(cap)
{
//锁初始化
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_conCond,nullptr);
pthread_cond_init(&_proCond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_conCond);
pthread_cond_destroy(&_proCond);
}
public:
//生产接口
void push(const T& in)//纯输入
{
//加锁
//判断是否适合生产 --> _bq是否为满 -->1.满(不适合) 2.未满(适合)
//if满 不生产 休眠
//else if 生产,唤醒消费者
//解锁
lockQueue();
while(isFull())
{
//阻塞式等待
//before:当等待的时候,会自动释放mutex
proBlockwait();
//after:当醒来的时候,是在临界区醒来的
}
//条件满足,可以生产
pushCore(in);
//解锁
unlockQueue();
//唤醒消费者
wakeupCon();
}
//消费者
T pop()
{
//加锁
//判断是否适合消费 --> _bq是否为空 -->1.空(不适合) 2.未空(适合)
//if空 不消费 休眠
//else if 消费,唤醒生产者
//解锁
lockQueue();
while(isEmpty())
{
conBlockwait();
}
T tmp = popQueue();
unlockQueue();
wakeupPro();
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
bool isFull()
{
return _bq.size() == _cap;
}
void proBlockwait()//生产者一定是在临界区的
{
//1.在阻塞线程的时候,会自动释放mutex锁
pthread_cond_wait(&_proCond,&_mutex);
//2.当阻塞结束,返回的时候,pthread_cond_wait会帮你重新获得mutex,然后返回
}
void wakeupCon()//唤醒消费者
{
pthread_cond_signal(&_conCond);
}
void pushCore(const T& in)
{
_bq.push(in);
}
private:
bool isEmpty()
{
return _bq.empty();
}
void conBlockwait()
{
pthread_cond_wait(&_conCond,&_mutex);
}
T popQueue()
{
T tmp = _bq.front();
_bq.pop();
return tmp;
}
void wakeupPro()
{
pthread_cond_signal(&_proCond);
}
private:
uint32_t _cap;//队列容量
queue<T> _bq;//队列
pthread_mutex_t _mutex;//阻塞式队列的互斥锁
pthread_cond_t _conCond;//让消费者等待的条件变量
pthread_cond_t _proCond;//让生产者等待的条件变量
};
-------------------------------------
//BlockQueue.cpp
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
const char* ops = "+-*/%";
void* consumer(void* args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t = bq->pop();
int result = t(); //相当于 t.run()
int one,two;
char op;
t.get(&one,&two,&op);
cout << "consumer ["<<pthread_self() <<"] " << (unsigned long)time(nullptr) << " 消费了一个任务" << one<<op<<two<<"="<< result <<endl;
}
}
void* productor(void* args)
{
BlockQueue<Task>* bqp = static_cast<BlockQueue<Task>*>(args);
while(true)
{
//1.制作任务
int one = rand()%50;
int two = rand()%30;
char op = ops[rand()%strlen(ops)];
Task t(one,two,op);
//2.生产任务
bqp->push(t);
cout << "productor["<<pthread_self() <<"] " << (unsigned long)time(nullptr) << "生产了一个任务" << one<<op<<two<<"=?"<<endl;
sleep(1);
}
}
int main()
{
//定义一个阻塞式队列
//创建两个线程,productor,consumer
srand((unsigned long)time(nullptr));
BlockQueue<Task> bq;
pthread_t c,p;
pthread_create(&c,nullptr,consumer,&bq);
pthread_create(&p,nullptr,productor,&bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
---------------------------
//Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
//任务就是写一个简单的整数计算器
class Task
{
public:
Task() = default;
Task(int one,int two,char op):_ele1(one),_ele2(two),_operator(op)
{}
int operator()()
{
return run();
}
int run()
{
int result = 0;
switch(_operator)
{
case '+':
result = _ele1 + _ele2;
break;
case '-':
result = _ele1 - _ele2;
break;
case '*':
result = _ele1 * _ele2;
break;
case '/':{
if(_ele2 == 0)
cerr << "div zero" <<endl;
else
result = _ele1 / _ele2;
break;
}
case '%':{
if(_ele2 == 0)
{
cerr << "mod zero" <<endl;
result = -1;
}
else
result = _ele1 % _ele2;
break;
}
default:
cout << "null operator" << endl;
break;
}
return result;
}
int get(int* e1,int* e2,char* op)
{
*e1 = _ele1;
*e2 = _ele2;
*op = _operator;
}
private:
int _ele1;
int _ele2;
char _operator;
};
C++环形队列模拟生产消费模型
环形队列相关知识:
判空判满:空和满的状态是一样的,但可以通过加计数器或者标记位来判断满或者空,也可以预留一个空的位置,作为满的状态。
两个线程 消费者 生产者
有可能访问同一个位置,发生的条件:
1.当两个指向同一个位置的时候,即只有满和空的时候 (互斥和同步)
2.其他时间是指向不同的位置(并发)
后续操作基本原则:
1.空:消费者不能超过生产者 -->生产者先运行
2.满:生产者不能够过量生产 --> 消费者先运行
如何保证? —信号量保证
对于生产者最关心的是空间问题 N ->P(N) N [N,0] sem_t roomSem = N;
消费者最关心的是数据 [0,N] sem_t dataSem = 0;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JuyOKPMe-1689079601989)(https://typora-1311788518.cos.ap-beijing.myqcloud.com/mytyporaQQ%E6%88%AA%E5%9B%BE20230109222800.png)]
程序员保证不同的线程,访问的是临界资源中不同的区域。
新的头文件 #include <semaphore.h>
sem_init:初始化信号量
sem_wait:阻塞式等待,它的作用是从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值才开始做减法。也就是说,如果你对一个值为2的信号量调用sem_wait(),线程将会继续执行,介信号量的值将减到1。如果对一个值为0的信号量调用sem_wait(),这个函数就会地等待直到有其它线程增加了这个值使它不再是0为止。如果有两个线程都在sem_wait()中等待同一个信号量变成非零值,那么当它被第三个线程增加一个“1”时,等待线程中只有一个能够对信号量做减法并继续执行,另一个还将处于等待状态。
信号量这种“只用一个函数就能原子化地测试和设置”的能力下正是它的价值所在。还有另外一个信号量函数sem_trywait,它是sem_wait的非阻塞搭档。
sem_post:unlock semaphore 给信号量的值加上一个“1”,它是一个“原子操作”---即同时对同一个信号量做加“1”操作的两个线程是不会冲突的;而同时对同一个文件进行读、加和写操作的两个程序就有可能会引起冲突。信号量的值永远会正确地加一个“2”--因为有两个线程试图改变它。
基本代码:
//ringqueue.hpp
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
using namespace std;
const int g_cap = 10;
template<class T>
class RingQueue
{
public:
RingQueue(int cap = g_cap):_ringqueue(cap),_pIndex(0),_cIndex(0)
{
//生产
sem_init(&_roomSem,0,_ringqueue.size());
//消费
sem_init(&_dataSem,0,0);
pthread_mutex_init(&_pmutex,nullptr);
pthread_mutex_init(&_cmutex,nullptr);
}
~RingQueue()
{
sem_destroy(&_roomSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
public:
//生产
void push(const T& in)
{
sem_wait(&_roomSem);//无法被多次申请
pthread_mutex_lock(&_pmutex);
//生产过程
_ringqueue[_pIndex++] = in;
//更新下标
_pIndex %= _ringqueue.size();
pthread_mutex_unlock(&_pmutex);
sem_post(&_dataSem);//unlock semaphore
}
T pop()
{
sem_wait(&_dataSem);
pthread_mutex_lock(&_cmutex);
//消费
T tmp = _ringqueue[_cIndex];
_cIndex++;
//更新下标
_cIndex%=_ringqueue.size();
pthread_mutex_unlock(&_cmutex);
sem_post(&_roomSem);
return tmp;
}
private:
vector<T> _ringqueue;//环形队列
sem_t _roomSem;//衡量空间计数器,productor
sem_t _dataSem;//衡量空间计数器,consumer
uint32_t _pIndex;//生产者当前位置,如果是多线程,就是临界资源
uint32_t _cIndex;//消费者当前位置
pthread_mutex_t _pmutex;
pthread_mutex_t _cmutex;
};
//ringqueue.cc
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
void* consumer(void* args)
{
RingQueue<int>* rqc = static_cast<RingQueue<int>*>(args);
while(true)
{
sleep(10);
int data = rqc->pop();
cout << "pthread["<<pthread_self() <<"] " << "消费了一个数据" << data <<endl;
}
}
void* productor(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = rand() % 20;
rqp->push(data);
cout << "pthread["<<pthread_self() <<"] " << "生产了一个数据" << data <<endl;
sleep(1);
}
}
int main()
{
srand((unsigned long)time(nullptr));
RingQueue<int> rq;
pthread_t c1,c2,p1,p2;
pthread_create(&c1,nullptr,consumer,&rq);
pthread_create(&c2,nullptr,consumer,&rq);
pthread_create(&p1,nullptr,productor,&rq);
pthread_create(&p2,nullptr,productor,&rq);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
return 0;
}
线程池
一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护者多个进程,等待着监督管理者分配可并发执行的任务。避免了处理短时间任务时创建和销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过度过分调用。可用线程数量应该取决于可用的并发处理器,处理器内核,内存,网络sockets的数量
应用场景:
1.需要大量的线程来完成任务,且完成任务需要的时间比较短。如:web服务器完成网页请求。
2.对性能要求苛刻的应用,比如要求服务器迅速响应客户的请求
3.接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,短时间产生大量的线程可能使内存到达极限,出现错误。
线程池示例:
1.创建固定数量线程池,循环从任务队列中获取任务对象
2.获取到任务对象后,执行任务对象中的任务接口
//threadpool.hpp
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <queue>
#include <sys/prctl.h>
#include "lock.hpp"
#include "log.hpp"
using namespace std;
const int g_threadNum = 5;
template <class T>
class ThreadPool
{
private:
ThreadPool(int threadNum = g_threadNum) : _threadNum(threadNum), _isStart(false)
{
assert(_threadNum > 0);
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *getInstance()
{
static Mutex mutex;
if (instance == nullptr) // 仅仅是过滤重复的判断
{
LockQuard lockguard(&mutex); // 进入代码块,加锁。退出代码块,自动解锁
if (instance == nullptr)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
// 线程函数 内类成员 成员函数 默认有this指针
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
// ptcl修改线程的名字
prctl(PR_SET_NAME, "follwer");
while (1)
{
tp->lockQueue();
while (!tp->haveTask())
{
tp->waitForTask();
}
// 这个文件被拿到了线程的上下文中
T t = tp->pop();
tp->unlockQueue();
// for debug
int one, two;
char op;
t.get(&one, &two, &op);
// 规定,所有的任务必须要有一个run方法
log() << "新的线程完成计算任务" << one << op << two << "=" << t.run() << endl;
}
}
void start()
{
assert(!_isStart);
for (int i = 0; i < _threadNum; i++)
{
pthread_t tmp;
pthread_create(&tmp,nullptr,threadRoutine,this);
}
_isStart = true;
}
void push(const T& in)
{
lockQueue();
_taskQueue.push(in);
//
choiceThreadForHandler();
unlockQueue();
}
private:
bool _isStart;
int _threadNum;
queue<T> _taskQueue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *instance;
private:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool haveTask() { return !_taskQueue.empty(); }
void waitForTask() { pthread_cond_wait(&_cond, &_mutex); }
T pop()
{
T tmp = _taskQueue.front();
_taskQueue.pop();
return tmp;
}
void choiceThreadForHandler(){pthread_cond_signal(&_cond);}
};
template <class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;
//test.cc
#include "threadpool.hpp"
#include "Task.hpp"
#include <memory>
#include <ctime>
#include "log.hpp"
int main()
{
// 修改线程名
prctl(PR_SET_NAME, "master");
const string operators = "+-*/%";
unique_ptr<ThreadPool<Task>> tp(ThreadPool<Task>::getInstance());
tp->start();
srand((unsigned long)time(nullptr));
// 派发线程任务
while (true)
{
int one = rand() % 100;
int two = rand() % 50;
char op = operators[rand() % operators.size()];
log() << "主线程派发计算任务" << one << op << two << "=?" << endl;
Task t(one,two,op);
tp->push(t);
sleep(1);
}
return 0;
}
//log.cc
#pragma once
#include <iostream>
#include <time.h>
#include <pthread.h>
std::ostream &log()
{
std::cout << "For Debug | "
<< "timestamp" << (unsigned long)time(nullptr) << " | "
<< "Thread[" << pthread_self() << "] | ";
return std::cout;
}
//lock.hpp
#include <pthread.h>
#include <iostream>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_lock,nullptr);
}
void lock()
{
pthread_mutex_lock(&_lock);
}
void unlock()
{
pthread_mutex_unlock(&_lock);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockQuard
{
public:
LockQuard(Mutex* mutex):_mutex(mutex){
_mutex->lock();
std::cout<< "加锁成功" << std::endl;
}
~LockQuard()
{
_mutex->unlock();
std::cout << "解锁成功" << std::endl;
}
private:
Mutex *_mutex;
};
//Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
//任务就是写一个简单的整数计算器
class Task
{
public:
Task() = default;
Task(int one,int two,char op):_ele1(one),_ele2(two),_operator(op)
{}
int operator()()
{
return run();
}
int run()
{
int result = 0;
switch(_operator)
{
case '+':
result = _ele1 + _ele2;
break;
case '-':
result = _ele1 - _ele2;
break;
case '*':
result = _ele1 * _ele2;
break;
case '/':{
if(_ele2 == 0)
cerr << "div zero" <<endl;
else
result = _ele1 / _ele2;
break;
}
case '%':{
if(_ele2 == 0)
{
cerr << "mod zero" <<endl;
result = -1;
}
else
result = _ele1 % _ele2;
break;
}
default:
cout << "null operator" << endl;
break;
}
return result;
}
int get(int* e1,int* e2,char* op)
{
*e1 = _ele1;
*e2 = _ele2;
*op = _operator;
}
private:
int _ele1;
int _ele2;
char _operator;
};
线程安全的单例模式
单例模式的特点
某些类,之应该具有一个对象(示例),就称之为单例。
饿汉实现方式和懒汉实现方式
饿汉就是吃完饭立刻洗碗,下次吃饭就直接拿着碗吃
懒汉就是吃完饭不洗碗,下次吃饭再洗碗
饿汉方式实现单例模式
template <class T>
class Singleton
{
public:
static T *GetSingleton()
{
return &_data;
}
private:
static T *_data;
};
一个进程中只有一个T对象实例
懒汉模式实现单例模式
template <class T>
class Singleton
{
public:
static T *GetInstance()
{
if (_data == nullptr)
{
_data = new T();
}
return _data;
}
private:
static T *_data;
};
存在一个严重的问题,线程不安全
第一次调用GetInstance的时候,如果两个线程同时调用,就会出现两份T对象,但是后续再调用就不会出现问题了。
懒汉模式实现线程安全版本
template <class T>
class Singleton
{
public:
static T *GetInstance()
{
if (_data == nullptr)//多次判定,降低锁冲突的概率,提高性能
{
// 加锁
_mutex.lock();
if (_data == nullptr)
{
_data = new T();
}
// 解锁
_mutex.unlock();
return _data;
}
}
private:
volatile static T *_data; // 防止_data被优化
static mutex _mutex;
};
注意:
1.加锁解锁的位置
2.双重if判定,避免不必要的锁竞争
3.volatitle关键字防止过度优化
STL,智能指针和线程安全
STL是否是线程安全的?
否,STL默认不是线程安全的,在多线程环境下使用,需要自行保证安全
智能指针是否是线程安全的?
unique_ptr,只在当前代码块范围内生效,不涉及线程安全
shared_ptr,多个对象共用一个引用计数变量,会存在线程安全问题。但计数却是原子的。
其他常见的各种锁
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
自旋锁,公平锁,非公平锁?
读写锁
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的
机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地
降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-42OEkgzL-1689079601990)(https://typora-1311788518.cos.ap-beijing.myqcloud.com/mytyporaQQ%E6%88%AA%E5%9B%BE20230112225936.png)]
注意:写独占,读共享,读锁优先级高
321原则
3:读者 写者之间的关系
1.写着和写着互斥
2.读者和读者没有关系
3.读者和写者互斥关系
2:角色,读者和写着
1:读写场所
读者n,写者1
读者 写者
加读锁 加写锁
读取内容 写入修改内容
释放锁 释放锁
int readers = 0;
struct rwlock_t
{
int readers = 0;
int who;
mutex_t mutex;
}
写者饥饿问题
读者写者进行操作的时候:
读者非常多,频率比较高;
写者比较少,频率不高。
读者优先
rwlock 主要有以下几种特征:
多进程对临界区的读不互斥,可同步进行,互不影响
如果要执行写,需要等所有的读者退出才能执行写操作
如果正在执行写操作且未完成,这一阶段发生的读操作会被阻塞,即读写互斥
如果正在执行写操作且未完成,这一阶段发生的读操作会被阻塞,即写写互斥
不造成睡眠,等待形式是自旋
这种场景有点像行人过马路,公交车司机必须停在斑马线前等待所有行人过完马路才能继续往前开,在繁忙的时段,不断地有行人走过,就会导致公交车一直止步不前,甚至造成堵车。
这也是 rwlock 的一大缺点:写者优先级太低,在极端情况下甚至出现饿死的情况。
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/
#include <iostream>
#include <pthread.h>
#include <string>
#include <time.h>
#include <unistd.h>
using namespace std;
// 读写锁 rwlock
pthread_rwlock_t rwlock;
int times = 0;
void *read(void *args)
{
const char *name = static_cast<const char *>(args);
cout << "running..." << endl;
while (true)
{
pthread_rwlock_rdlock(&rwlock);
cout << "i am a reader :[" << pthread_self() << "] | times: " << times << endl;
sleep(5);
pthread_rwlock_unlock(&rwlock);
}
}
void *writer(void *args)
{
const char *name = static_cast<const char *>(args);
sleep(1);
while (true)
{
pthread_rwlock_wrlock(&rwlock);
++times;
cout << "i am a writer" << endl;
sleep(5);
pthread_rwlock_unlock(&rwlock);
}
}
int main()
{
pthread_rwlock_init(&rwlock, nullptr);
pthread_t t1, t2, t3, t4;
pthread_create(&t1, nullptr, read, (void *)"read");
pthread_create(&t2, nullptr, read, (void *)"read");
pthread_create(&t3, nullptr, read, (void *)"read");
pthread_create(&t4, nullptr, writer, (void *)"writer");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
pthread_rwlock_destroy(&rwlock);
return 0;
}