目录
进程互斥
计算 -> 时序问题
加锁保护
pthread_mutex_lock
pthread_mutex_unlock
使用init与destory
pthread_mutex_init
phtread_mutex_destory
锁的实现原理
图
可重入VS线程安全
死锁
Linux线程同步
条件变量
系统调用
进程互斥
进程线程间的互斥相关背景概念:
- 临界资源:多线程执行流共享的资源,在任何时刻只能被一个执行流访问的资源,就叫做临界资源。(一个资源被多执行流共享的情况下,通过一定的方法让任何时刻,只能被一个执行流访问的资源)
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。(没有访问临界资源的代码,叫做正常代码)
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
- 原子性:在任意时刻不会因为某些原因,事物只做一半,要么不做,要么就做全。即:该操作只有两态,要么完成,要么未完成。
需要互斥的场景:
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互 —— 此时,因为其中的一些特性(数据不一致)就会使用到互斥。
#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
int tickets = 1000;
void *getTickets(void *args)
{
(void)args; // 防止release下因为未使用而报错
while(true)
{
if(tickets > 0)
{
usleep(1000);
printf("%p: %d\n", pthread_self(), tickets);
tickets--;
}
else
break;
}
return nullptr;
}
int main()
{
pthread_t t1,t2,t3;
// 多线程抢票的逻辑 - t1、t2、t3对getTickets函数重入
pthread_create(&t1, nullptr, getTickets, nullptr);
pthread_create(&t2, nullptr, getTickets, nullptr);
pthread_create(&t3, nullptr, getTickets, nullptr);
// 线程等待
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
}
(不)可重入函数
main函数调用insert函数向一个链表head中插入节点node1,插入操作分为两步,刚做完第一步的 时候,因为硬件中断使进程切换到内核,再次回用户态之前检查到有信号待处理,于是切换到sighandler函数,sighandler也调用insert函数向同一个链表head中插入节点node2,插入操作的两步都做完之后从sighandler返回内核态,再次回到用户态就从main函数调用的insert函数中继续往下执行,先前做第一步之后被打断,现在继续做完第二步。结果是,main函数和sighandler先后向链表中插入两个节点,而最后只有一个节点真正插入链表中了。于是便出现了经典的内存泄漏问题。此问题存在,且非常不容易排查。
排查代码的时候,可以发现,我们的代码写的没有任何问题。对应的main函数、单链表头插insert、信号捕捉sighandler、函数调用都没有问题 —— 这个问题的产生严格来说并不是代码问题,而是因为操作系统调度导致的进程时序的变化 —— 时序问题。
像上例这样,insert函数被不同的控制流程调用,有可能在第一次调用还没返回时就再次进入该函数,这称为重入。insert函数访问一个全局链表,有可能因为重入而造成错乱,像这样的函数称为不可重入函数,反之, 如果一个函数只访问自己的局部变量或参数,则称为可重入(Reentrant) 函数。
可重入函数 VS 不可重入函数
是函数的一种特征,目前我们用的90%函数,都是不可重入的。
- 不可重入函数:好编写。
- 可重入函数:不好编写,书写成本高。
如果一个函数符合以下条件之一则是不可重入的:
- 调用了new、malloc或free,因为new、malloc也是用全局链表来管理堆的。
- 99%的STL容器,都是不可重入的。
- 函数里面带static的。
- 调用了标准I/O库函数。标准I/O库的很多实现都以不可重入的方式使用全局数据结构。
可重入函数:需要保证函数其是独立的,没有访问任何的全局数据。
此时,我们会发现一个问题,票居然减到负数了。所以 —— 在多线程的抢票下,对于共享资源如果不加以保护,就可能在一定的时序下,引起问题。
计算 -> 时序问题
代码经过编译,变成二进制可执行程序。加载之后运行,其一定在内存里,代码和数据都在内存里。而计算需要CUP进行计算,所以计算的第一种事情就是:将内存当中的数据,加载入CPU当中 —— 线程读取内存中的数据,读到CPU的寄存器里 —— 把数据读到当前执行流的上下文当中。
判断的本质也是一种计算
计算分类:
- 数值计算
- 逻辑计算
从切换讲:
当一个线程刚刚将这个判断加载到寄存器里,就被切换了。其他线程也过来将这个判断加载到寄存器里。此时如果判断就是true,就会都是读到true。于是多个执行流进入循环进行抢票,多个线程减票数,于是导致票成负数。
从并行讲:
多个线程并行运行,可能多个线程同时判断true,进入循环进行抢票。于是多个执行流进入循环进行抢票,多个线程减票数,于是导致票成负数。
Note:
因为由于并发的问题,进而导致对应的数据变成负数。
同样的道理,对全局数据的 -- 操作也会有问题。
因为 ticket-- 不是原子性的:
400511: 8b 05 15 0b 20 00 mov 0x200b15(%rip),%eax # 60102c <ticket>
400517: 83 e8 01 sub $0x1,%eax
40051a: 89 05 0c 0b 20 00 mov %eax,0x200b0c(%rip) # 60102c <ticket>
引发了数据紊乱的问题,造成了数据不一致的问题。这就是因为线程不断切换,而对一个不加保护的全局变量,做切换,可能会引发的问题。
此处问题tickets的票数为负数的问题就是二者共同导致的,于是我们要避免这样问题的产生 —— 多执行流,对这类问题的数据进行保护 —— 加锁保护。
加锁保护
写代码的过程中,使用的最高频的保护策略:互斥锁
锁的使用原理:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
首先,需要我们自己定义一把锁:
随后,需要我们将锁进行初始化:
1. 对于局部的锁初始化利用pthread_mutex_init。(也可以对全局的使用)
(较为难,后面讲解)
2. 对这把锁,只要是我们定义的是静态或全局,我们可以直接使用PTHREAD_MUTEX_INITIALIZER(宏),进行初始化。
有了定义的这一把锁,并初始化之后。就可以直接使用这一把锁,来进行对特定的区域进行保护。
于是这把锁用来保护全局变量tickets,称作为:对临界资源进行访问时,对其进行保护。即,对其中访问临界资源的临界区进行加锁保护。对临界区加锁 —— pthread_mutex_lock。
pthread_mutex_lock
#include <pthread.h> // 加锁 int pthread_mutex_lock(pthread_mutex_t *mutex);
参数:
mutex:传入我们定义并初始化的一把锁。
返回值:
- 成功,返回0。
- 失败,返回错误码。
pthread_mutex_lock—— 申请锁的时候,别的线程也在申请,如果申请锁不成功,会直接阻塞。
pthread_mutex_trylock —— 申请锁的时候,别的线程也在申请,如果申请锁不成功,会直接返回。
(意义凸显于死锁,在死锁部分讲解)
使用锁对特定的区域进行保护:
所有的线程进入循环抢票时,都需要先执行语句: pthread_mutex_lock(&mtx); 。于是由于这个锁的特点:任何一个时刻,只允许一个成功的线程,成功的获取这把锁,然后,继续向后执行。其他的线程只能默认的阻塞等待,直到拿到锁的线程最终将锁释放掉,才能有机会进入执行 —— 这就是互斥概念。
可以理解为:
加锁 ——> 互斥 ——> 串行执行。
有加锁操作,就有解锁操作 —— pthread_mutex_unlock。
pthread_mutex_unlock
#include <pthread.h> // 解锁 int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数:
mutex:传入我们定义并初始化的一把锁。
返回值:
- 成功,返回0。
- 失败,返回错误码。
注意一定要解锁,因为锁是全局的,就算具有锁的进程退出了(终止了)。但是,全局的锁还是处于被修改的状态,也就是处于加锁状态,其他线程就无法继续向后执行。
此处容易出现一个错误:
- 加锁(pthread_mutex_lock)与解锁(pthread_mutex_unlock)之间的代码就称作临界区。
- 被串行化访问的共享的全局的tickets就叫做临界资源。
以前是多线程并发的执行,也就是彼此之间的数据层面会影响,但是执行之间是各自跑各自的不会互相干扰。但是当加锁之后,就会引起执行流之间互相影响,只能一个执行流跑。这一定的也就降低了效率,并且会导致一个新的,几乎无法解决的问题。
#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
int tickets = 1000;
// 加锁保护
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // pthread_mutex_t 就是原生数据库提供的一个数据类型
void *getTickets(void *args)
{
(void)args; // 防止release下因为未使用而报错
while (true)
{
pthread_mutex_lock(&mtx);
if (tickets > 0) // 1.判断的本质也是计算的一种
{
usleep(1000);
printf("%p: %d\n", pthread_self(), tickets);
tickets--; // 2.也可能出现问题
pthread_mutex_unlock(&mtx);
}
else
{
pthread_mutex_unlock(&mtx);
break;
}
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3;
// 多线程抢票的逻辑 - t1、t2、t3对getTickets函数重入
pthread_create(&t1, nullptr, getTickets, nullptr);
pthread_create(&t2, nullptr, getTickets, nullptr);
pthread_create(&t3, nullptr, getTickets, nullptr);
// 线程等待
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
}
补充:
此处不用当一回事,只是一个理解见识,真正的知识属于线程同步的概念(后面讲解)
为什么打印线程,发现一段时间内都是一个线程?
因为在抢票的时候,因为每个线程都有自己的时间片。一个线程在执行抢票期间,其他的线程在进行等待,执行的线程抢完票就解锁了。可是有可能此线程优先级很高,进而又得到锁继续抢票。
主要原因:所有的新线程执行的内容一样,导致调度器内部出现此问题。可以让每一个线程执行不同的任务改变。
#include <iostream> #include <cstdio> #include <cstdlib> #include <ctime> #include <pthread.h> #include <unistd.h> int tickets = 1000; // 加锁保护 pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // pthread_mutex_t 就是原生数据库提供的一个数据类型 void *getTickets(void *args) { (void)args; // 防止release下因为未使用而报错 while (true) { pthread_mutex_lock(&mtx); if (tickets > 0) // 1.判断的本质也是计算的一种 { usleep(1000); printf("%s: %d\n", (void*)args, tickets); tickets--; // 2.也可能出现问题 pthread_mutex_unlock(&mtx); } else { pthread_mutex_unlock(&mtx); break; } usleep(rand()%10000); // 模拟每一个线程的任务不同 } return nullptr; } int main() { srand((unsigned long)time(nullptr)); pthread_t t1, t2, t3; // 多线程抢票的逻辑 - t1、t2、t3对getTickets函数重入 pthread_create(&t1, nullptr, getTickets, (void*)"thread one"); pthread_create(&t2, nullptr, getTickets, (void*)"thread two"); pthread_create(&t3, nullptr, getTickets, (void*)"thread three"); // 线程等待 pthread_join(t1, nullptr); pthread_join(t2, nullptr); pthread_join(t3, nullptr); }
Note:
加锁保护:加锁的时候,一定要保证加锁的粒度,越小越好。不要将无关竟要的代码也放入。
使用init与destory
pthread_mutex_init
#include <pthread.h> //对我们定义的锁进行初始化。 int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数:
mutex:传入我们定义的一把锁。
attr:锁的属性 —— 可以不管,直接设为nullptr。
返回值:
- 成功,返回0。
- 失败,返回错误码。
使用pthread_mutex_init初始化局部锁。 并且最后不要锁了,就必须要用phtread_mutex_destory,对锁进行释放。
phtread_mutex_destory
#include <pthread.h> //销毁锁 int pthread_mutex_destory(pthread_mutex_t *mutex);
参数:
mutex:传入我们定义并初始化的一把锁。
返回值:
- 成功,返回0。
- 失败,返回错误码。
销毁互斥量需要注意:
使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。 不要销毁一个已经加锁的互斥量。 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。
需要注意的是,我们在主线程内部创建的局部锁,线程是看不到的,所以我们需要将锁传递给每一个线程,此处也是对pthread_create的第四个参数的升级利用,此处用可以采取使用类的方式,封装锁等,传递进入。
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <unistd.h>
int tickets = 1000;
class ThreadData
{
public:
ThreadData(const std::string &n, pthread_mutex_t *pm):tname(n), pmtex(pm)
{}
std::string tname;
pthread_mutex_t *pmtex;
};
void *getTickets(void *args)
{
ThreadData *td = (ThreadData*)args;
while (true)
{
pthread_mutex_lock(td->pmtex);
if (tickets > 0) // 1.判断的本质也是计算的一种
{
usleep(1000);
printf("%s: %d\n", td->tname.c_str(), tickets);
tickets--; // 2.也可能出现问题
pthread_mutex_unlock(td->pmtex);
}
else
{
pthread_mutex_unlock(td->pmtex);
break;
}
usleep(rand()%10000); // 模拟每一个线程的任务不同
}
delete td;
return nullptr;
}
#define THREAD_NUM 3
int main()
{
pthread_mutex_t mtx;
pthread_mutex_init(&mtx, nullptr);
srand((unsigned long)time(nullptr));
pthread_t tid[THREAD_NUM];
for(int i = 0; i < THREAD_NUM; ++i)
{
std::string name = "thread ";
name += std::to_string(i + 1);
ThreadData *td = new ThreadData(name, &mtx);
pthread_create(tid + i, nullptr, getTickets, (void*)td);
}
// 线程等待
for(int i = 0; i < THREAD_NUM; ++i)
{
pthread_join(tid[i], nullptr);
}
pthread_mutex_destroy(&mtx);
return 0;
}
#问:加锁了之后,线程在临界区中,是否会切换?
会切换,因为是多条语句,操作系统有切换的权利,但是不会有问题。
使用层面上的理解:
当,持有锁的线程进入临界区的时候,是随时随地可能被切换,但是这与平时的不同。因为临界区中且换,该线程一定抢到了锁,并且没有释放锁。所以,即便是该线程不再执行了,但是其它线程想执行,也必须具有锁,但锁已经被切换的线程拿走了。
Note:
一个线程,不申请锁,单纯的访问临界资源 -- 错误的编码方式(编码规则)
于是对于线程2来说,对其有意义的事:
- 线程1没有持有锁(什么都没做),此时我可以申请锁。
- 线程1释放锁(做完),此时我可以申请锁。
体现出:线程1持有锁期间,对于其他线程来说线程1的操作(要么有锁、要么没锁)就是原子性的。
#问:加了锁就是串行吗?
是的,执行临界区代码一定事串行的。不然如何保证的原子性?
#问:要访问临界资源,每一个线程都必须现申请锁,每一个线程都必须先看到同一把锁,并访问它,锁本身是不是就是一种共享资源?那锁由谁来保护?
所以,为了保证锁的安全,申请和释放锁,必须是原子性的。重点:锁是如何实现的?
锁的实现原理
在汇编的视角:
只有一条汇编语句,就认为该汇编语句的执行是原子的。于是,为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,以一条汇编的方式,将内存和CPU内寄存区数据进行交换。
在执行流的视角:
CPU内部的寄存器的本质:当前执行流的上下文,寄存器的空间是被所有的执行流共享的,但是寄存器的内容(上下文),是被每一个执行流私有的。
核心:是交换并非拷贝 ——> 交换 - 永远只有一个1。
前面的学习使得我们知道,之所以出现数据不一致问题是因为,一个线程将一个共享数据拿走了,但是没来的及改变共享数据的数值,就被切换成了新的线程来使用这个共享数据。 而在此,是交换并非拷贝,于是可以理解为,有锁即为:1,没有锁即为:0。于是,利用交换的原理,将共享数据的数值与线程的自身数值进行切换,(CUP中寄存器的数据,本质上就是属于该线程的上下文数据)。
lock:只有执行了核心,才会将锁(1),从共有数据交换为加锁线程的私有程序。私有数据没锁(0),交换为公共数据,并且是只有一行的。所以一定是交换了 / 没交换两种状态。于是可以保证加锁的线程,可以将锁带走,因为加锁后,锁就是该线程的上下文数据。
unlock:与lock同理,movb行执行就交换线程上下文数据与共享数据。所以保证了只要换,锁一定释放,没换一定还在改线程手上。
融汇贯通的理解:所以,因为 锁是原子性的,利用一行汇编(交换),使得锁永远只有一个 。并且对于拥有锁的线程来说,锁是该线程的上下文,是线程的私有资源,除非unlock时交换回内存 (公共资源) 。不然永远为一个线程私有。也就进一步的说明了线程在临界区中,被会切换,是没有任何影响。
可重入VS线程安全
- 可重入:针对于函数的说法,一个函数被多个执行流重复进入的现象,并且重复进入期间不会出现问题 —— 描述函数。
- 线程安全:在线程执行过程当中,可能因为一个或若干个线程,在执行的过程当中访问了某些不该访问的资源(全局变量或者静态变量、共享的资源),进而导致其他线程数据不一致问题、其他线程崩溃、进程终止 —— 描述线程。
常见的线程不安全的情况:
- 不保护共享变量的函数。
- 函数状态随着被调用,状态发生变化的函数。
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数。
常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
- 类或者接口对于线程来说都是原子操作。
- 通过加锁的方式,局部上保持原子性。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。(函数不会有状态的变化)
常见不可重入的情况:
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的。
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构。
- 可重入函数体内使用了静态的数据结构。
常见可重入的情况:
- 不使用全局变量或静态变量。
- 不使用用malloc或者new开辟出的空间。
- 不调用不可重入函数。
- 不返回静态或全局数据,所有数据都有函数的调用者提供。
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
void func()
{
int myerrno = errno;
// ……
errno = myerron;
}
保证调用的时候,调用之前与调用之后,全局数据的errno的数据没有变化。因为在一个线程执行的时候,另一个线程也执行,可能会将errno的值进行修改。
可重入与线程安全联系:
(两个有交集,但是是不同的概念)
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
- 可重入函数是线程安全函数的一种。
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生。死锁,因此是不可重入的。
Note:
线程安全:有对错之分,一定要写出线程安全的函数。(不)可重入:没有对错之分,是特点。
死锁
如:在实际用锁的时候,不一定只用了一把锁,有可能用了2把锁以上。就可能出现以下问题:
线程申请一把锁是原子的,但是,当申请完一把锁再去申请另一把锁的时候,就可能会出现问题。
当两个线程同时执行,线程A拿了锁1,线程B拿了锁2。二者就要继续向后申请,向后拿到各自的最后一把锁。
这个时候就出现了,两个线程申请互相锁的状态 —— 死锁。
一把锁也可能出现死锁:
#include <iostream> #include <cstdio> #include <cstdlib> #include <pthread.h> #include <unistd.h> // 加锁保护 pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; void *getTickets(void *args) { (void)args; pthread_mutex_lock(&mtx); // …… //pthread_mutex_lock(&mtx); 写错了,将解锁写成了加锁 pthread_mutex_lock(&mtx); return nullptr; } int main() { srand((unsigned long)time(nullptr)); pthread_t t1; pthread_create(&t1, nullptr, getTickets, (void *)"thread one"); // 线程等待 pthread_join(t1, nullptr); return 0; }
持有锁还将自己挂起,永远不可能被唤醒。
- 互斥条件:一个资源每次只能被一个执行流使用。
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。
以故事的视角理解:
锁 == 情侣关系
死锁 == 冷战
- 互斥条件,即只能由一个女孩的存在于情侣关系中。
- 请求与保持条件,冷战,女孩不认错想男孩认错。
- 不剥夺条件,冷战,女孩不强制男孩认错。
- 循环等待条件,冷战,男孩同时不认错想女孩认错。
避免死锁:
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
互斥条件:
是一切锁的最根本的功能,而死锁是肯定使用了锁而导致的,所以是不怎么考虑的解决方式。因为有情侣关系才会有冷战,不可能说为了合好,从而分手吧?但也不得说避免冷战的出现,就不谈恋爱,是最有效的方式。
直观来说,不适用锁,也就是从根源解决死锁。
请求与保持条件:
pthread_mutex_lock—— 申请锁的时候,别的线程也在申请,如果申请锁不成功,会直接阻塞。
pthread_mutex_trylock —— 申请锁的时候,别的线程也在申请,如果申请锁不成功,会直接返回。
中的tryloc作用就是凸显于此,因为使用其申请锁的线程,发现锁被其他线程占用,并不会阻塞,而是立即返回:
- 在成功完成之后会返回零。
- 其他任何返回值都表示出现了错误。(EBUSY:由于 mutex 所指向的互斥锁已锁定,因此无法获取该互斥锁。)
将返回值EBUSY的次数进行连续记录,如果到到达一定数目,该线程将自身所有的锁解掉。
不剥夺条件:
线程尝试着去抢占(竞争)锁,依据一定的条件:我的优先级更高。强制的要求释放锁,我来获取锁。
循环等待条件:
通过编码的方式解决:堆每一把锁进行编号,要求每个线程申请锁时,如果是申请多把锁,每个线程都必须按照一定的顺序来申请。只能按照锁1,锁2,锁3……的申请,不允许交叉申请。
尽量将资源一次性分配,避免频繁加锁分配资源,导致锁太多而锁被申请数量多,导致易出现死锁的问题。
Linux线程同步
就如同我们前面所讲的抢票,多线程抢票的时候,可能会出现一段时间一直是一个线程抢票(因数很多),该线程因为优先级高,疯狂的申请锁、释放锁。其也没有任何问题,但是其是不合理:
- 频繁申请到资源 —— 导致别的线程申请不到。
- 太过于浪费进程自己的资源和临界区的资源。
频繁申请到资源:
此没有错,但是不合理的行为,导致:别的进程长时间的得不到资源,而导致别的进程饥饿的问题。
就如同食堂排队。有一个人总是由于某些合理的原因,一直处于队首,并且打完饭又立马回到队首继续打饭,甚至直到将饭打完。
所以:
如果只有互斥,就会导致所有的竞争情况为极强的随机性(看调度器),就有可能导致其他线程长时间得不到资源的情况。
太过于浪费进程自己的资源和临界区的资源:
一个进程申请到资源,首先需要做的是,不是访问资源,而是在申请锁之后,先检测资源是否就绪,如果就绪才会有后续操作。
就如同电影院买票,票经过线程抢购售卖,而前提是提供了票。于是忽,票抢完之后需要电影院方再提供票售卖,而此期间线程会持续的申请锁并检测(防止有新票上线),没有票的时候,所有线程还在申请锁、检测、释放锁,做无用功。
此期间线程在进行,临界区提供检测,从而浪费支援。
所以,我们需要引入同步:不是严格意义上的保证临界资源安全的方法,主要为了解决,访问临界资源合理性问题。
再以食堂打饭为例,但是这一次极端一些:
只有一个餐盘,并且只有这一个餐盘才允许打饭(多余要求没有)。
于是,有一个人早饭一过就开始等,下午饭时间一到,立马将餐盘一抢,开始打饭,但是这个人很奇怪,每次打几粒米,然后吃完,将餐盘拿到厨房。然后,厨房将餐盘刚刚洗完放回,这人又饿了,于是立马又拿起来继续跑去打饭(按照规矩这个人也没错,他离餐盘最近,他肯定先抢的到),甚至循环往复将饭打完了,于是后面的人就吃不到饭菜。
于是便有了线程同步,也就是管理员发现不对,就加了一个规矩:吃完的人不能立即,再拿餐盘。后面等的人不能乱等,必须按照前来时间排队。然后如果吃完放回餐盘,想再打饭,必须排在队尾。
意义:让访问线程访问临界资源,具有一定的顺序性。
条件变量
当我们申请临界资源前 -> 先要对临界资源进行是否存在的检测 -> 要做检测的本质:也是访问临界资源。因为:该线程不清楚资源是否被其他线程拿完,大部分资源都是我们申请到锁,但并不代表是有资源的。所以需要使用临界资源,也就代表了需要要先对临界资源进行检测。
结论:对临界资源的检测,也一定是需要在加锁和解锁之间。
常规方式要检测条件就绪,就注定了线程必须频繁的申请和释放锁,使得线程处于一直在申请和释放锁,这样会使得线程一直处于很忙的状态,还不会创造什么价值。
#问:有没有什么办法让线程检测资源处于不就绪的时候?
1. 不要让线程在频繁检测 —— 让其处于等待。
2. 当条件就绪的时候,通知对应的线程,让他来进行资源的申请和访问。
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中,这种情况就需要用到条件变量。
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
系统调用
初始化与destory
1. 对于局部的条件变量初始化利用pthread_cond_init。(也可以用于全局)
#include <pthread.h> // 局部条件变量初始化 int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
参数:cond:要初始化的条件变量attr:局部变量的属性 —— 可以不管,直接设为nullptr。返回值:
- 成功,返回0。
- 失败,返回错误码。
与锁的初始化相同,需要:pthread_cond_destroy。
#include <pthread.h> // 局部条件变量销毁 int pthread_cond_destroy(pthread_cond_t *cond);
参数:
mutex:传入我们定义并初始化的条件变量。
返回值:
- 成功,返回0。
- 失败,返回错误码。
2. 对条件变量,只要是我们定义的是静态或全局,我们可以直接使用PTHREAD_COND_INITIALIZER(宏),进行初始化。
等待条件满足
现在我们需要的不是加锁和解锁,我们需要的是在对应临界区中,资源不就绪的时候,不要频繁的检测,等待即可:pthread_cond_wait。
pthread_cond_wait
#include <pthread.h> // 等待临界资源是否就绪 int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待。
mutex:互斥量(锁)
- mutex就是对应着为了保护数据安全的一把锁,也就是说除了要保证能够同步,还要保持能够互斥。所以参数mutex与参数cond可以说是有渊源的 —— 使用条件变量,必须有一把锁。(为什么?后面具体代码详细解释)
返回值:
pthread_cond_timedwait
(用的少,此文不细致讲解)
#include <pthread.h> // 等待临界资源是否就绪 int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
参数:
前面两个参数与pthread_cond_wait一样。restrict abstime:传入的时间
- 也就是说等的时候,是只能一直的等?不是的,我们可以使用pthread_cond_timedwait,设置一个时间restrict abstime,时间到了线程会自动醒来。
返回值:
唤醒等待
条件满足,对应的线程就要收到通知,也就是唤醒它。所以条件满足的时候,需要向等待的线程发通知:pthread_cond_broadcast、pthread_cond_signal。
pthread_cond_broadcast
#include <pthread.h> // 可能不止一个线程在等资源就绪 —— 将对应等待的线程全部唤醒。 int pthread_cond_broadcast(pthread_cond_t *cond);
参数:
cond:当前在指定条件变量cond上阻塞所有线程。
返回值:
- 成功,返回0。
- 失败,返回错误码。
pthread_cond_signal
#include <pthread.h> // 唤醒指定的一个线程。 int pthread_cond_signal(pthread_cond_t *cond);
参数:
cond:当前在指定条件变量cond上阻塞中的一个线程。
返回值:
- 成功,返回0。
- 失败,返回错误码。
代码使用
(下列代码从基础向上增加代码,以做讲解)
#:先写一个简易的多线程程序
(由于局部的锁与条件变量更难写,所以此处演示局部的)
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#define NUMS 3
typedef void (*func_t)(const std::string &);
class ThreadDate
{
public:
ThreadDate(const std::string name, func_t func) : name_(name), func_(func)
{}
public:
std::string name_;
func_t func_;
};
void func1(const std::string &name)
{
while (true)
{
std::cout << name << " running -- 下载" << std::endl;
usleep(10000);
}
}
void func2(const std::string &name)
{
while (true)
{
std::cout << name << " running -- 播放" << std::endl;
usleep(20000);
}
}
void func3(const std::string &name)
{
while (true)
{
std::cout << name << " running -- 扫码用户信息" << std::endl;
usleep(30000);
}
}
void *Entry(void *args)
{
ThreadDate *td = (ThreadDate *)args; // td在每一个线程自己私有的栈空间中保存
td->func_(td->name_);
delete td;
return nullptr;
}
int main()
{
pthread_t tids[NUMS];
func_t funcs[NUMS] = {func1, func2, func3};
for (int i = 0; i < NUMS; ++i)
{
std::string name = "Thread ";
name += std::to_string(i + 1);
ThreadDate *td = new ThreadDate(name, funcs[i]);
pthread_create(tids + i, nullptr, Entry, (void *)td);
}
for (int i = 0; i < NUMS; ++i)
{
pthread_join(tids[i], nullptr);
std::cout << "Thread " << i + 1 << "-quit-" << std::endl;
}
return 0;
}
#:利用互斥锁和条件变量控制线程,并采用wait等待和signal唤醒
现在想控制线程,想让不同的线程去运行,想唤醒哪个就唤醒哪个:
控制线程:
- 定义一把互斥锁。
- 定义一个条件变量。
随后使用wait与signal:
#include <pthread.h>
// 等待临界资源是否就绪
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
#include <pthread.h>
// 唤醒指定的一个线程。
int pthread_cond_signal(pthread_cond_t *cond);
唤醒的时候,为什么参数传递的是一个条件变量?因为所有程序都在对应的条件变量下等,并且我们根本不关心是哪一个线程,当对应的新线程在对应的条件变量下等,就认为其进行了排队等待。只是当条件变量由 "无效" 变为 "有效" 的时候,从其中挑一个线程,让其去执行特定的任务。(这也就是条件变量的意义)
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#define NUMS 3
typedef void (*func_t)(const std::string& name, pthread_mutex_t* pmtx, pthread_cond_t* cond);
class ThreadDate
{
public:
ThreadDate(const std::string name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
: name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
{}
public:
std::string name_;
func_t func_;
pthread_mutex_t* pmtx_;
pthread_cond_t* pcond_;
};
void func1(const std::string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while (true)
{
pthread_cond_wait(pcond, pmtx);
std::cout << name << " running -- 下载" << std::endl;
}
}
void func2(const std::string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while (true)
{
pthread_cond_wait(pcond, pmtx);
std::cout << name << " running -- 播放" << std::endl;
}
}
void func3(const std::string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while (true)
{
pthread_cond_wait(pcond, pmtx);
std::cout << name << " running -- 扫码用户信息" << std::endl;
}
}
void *Entry(void *args)
{
ThreadDate *td = (ThreadDate *)args; // td在每一个线程自己私有的栈空间中保存
td->func_(td->name_, td->pmtx_, td->pcond_);
delete td;
return nullptr;
}
int main()
{
pthread_mutex_t mtx;
pthread_cond_t cond;
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t tids[NUMS];
func_t funcs[NUMS] = {func1, func2, func3};
for (int i = 0; i < NUMS; ++i)
{
std::string name = "Thread ";
name += std::to_string(i + 1);
ThreadDate *td = new ThreadDate(name, funcs[i], &mtx, &cond);
pthread_create(tids + i, nullptr, Entry, (void *)td);
}
sleep(5);
// 所有线程停止5秒
// 休眠的时候主线程没有继续向后执行
// 新线程一直在执行wait
while(true)
{
std::cout << "resume thread run code ...." << std::endl;
pthread_cond_signal(&cond); // 主线程唤醒一个线程,唤醒哪一个关心,只关心将条件变量由无效设为有效 —— 表征条件就绪
sleep(1);
}
for (int i = 0; i < NUMS; ++i)
{
pthread_join(tids[i], nullptr);
std::cout << "Thread " << i + 1 << "-quit-" << std::endl;
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
我们会发现其是具有一定的顺序的,就是因为这些线程,在条件未满足的时候,所有的线程都会在该条件变量下排队等待。所以主线程对应使用signal一个一个的唤醒线程,就会使得线程的运行是有序的。
将唤醒方式替换:
#include <pthread.h>
// 可能不止一个线程在等资源就绪 —— 将对应等待的线程全部唤醒。
int pthread_cond_broadcast(pthread_cond_t *cond);
写一个执行5次调用(唤醒)线程:
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#define NUMS 3
typedef void (*func_t)(const std::string& name, pthread_mutex_t* pmtx, pthread_cond_t* cond);
volatile bool quit = false;
class ThreadDate
{
public:
ThreadDate(const std::string name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
: name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
{}
public:
std::string name_;
func_t func_;
pthread_mutex_t* pmtx_;
pthread_cond_t* pcond_;
};
void func1(const std::string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while (!quit)
{
pthread_cond_wait(pcond, pmtx);
std::cout << name << " running -- 下载" << std::endl;
}
}
void func2(const std::string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while (!quit)
{
pthread_cond_wait(pcond, pmtx);
std::cout << name << " running -- 播放" << std::endl;
}
}
void func3(const std::string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while (!quit)
{
pthread_cond_wait(pcond, pmtx);
std::cout << name << " running -- 扫码用户信息" << std::endl;
}
}
void *Entry(void *args)
{
ThreadDate *td = (ThreadDate *)args; // td在每一个线程自己私有的栈空间中保存
td->func_(td->name_, td->pmtx_, td->pcond_);
delete td;
return nullptr;
}
int main()
{
pthread_mutex_t mtx;
pthread_cond_t cond;
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t tids[NUMS];
func_t funcs[NUMS] = {func1, func2, func3};
for (int i = 0; i < NUMS; ++i)
{
std::string name = "Thread ";
name += std::to_string(i + 1);
ThreadDate *td = new ThreadDate(name, funcs[i], &mtx, &cond);
pthread_create(tids + i, nullptr, Entry, (void *)td);
}
sleep(5);
// 所有线程停止5秒
// 休眠的时候主线程没有继续向后执行
// 新线程一直在执行wait
int cnt = 5;
while(cnt)
{
std::cout << "resume thread run code ...." << cnt-- << std::endl;
//pthread_cond_signal(&cond); // 主线程唤醒一个线程,唤醒哪一个不用关心,只关心将条件变量由无效设为有效 —— 表征条件就绪
pthread_cond_broadcast(&cond);
sleep(1);
}
quit = true;
for (int i = 0; i < NUMS; ++i)
{
pthread_join(tids[i], nullptr);
std::cout << "Thread " << i + 1 << "-quit-" << std::endl;
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
此时,会发现处于并没线程运行,且并未结束的卡死状态。
线程中wait一定要在加锁和解锁之间进行wait。因为我们wait的原因就是,检测到临界资源条件不就绪,所以线程的做法就是等,需要检测临界资源不就绪,就一定需要在临界区内。
供货商只负责生产,
供货商不研究用户需要什么,只用关心超市想要什么
超市只关心将商品给消费者
提高效率:解耦合
让不同的角色进行逻辑解耦,以此通过超市这样的角色,来进行提高效率