从进程到线程
为什么需要线程?这是因为进程本身存在一定问题:
首先是进程切换时,各类进程资源如寄存器CPU、包括虚拟地址和物理地址要进行映射等等进行上下文切换,这是非常消耗资源和时间的事情,并且实现进程间通信非常的麻烦,所以我们有必要引入一个更加轻量级的“微进程”,这也就是我们要说的线程。
线程基本概念
同进程一样,线程一样是一个正在执行的程序,只不过线程不再是资源分配的基本单位。
一个进程当中可以有多个线程,多个线程可以共享进程的内存资源。
因为线程本身是一个程序,所以每个线程也具有自己的PC啊stack寄存器之类的资源(即线程上下文),假设一个进程里有两个线程,那么这两个线程共享一个(也就是进程的内存空间)地址空间。因为只有一个地址空间,所以只有一个代码段和一个数据段以及一个堆区一个栈区,但是引入了多线程之后,数据段和代码段以及堆区是共享的,但栈区不是,当产生线程切换时为了不影响原先的t1线程的执行,t2线程将会新建一个自己的栈区。虽然不是同一个栈区,但因为位于同一个地址空间且数据段和代码段以及堆区是共享的,所以t1和t2的所有数据都是可以共享的。
虽然内存空间是共享的,但是CPU资源是共享的吗?
CPU调度的单位是什么呢?是进程还是线程?
这有两种说法:即用户级线程和内核级线程。
用户级线程CPU无法感知到,这是进程内部自己进行分配的。而内核级线程则是每一个都是有CPU和操作系统参与的。
所以这就决定了CPU在调度任务时是以线程为单位的,这就意味着我们之前写的进程都是单线程的进程。
注意:在Linux操作系统的设计中,CPU调度要做的事情其实就是把一个task_struct换成另一个task_struct,即每一个线程都有自己单独的task_struct; 注意这里和教科书上的内容不太一样,教科书上说进程有自己的PCB而线程有自己的TCB,但是在Linux的具体实现当中PCB和TCB合二为一就叫task_struct了。
那我们怎么判断一个task_struct是进程还是线程呢?Linus本人是这么说的:task_struct是以线程为单位的 ,如果这个线程独享一个地址空间,那它实际上就是个进程,如果多个task_struct是共享同一个地址空间的,那么它们就是属于同一个进程的不同线程。
在Linux中线程不能脱离进程而单独存在,因为进程是内存资源分配的基本单位,所以我们所使用的线程永远是先有进程,得到了地址空间之后再创建线程。
那么我们来看一下引入线程之后有没有解决之前进程存在的缺陷:
很明显在这样的模式下系统性能是能够得到显著提升的。
可以通过proc目录下的PID号文件目录下的task文件夹下看一个程序是多线程还是单线程的:
创建线程:pthread_create函数
首先要明确,在我们一个程序执行时本身就存在一个线程,我们称其为主线程,我们说创建线程的含义其实就是在创建子线程。
然后主线程是从main函数开始调用的,这也就是为什么我们的C语言程序都是从main函数开始压栈。
那我们就能够在主线程当中调用pthread_create函数来创建子线程,来看一下这个函数:
来看一下该函数的一些参数:
pthread_self获取本线程的线程ID
写个代码来简单测试:
1 #include <43func.h>
2
3 //这个就是线程启动函数,即子线程的入口函数
4 void* threadFunc(void* arg){
5 printf("I am child thread: 子线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
6 }
7
8 int main(){
9 printf("I am main thread: 主线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
10 pthread_t tid;//pthread_t是线程ID的类型
11 //线程创建函数从左到右的参数:线程ID,线程属性(为NULL表示默认属性),线程启动函数,传递给线程启动函数的参数
12 pthread_create(&tid,NULL,threadFunc,NULL);
13 sleep(1);
14 return 0;
15 }
编译运行,注意,在gcc条件下编译多线程程序链接时需要带上参数-phread,否则链接报错(这在man手册中有明确提及,不过我这里可能是版本比较高编译器比较智能了所以不加也一样可以):
线程之间是并发执行的。
我们可以把睡眠一秒给注释掉,看会发生什么事情:
可以看见不管执行几次,都只会打印主线程里面的内容,而子线程不会打印。为什么?
因为主线程终止了,那么意味着进程就没了,那进程都没了子线程肯定就都没了呀,进程是线程存在的基础!
这与之前讲进程的时候不同,因为进程与进程之间又不共享内存资源,所以哪怕父进程挂了那么子进程依然可以逍遥快活只不过变成了孤儿进程罢了。线程则不然,这个区别要注意。
小插曲,了解即可:
其实就是当我们把睡眠时间改小了之后可能会存在多次打印的情况,这是由于return 0时stdout标准输出内的打印内容还未清空就被再次输出导致的,结合文件系统相关的内容分析即可,注意一下就行。
如果想避免这样的事情发生,那么用_exit(0);代替 return 0来终止程序即可。
线程库的历史
线程库其实一开始是有两个版本的,一个是LinuxThreads,这是Intel公司发明的;另一个是NPTL,是红帽公司发明的,也就是我们现在使用的线程库(从内核2.6版本之后)。
这两个库最开始的线程都是用户级线程,但因为用户级线程是没有系统参与的,所以后来就把NPTL的用户级线程1:1映射到了内核级线程中,每个用户级线程都对应一个内核级线程,所以现在用起来就和内核级线程一样。
这就是个小故事,以后吹水可以用到~
多线程环境下处理报错
首先要清楚,多线程环境下不能使用perror,为什么不能用?举个例子:
比如open函数是可能报错的,根据其函数返回值来判断是否报错:
这个errorno是存在全局变量中的,看上去该函数只返回了-1,实际上一个典型的报错会做两件事情:
1、返回-1,并且修改全局变量errorno,可以通过查看#include<errorno.h>来查看errorno值。
而perror的原理就是根据该errorno的值去生成一个错误字符串。
因为errorno位于全局变量,即存在在数据段中,但这时候会产生一个问题就是多线程是共享数据段中的数据的,如果此时两个线程同时报错,一个线程先报错之后并发的另外一个进程也报错的话,那这时候后面报错的线程就会把前面报错的进程的错误信息给覆盖掉。
也就是这里会存在一个并发访问的问题。如何解决呢?
我们之前知道每个线程的栈区是独享的,那么我们可以采用栈区上的变量来指示不同的报错类型,所谓的变量就是函数的返回值。
也就是说现在我们报错不再返回-1,而是会返回一个整数,这个整数不等于0,它用来指示报错的类型。
Linux提供了一个函数strerror,它可以用来将给定的数字转换成对应的错误字符串:
但是注意该函数是不安全的(安不安全的以后再说),另外该函数也是无法自己打印错误字符串的,需要我们手动让其打印一下,所以我们去修改一下头文件中错误函数的宏定义:
1 #include <stdio.h>
2 #include <string.h>
3 #include <stdlib.h>
4 #include <sys/stat.h>
5 #include <unistd.h>
6 #include <sys/types.h>
7 #include <dirent.h>
8 #include <pwd.h>
9 #include <grp.h>
10 #include <time.h>
11 #include <fcntl.h>
12 #include <sys/mman.h>
13 #include <signal.h>
14 #include <pthread.h>
15 #include <sys/select.h>
16 #include <sys/time.h>
17 #include <sys/sem.h>
18 #include <sys/msg.h>
19 #include <sys/ipc.h>
20 #include <sys/shm.h>
21 #include <sys/wait.h>
22 #include <syslog.h>
23 #define ARGS_CHECK(argc,num) {if(argc != num){fprintf(stderr,"args error!\n"); return -1;}}
24 #define ERROR_CHECK(ret,num,msg){if(ret == num){perror(msg); return -1;}}
25 //下面是多线程环境下打印错误信息的函数宏定义
26 #define THREAD_ERROR_CHECK(ret,msg){if(ret != 0){fprintf(stderr,"%s: %s\n",msg,strerror(ret));}}
来测试一下,假如我们现在想测试最多可以创建多少个子进程:
1 #include <43func.h>
2
3 //这个就是线程启动函数,即子线程的入口函数
4 void* threadFunc(void* arg){
5 //printf("I am child thread: 子线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
6 while(1){
7 sleep(1);
8 }
9 }
10
11 int main(){
12 //printf("I am main thread: 主线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
13 pthread_t tid;//pthread_t是线程ID的类型
14 //死循环创建子进程,子进程一创建出来就被睡觉
15 for(int i=0;;++i){
16 //线程创建函数从左到右的参数:线程ID,线程属性(为NULL表示默认属性),线程启动函数,传递给线程启动函数的参数
17 int ret = pthread_create(&tid,NULL,threadFunc,NULL);//ret获取创建子进程函数的返回值
18 THREAD_ERROR_CHECK(ret,"pthread_create");//测试错误打印函数是否正常,ret!=0时说明报错
19 if(ret != 0){//报错时看一下我们创建了多少个子进程
20 printf("i = %d\n",i);
21 break;
22 }
23 }
24 //sleep(1);
25 return 0;
26 }
运行如下:
可以看见错误打印函数是正常执行的,并且在当前我的计算机的系统条件下,可以创建4876个子进程。
多线程是共享内存地址空间的
数据段是共享的
之前就提过这个事情,但是我们其实还可以写代码来验证一下:
可以看到主线程和子线程访问这个全局变量global都是200.
堆空间是共享的
要验证这个,我们只要让主线程和子线程共享同一份堆空间上的内存地址即可,那么怎么确保主子线程拿到的内存地址是一样的呢?除了使用全局变量来保存地址的做法之外,还有一种更优雅的做法,就是传递参数(还记得创建线程函数的第四个参数我们填的是NULL吗?),简单测试一下:
深度剖析一下这个过程:
这个方式和我们直接给子线程的启动函数传参有什么区别?
最大的区别就是,直接给子线程的启动函数传参就变成了压栈操作,那就丧失了子线程的栈区独立性,所以肯定不能这么做。
而使用pthread_create函数来传递其参数值就是新开辟了一块空间用来做子线程的栈区,然后在子线程的栈区中拷贝了一份主线程中的整形指针pHeap,这时候主子线程的整形指针都会指向同一块堆空间,这就保证了主子线程拿到的堆空间是一致的没有因为并发问题导致二者拿到的地址不一样,图示如下:
多线程之间如何传递一个整数
如果我们只是想传递一个整数,那么很明显没有必要取地址传地址,我们可以干脆传递一个long类型,这是因为pthread_create函数的第四个参数直接传递八个字节,这八个字节一般都是当作地址来用,现在我们拿来当长整形用的话,int就浪费了,所以不如干脆直接传递一个long类型。
使用方法也很简单:
另外从上面的结果也能看出,我们这个长整型的i是值传递,因为很明显主线程和子线程中打印的i值不同。
小总结
想要在主子线程之间共享数据就用指针类型,想要值传递的话就使用 long 类型。
另外多线程中的栈区是相对独立的,所以一个线程可以通过地址去访问另一个线程的栈区。
线程终止
首先一个进程中的任意一个线程如果进行以下操作:
main return、exit、_exit/_Exit、abort、收到信号等都会造成整个进程终止,那么其中所有线程也将终止。
这样杀死的范围就太广泛了,我们要让子线程只终止自己。
做法如下:
1、从线程启动函数中return,但是这个尽量不要用,后面会说为什么。
2、使用pthread_exit函数来执行子线程终止:
pthread_join函数回收线程资源
在进程中我们使用过wait函数来回收子进程,在线程中也有类似的概念,即pthread_join函数,它用来回收线程资源。
对于该函数的第二个参数而言,详细剖析一下为啥是这样的void**类型:
假设此时有一个main线程和一个子线程t1,t1此时终止了那么会返回一个void*类型的返回值,该返回值包含了子线程终止的状态信息,主线程肯定要把该返回值拿到不然咋进行回收,所以会将该返回值拷贝到主线程的栈帧中,为什么不用指针指而是用拷贝呢?这是因为子线程终止说明其内部数据很快就可能被消去了,此时用指针就会有风险。继续我们刚刚说的,返回值拷贝到主线程 的栈帧中肯定会调用pthread_join函数(因为要回收子线程资源那肯定是pthread_join函数来做啦),但是能直接通过值传递的形式把retval返回吗?肯定不行,如果被调函数希望影响到主调函数栈帧里面的数据应该传递地址,并且在被调函数中采用间接访问的方式去进行修改。
看代码应该会更清晰一点:
因为void*类型占八个字节,就像我们之前说的,其实使用long类型的八个字节来接收一样可以,所以上图使用long类型的retval来接收信息。
我们也可以直接申请void* 类型的变量来存retval:
如果不希望获取子线程的终止信息,可以直接填NULL:
线程的取消(杀死线程)
首先要注意,在多线程中杀死线程是不能使用信号的。
因为多个线程会共享进程注册信号的信息,用户无法得知是哪个线程在递送信号,处理起来会非常棘手,所以干脆就不让在多线程环境下使用信号了。
线程的取消行为类似于信号,使用的函数是pthread_cancel:
该函数给另一个线程发送取消请求,注意请求有可能成功有可能不成功。
取消点
发送请求之后另一个线程收到请求之后会怎么样?
它不一定会立刻终止,要看情况,当其收到取消请求之后,它不一定会马上终止,而是将自己的取消标志位置为1,然后继续运行,直到运行到一些特别函数的开始或者结束,要么在这些特别函数的开始终止线程要么就在结束的时候终止线程,这些特别的函数称为“取消点”。
取消点有哪些,使用man 7 pthreads命令查看:
这些取消点都是系统调用或者库函数。
操作文件的、可能引发阻塞的,包括我们的printf函数,这些都是取消点,是会引发线程终止的函数。
但是有一个后面会聊到的函数是会引发阻塞的但它并不是取消点,这个函数就是pthread_mutex_lock加锁函数。
我们来测试一下取消点这个东西:
可以看见我们的线程函数明明返回的是0但是显示的是-1,这是因为tid如果是被cancel掉的,那么retval的值就是-1.
理论上的执行流程如下:
左边是主线程,右边是子线程,结合我们之前说的内容应该很好理解。
我们可以使用ps -elLf来查看线程状态,该命令可以展示所有的进程和线程状态:
其中的LWP的意思就是:Light Weight Process轻量级进程,即线程。
pthread_testcancel函数:取消请求异步杀死线程
使用系统提供的那些取消点来杀死线程有点low,比如手动打印个printf来杀死线程很low,所以有一个专门的函数来作我们手动的取消点用来专门杀死线程,这个函数就是pthread_testcancel:
也就是当取消标志位为真时,该函数就执行取消(杀死)线程行为。
测试一下,手动增加取消点函数pthread_testcancel:
pthread_cancel的流程总结
注意取消点函数调用前会终止线程,而终止线程会导致一个问题:异步终止线程导致资源泄露。
举个例子:
假如现在有个子进程,先malloc(1)再malloc(2)再free(2)再free(1),此时主线程很有可能在这四个阶段中任何一个阶段发送cancel请求:
如果是上图中第二种清空发送cancel请求的话那么内存就泄露了1个字节,第3种情况的话内存就泄露了三个字节,只有第一种和第五种是安全的。
即我们所说的异步终止可能引发资源泄露问题,原因就是我们无法线程会在什么时候被终止。
那么我们自然也是有对应的处理机制来处理这个问题的,在运行到取消点和终止线程这两步之间会运行线程终止清理函数:
虽然有这个机制的存在,但依然有棘手的问题无法解决,就是我们怎么知道线程里面申请了多少资源;我们需要一个方案,根据终止之前申请了多少资源我们在终止的那个时刻就根据我们申请资源的数量去调用对应的清理函数。
这个方案就是采用栈结构来进行管理。
资源清理栈
引入两个函数:
别整迷糊了,资源清理栈的作用是让线程自动根据申请了多少资源,就释放掉多少资源。
观察资源清理函数:pthread_cleanup_push可以看到,第一个参数其实就是清理函数,第二个参数voidarg是传递给清理函数的参数,但此时有个问题,清理函数的参数的类型是void,free函数是没有问题的但close这种呢?
所以我们需要对close这种缺乏void* 类型的函数进行一个包装:
简单写代码来测试一下:
1 #include <43func.h>
2
3 //清理函数1,该函数用来清理p1资源
4 void clean1(void* arg){
5 printf("I am clean1\n");
6 free(arg);
7 }
8
9 //清理函数2,该函数用来清理p2资源
10 void clean2(void* arg){
11 printf("I am clean2\n");
12 free(arg);
13 }
14
15 //这个就是线程启动函数,即子线程的入口函数
16 void* threadFunc(void* arg){
17 printf("I am child thread: 子线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
18 //子进程申请8个字节的堆空间资源
19 int* p1 = malloc(4);//申请资源后马上将对应的清理行为压栈
20 pthread_cleanup_push(clean1,p1);
21 int* p2 = malloc(4);
22 pthread_cleanup_push(clean2,p2);
23 pthread_cleanup_pop(1);//参数设置为1表示会主动调用清理函数,为0则不会主动调用
24 pthread_cleanup_pop(1);
25 }
26
27 int main(){
28 printf("I am main thread: 主线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
29 pthread_t tid;//pthread_t是线程ID的类型
30 //线程创建函数从左到右的参数:线程ID,线程属性(为NULL表示默认属性),线程启动函数,传递给线程启动函数的参数
31 int ret = pthread_create(&tid,NULL,threadFunc,NULL);
32 THREAD_ERROR_CHECK(ret,"pthread_create");
33 pthread_join(tid,NULL);//回收子线程资源,第二个参数为NULL表示不获取子线程终止状态
34
35 }
编译运行:
可以看见是正常状态,没有内存泄露的情况。
接下来我们让线程自动退出:
1 #include <43func.h>
2
3 //清理函数1,该函数用来清理p1资源
4 void clean1(void* arg){
5 printf("I am clean1\n");
6 free(arg);
7 }
8
9 //清理函数2,该函数用来清理p2资源
10 void clean2(void* arg){
11 printf("I am clean2\n");
12 free(arg);
13 }
14
15 //这个就是线程启动函数,即子线程的入口函数
16 void* threadFunc(void* arg){
17 printf("I am child thread: 子线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
18 //子进程申请8个字节的堆空间资源
19 int* p1 = malloc(4);//申请资源后马上将对应的清理行为压栈
20 pthread_cleanup_push(clean1,p1);
21 pthread_exit(NULL);//我们让线程在这里终止
22 int* p2 = malloc(4);
23 pthread_cleanup_push(clean2,p2);
24 pthread_cleanup_pop(1);//参数设置为1表示会主动调用清理函数,为0则不会主动调用
25 pthread_cleanup_pop(1);
26 }
27
28 int main(){
29 printf("I am main thread: 主线程 ,tid = %lu\n",pthread_self());//通过pthread_self函数拿到本线程的线程id
30 pthread_t tid;//pthread_t是线程ID的类型
31 //线程创建函数从左到右的参数:线程ID,线程属性(为NULL表示默认属性),线程启动函数,传递给线程启动函数的参数
32 int ret = pthread_create(&tid,NULL,threadFunc,NULL);
33 THREAD_ERROR_CHECK(ret,"pthread_create");
34 pthread_join(tid,NULL);//回收子线程资源,第二个参数为NULL表示不获取子线程终止状态
35
36 }
编译运行:
可以看见这一次我们让其在p1资源申请完之后就压栈之后就直接终止了子线程,但是clean1清理函数一样执行了,说明该函数是非常智能的替我们释放掉了内存资源。
注意:只有使用pthread_exit和pthread_cancel导致的线程终止才会调用资源清理函数,在线程入口函数中使用return不会调用清理函数。
最好统一使用pthread_exit。
另外Linux中规定,该函数push和pop必须成对出现,而且必须在同一个作用域中。
这是因为在Linux源码实现中,该函数push和pop相当于是do-while结构的关系:
push对应的是do,pop对应的while。
这也就说明push和pop必须在同一个作用域中成对出现,另外如下定义变量也是错误的:
这是因为 i 的声明是在push-pop对应的do-while的花括号中的,所以离开了pop的作用域定义就自然出问题了。
总结
线程当中的竞争条件问题
来看一个在进程章节当中提过的问题,假设现在定义一个全局变量10000000,我们让主线程和子线程都分别对同一个count执行自增操作,循环自增条件为小于该全局变量,正常情况下最后count值应该为20000000,代码如下:
#include <43func.h>
#define NUM 10000000
void * threadFunc(void *arg){
int *pCount = (int *)arg;
for(int i = 0; i < NUM; ++i){
++*pCount;
}
}
int main(){
pthread_t tid;
int count = 0;
int ret = pthread_create(&tid,NULL,threadFunc,(void *)&count);
THREAD_ERROR_CHECK(ret,"pthread_create");
for(int i = 0; i < NUM; ++i){
++count;
}
pthread_join(tid,NULL);
printf("count = %d\n", count);
}
我们运行之后结果有时小于20000000,有时等于20000000:
原因很简单,就是因为这个自增操作在汇编语言中,依然是分为三步来实现的,而多线程环境下极有可能一个线程中count自增的三步操作才做完了第一步就被另一个线程给直接拦腰截断了,所以导致了最终结果的异常:
解决办法也很简单,就是使用互斥机制来解决。
mutex 互斥锁
使用命令man pthread_mutex_init可以看到关于mutex锁的描述,如果无法使用man手册查找的,可以见如下命令安装一下:
(使用云服务器的哥们儿们真的得注意一下,我感觉可能是云服务器的配置问题,有些内容和本地的虚拟机系统非常不一样,包括现在这个mutex的事情,将就着用的,还是以老师的为准)
对于互斥锁的操作汇总如下:
我们可以简单测试一下初始化锁和销毁锁:
1 #include <43func.h>
2
3 int main(){
4 //创建一个锁,静态初始化
5 //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 这一句和下面的初始化锁效果相同
6 pthread_mutex_t mutex;//动态初始化锁,比较常用
7 pthread_mutex_init(&mutex,NULL);//如果第二个参数是NULL,表示以默认形式初始化锁
8 //销毁一个锁
9 pthread_mutex_destroy(&mutex);
10
11 }
当然此时运行不会有什么结果,有了锁机制之后我们可以来改写之前存在竞争条件异常的多线程程序了,首先我们想到,因为主子线程都要打印count变量,那么count肯定是共享的,此时要上锁,那么这个锁肯定也是共享的,但我们创建线程的函数只能传递一个void*参数,怎么办?当然是封装啦,将这两个要传递的变量封装在一个结构体中一起传递就可以:
1 #include <43func.h>
2
3 #define NUM 10000000
4
5 //封装共享资源
6 typedef struct shareRes_s{
7 pthread_mutex_t mutex;//锁资源
8 int count; //要自增的变量资源
9 }shareRes_t;
10
11 //子线程入口函数
12 void* threadFunc(void* arg){
13 shareRes_t * pShareRes = (shareRes_t *) arg;
14 for(int i=0; i<NUM;i++){
15 pthread_mutex_lock(&pShareRes->mutex);//在自增操作前加锁
16 ++pShareRes->count;
17 pthread_mutex_unlock(&pShareRes->mutex);//在自增操作结束后解锁
18 }
19 }
21 int main(){
22 //创建一个锁,静态初始化
23 //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 这一句和下面的初始化锁效果相同
24 //pthread_mutex_t mutex;//动态初始化锁,比较常用
25 //pthread_mutex_init(&mutex,NULL);//如果第二个参数是NULL,表示以默认形式初始化锁
26 //销毁一个锁
27 //pthread_mutex_destroy(&mutex);
28
29 shareRes_t shareRes;//创建共享资源的封装对象
30 shareRes.count = 0;//初始化共享资源count
31 pthread_mutex_init(&shareRes.mutex,NULL); //初始化共享资源mutex锁
32
33 pthread_t tid; //线程号
34 int ret = pthread_create(&tid,NULL,threadFunc,(void*)&shareRes); //创建子线程
35 THREAD_ERROR_CHECK(ret,"pthread_create");//检查报错
36 for(int i=0;i<NUM;i++){
37 pthread_mutex_lock(&shareRes.mutex);//自增前加锁
38 ++shareRes.count;
39 pthread_mutex_unlock(&shareRes.mutex);//自增之后解锁
40 }
41 pthread_join(tid,NULL);//回收子线程资源
42 printf("count = %d\n",shareRes.count);//打印结果
43 pthread_mutex_destroy(&shareRes.mutex);//销毁锁
44 }
编译运行:
此时可以看见不论测试多少次都能够准确无误的拿到20000000的值。
线程死锁问题分析与解决
除了上图中的这两种,还有第三种,就是一个线程对同一把锁加锁了两次:
1 #include <43func.h>
2
3 int main(){
4 pthread_mutex_t mutex;//创建锁
5 pthread_mutex_init(&mutex,NULL);//初始化锁
6
7 pthread_mutex_lock(&mutex); //第一次加锁
8 puts("lock one time!");
9
10
11 pthread_mutex_lock(&mutex); //第二次加锁
12 puts("lock two time!");
13
14 pthread_mutex_destroy(&mutex);//销毁锁
15 }
运行:
可以看见已经被阻塞致死锁了,因为第二次加锁时该锁已经被第一次加锁时给占用了,第一次加的锁又没释放,那么第二次就只能陷入阻塞一直等待。
可以新开一个终端来查看一下该线程阻塞在哪里:
可以看见上面有个futex_,这个futex_是我们操作系统底层使用的锁,我们自己使用的mutex是用户态的,而内核用的就是这个futex_,实际上mutex底层使用的就是futex_。
mutex底层实现的原理
实际上锁的类型有三种,前文讲的mutex是三种里面的互斥睡眠锁。
第二种是rwlock,读写锁。该锁的使用场景主要是当有线程在写的时候,那么加上写锁之后别的线程就不能读了;当有线程在读的时候很明显读并不影响其它线程一起读,但是别的线程不能写,此时就是加的读锁。
第三种是pthread_spin,自旋锁。其和mutex锁差不多,用途一样,只不过有一个区别:当mutex加锁时不满足条件的话线程会陷入睡眠状态,而自旋锁不会,它会一直处于while(1)死循环。这样做的区别是睡眠不占用CPU(阻塞态),而死循环要占用CPU(运行态)。这就导致了自旋锁的缺陷:如果是单核CPU的话使用自旋锁就可能陷入一直死循环的状态,除非有外力打破该状态。但好处也是显而易见的,比如虽然某一时刻线程去访问某段资源的锁状态是加锁的,但很有可能它一会儿就释放锁该线程就能够访问其资源了,如果是睡眠的话就需要进行上下文的切换(需要从运行态转为阻塞态),这需要消耗大量资源和时间,而如果是自旋锁的话就会一直保持运行态到该资源解锁,这能大大提升资源利用率和时间利用率。
但工作上用的最多的就是睡眠互斥锁mutex。
解决第三种死锁
上策
不写这种代码。
中策
使用非阻塞加锁:pthread_mutex_trylock
该锁比较智能,如果用其加锁时是未加锁状态,则trylock会加锁;若是已加锁状态,则trylock会立刻返回。
用该锁来重写一下我们上面的程序:
1 #include <43func.h>
2
3 int main(){
4 pthread_mutex_t mutex;//创建锁
5 pthread_mutex_init(&mutex,NULL);//初始化锁
6
7 int ret = pthread_mutex_trylock(&mutex); //第一次加锁,使用trylock非阻塞加锁
8 THREAD_ERROR_CHECK(ret,"pthread_mutex_trylock 1");
9 puts("lock one time!");
10
11
12 ret = pthread_mutex_trylock(&mutex); //第二次加锁
13 THREAD_ERROR_CHECK(ret,"pthread_mutex_trylock 2");
14 puts("lock two time!");
15
16 pthread_mutex_destroy(&mutex);//销毁锁
17 }
运行结果:
可以看见使用非阻塞加锁之后,如果发现已经是加锁状态那么该加锁函数就会立即返回报一个错误而不去影响后面代码的执行。
使用while循环+trylock可以实现一个自旋锁。
活锁:假如上图两个线程一个锁住了A另一个锁住了B,那两个线程互相谦让一起退一步就又回到了最开始的状态,在时间上卡点卡的特别准的情况下,那就陷入了一种活锁状态,两个线程都无法继续往前推进。
但其实这个问题解决也很简单,只要这两个线程并发的时间稍微不那么同步一点就能解决,比如让其中一个线程执行完操作之后sleep随机一段时间。
下策
修改锁的属性,就是在初始化锁的时候我们经常填NULL 的位置,对那儿进行修改即可:
测试这些属性:
重点说一下可重入锁的一些性质:
使用可重入锁时,若一个线程对同一把锁重复加锁,程序既不会报错也不会死锁,只会增加锁的引用计数,这也就意味着加了几次锁就得解开几次锁,并且其它线程只有在锁的引用计数为0的情况下才能继续加锁。
用锁实现火车票卖票问题
问题描述:
假设现在有2000张票,有两个售票窗口w1和w2,然后两个窗口同时进行售票,直到票卖完为止。
代码实现:
1 #include <43func.h>
2
3 //共享资源封装
4 typedef struct shareRes_s{
5 int trainTicket; //火车票资源
6 pthread_mutex_t mutex; //锁资源
7 }shareRes_t;
8
9 //子线程1入口函数,即售票窗口1
10 void* sellTicket1(void* arg){
11 shareRes_t* pShareRes = (shareRes_t*) arg;
12 //开始售票
13 //加锁
14 pthread_mutex_lock(&pShareRes->mutex);
15 while(pShareRes->trainTicket > 0){
16 printf("before win1 sell ticket, num = %d \n",pShareRes->trainTicket);
17 --pShareRes->trainTicket;
18 printf("after win1 sell ticket, num = %d \n",pShareRes->trainTicket);
19 }
20 //解锁
21 pthread_mutex_unlock(&pShareRes->mutex);
22 }
23
24 //子线程2入口函数,即售票窗口2
25 void* sellTicket2(void* arg){
26 shareRes_t* pShareRes = (shareRes_t*) arg;
27 //开始售票
28 //加锁
29 pthread_mutex_lock(&pShareRes->mutex);
30 while(pShareRes->trainTicket > 0){
31 printf("before win2 sell ticket, num = %d \n",pShareRes->trainTicket);
32 --pShareRes->trainTicket;
33 printf("after win2 sell ticket, num = %d \n",pShareRes->trainTicket);
34 }
35 //解锁
36 pthread_mutex_unlock(&pShareRes->mutex);
37 }
38
39 int main(){
40
41 //创建共享资源
42 shareRes_t shareRes;
43 shareRes.trainTicket = 2000; //初始化二十张电影票
44 pthread_mutex_init(&shareRes.mutex,NULL);//初始化锁
45 pthread_t tid1,tid2;//创建两个线程,模拟两个售票窗口
46
47 //创建子线程1
48 pthread_create(&tid1,NULL,sellTicket1,(void*)&shareRes);
49 //回收子线程1的资源
50 pthread_join(tid1,NULL);
51
52
53 //创建子线程2
54 pthread_create(&tid2,NULL,sellTicket2,(void*)&shareRes);
55 //回收子线程1的资源
56 pthread_join(tid2,NULL);
57
58
59 //销毁锁
60 pthread_mutex_destroy(&shareRes.mutex);
61
62 }
结果如下:
在进行这种加锁解锁问题的时候,千万要判断好临界区是哪一段代码,比如这个程序中如果加解锁没有包括while里面的判断的话就会出问题,会出现余额为负数的情况。
同步
所谓同步,意思就是事件的执行顺序是固定的。
我们若使用mutex来实现这种同步机制的话,会存在一些问题。
假如现在有线程t1它会执行A操作,有线程t2会执行B操作,我们要做的事情是让A操作永远在B操作之前执行。
我们可以弄一个标志位flag(一开始为false),用来表示A是否完成,在t1线程中若执行了A那么紧接着就要将flag置为true,在t2线程中要死循环去判断flag是否为真再来判断B操作是否要执行:
有可能出现的一种情况是在并发状态下可能 t2 线程先执行,但因为flag为false所以直接就死循环了。
即完成后一个事件的线程,有极大可能CPU会空转一段时间来等待另一个线程完成前一个事件。
这很明显会浪费很多资源,所以我们选择另外一种更节省资源的做法,我们想要的效果是对于t2线程,如果t1线程没有完成操作A,我们就让t2进入一种阻塞状态(这种状态下并不占用CPU,节省资源),这就是条件变量的底层逻辑。
条件变量
条件变量专门用来实现事件的同步。
注意图中的signal并非信号,这里只是用它来表示一个消息,t1传递给t2的消息告知t2可以开始执行B操作了。
使用条件变量的步骤:
1、设计一个条件,这个条件决定了本线程是否要等待
2、如果不满足,调用wait会使本线程陷入等待
3、此时,另外的线程会运行,直到将条件改成满足,通知(signal)阻塞的线程恢复就绪状态
条件变量接口
接口详解:
简单测试一下这些接口:
1 #include <43func.h>
2
3 //共享资源的封装
4 typedef struct shareRes_s{
5 int flag; //条件
6 pthread_cond_t cond; //条件变量
7 pthread_mutex_t mutex; //条件变量必须配合锁一起使用
8 } shareRes_t;
9
10
11 //线程启动函数
12 void* threadFunc(void* arg){
13 shareRes_t* pShareRes = (shareRes_t*)arg;
14
15 //先加锁
16 pthread_mutex_lock(&pShareRes->mutex);
17 //只有在加锁的状态下,才能够使用wait
18 pthread_cond_wait(&pShareRes->cond,&pShareRes->mutex);
19 //如果从wait状态中被唤醒的话,说明前置事件已经完成了
20 puts("world");
21 //记得解锁
22 pthread_mutex_unlock(&pShareRes->mutex);
23 //终止线程
24 pthread_exit(NULL);
25 }
26
27 int main(){
28
29 //初始化共享资源
30 shareRes_t shareRes;
31 pthread_mutex_init(&shareRes.mutex,NULL);
32 pthread_cond_init(&shareRes.cond,NULL);
33
34 pthread_t tid; //子线程id
35 pthread_create(&tid,NULL,threadFunc,(void*)&shareRes);//创建线程
36
37 sleep(3);//现在哪怕是让主线程先睡三秒,子线程也不可能先在主线程之前打印world
38 //主线程先执行事件A:打印Hello
39 puts("Hello");//在hello之后再唤醒子线程
40 pthread_cond_signal(&shareRes.cond);//唤醒子线程
41
42 pthread_join(tid,NULL); //回收线程
43
44
45 pthread_cond_destroy(&shareRes.cond);
46 pthread_mutex_destroy(&shareRes.mutex);
47
48 }
编译运行:
可以发现先后顺序是一定的。
观察上面的程序可以发现我们还没有用到flag变量,其实flag变量的作用是用来标志是否需要进入wait状态用的,这样可以提高资源利用率,代码改变如下:
1 #include <43func.h>
2
3 //共享资源的封装
4 typedef struct shareRes_s{
5 int flag; //条件
6 pthread_cond_t cond; //条件变量
7 pthread_mutex_t mutex; //条件变量必须配合锁一起使用
8 } shareRes_t;
9
10
11 //线程启动函数
12 void* threadFunc(void* arg){
13 shareRes_t* pShareRes = (shareRes_t*)arg;
14
15 //先加锁
16 pthread_mutex_lock(&pShareRes->mutex);
17 //只有在加锁的状态下,才能够使用wait
18 if(pShareRes->flag == 0){//flag标志为0时才需要等待
19 pthread_cond_wait(&pShareRes->cond,&pShareRes->mutex);
20 }
21 //如果从wait状态中被唤醒的话,说明前置事件已经完成了
22 puts("world");
23 //记得解锁
24 pthread_mutex_unlock(&pShareRes->mutex);
25 //终止线程
26 pthread_exit(NULL);
27 }
28
29 int main(){
30
31 //初始化共享资源
32 shareRes_t shareRes;
33 pthread_mutex_init(&shareRes.mutex,NULL);
34 pthread_cond_init(&shareRes.cond,NULL);
35 shareRes.flag = 0; //表示子线程需要等待
36
37 pthread_t tid; //子线程id
38 pthread_create(&tid,NULL,threadFunc,(void*)&shareRes);//创建线程
39
40 sleep(3);//现在哪怕是让主线程先睡三秒,子线程也不可能先在主线程之前打印world
41 //主线程先执行事件A:打印Hello
42 puts("Hello");//在hello之后再唤醒子线程
43 shareRes.flag = 1; //当前置事件完成之后,标志flag置为1表示子线程此时不需要等待
44 pthread_cond_signal(&shareRes.cond);//唤醒子线程
45
46 pthread_join(tid,NULL); //回收线程
47
48
49 pthread_cond_destroy(&shareRes.cond);
50 pthread_mutex_destroy(&shareRes.mutex);
51
52 }
所以现在可以解释为什么条件变量需要配合锁来使用了,这是因为除了条件变量是共享的,与之相伴的某种状态也是共享的(比如上面代码中的flag),那么在判断flag是否等于0的这里在并发条件下没有加锁的话就很容易出现问题,所以良好的编码规范是使用条件变量的时候必须配合锁使用:
我们总是希望检查状态和陷入等待这两者是一个原子操作。
pthread_cond_wait的内部实现(非常重要)
首先该函数会判断有没有加锁,有加锁才有继续往下执行的价值否则就直接抛出异常。然后有锁的话往下执行该函数会将调用自己的线程加入待唤醒队列,因为同一时刻可能有很多线程在等待同一个signal,但我们又不可能直接把所有的线程都给唤醒,要有个先来后到一个个的去挨个唤醒才行,然后就解锁将自己陷入阻塞状态(解锁和陷入是原子操作),这是该函数前半程做的事情。
后半程则表示在收到signal醒来之后,第一件事情是让自己处于就绪状态,然后加锁,但此时这个锁不一定在调用该函数的线程手上,所以有可能加锁这个操作会一直阻塞直到有线程将该锁释放,调用了该函数的线程会在持有了锁之后再继续运行。
这就是pthread_cond_wait的内部实现过程,感觉不好理解,等看看理论书籍再来详细说说。
补充一下,在signal之时,应有线程已调wait而阻塞,否则这个signal就丢了。
另外在之前的代码中我们只会wait操作进行了加锁解锁操作,但实际上是我偷懒了,因为signal操作也是必须要进行加锁解锁的:
并且和wait一样,最好是直接加在状态变量改变的后面,这样程序更安全。
条件变量的惯用法
其实基本上就和我们之前写的示例程序差不多,一样的,这是写这类多线程程序时使用条件变量的惯用手法。
使用条件变量实现售票放票问题
这个问题我们要设计的比之前的生产者消费者问题要稍微复杂点,如下:
依然是在卖火车票,有一个放票部门和两个售票窗口,一开始有20张票,当售票窗口将20张票卖完之后放票部门再继续加票10张
分析可知道,放票部门需要wait,而售票窗口需要signal,来看代码实现:
1 #include <43func.h>
2
3 //共享资源
4 typedef struct shareRes_s{
5 int trainTicket;
6 int flag;//是否要放票
7 pthread_mutex_t mutex; //锁
8 pthread_cond_t cond; //条件变量
9 }shareRes_t;
10
11 //线程入口函数:售票窗口1
12 void* sellTicket1(void* arg){
13 sleep(1);//确保addTicket能够持有锁并wait,因为wait操作必须先执行
14 shareRes_t* pShareRes = (shareRes_t*) arg;
15 while(1){
16 sleep(1);//避免打印太快太多看不清
17 pthread_mutex_lock(&pShareRes->mutex);
18 if(pShareRes->trainTicket > 0){
19 printf("before win1 sell,ticket = %d\n",pShareRes->trainTicket);
20 --pShareRes->trainTicket;
21 printf("after win1 sell,ticket = %d\n",pShareRes->trainTicket);
22 if(pShareRes->trainTicket == 0 && pShareRes->flag == 0){
23 //如果卖完票之后,没有票了,而且从来没放过票
24 //那就唤醒放票线程进行放票
25 pthread_cond_signal(&pShareRes->cond);
26 pShareRes->flag = 1;
27 }
28 else if(pShareRes->trainTicket == 0 && pShareRes->flag != 0){
29 //如果票卖完而且已经放过票了
30 //那就结束循环并退出线程,退出之前记得要释放锁嗷
31 pthread_mutex_unlock(&pShareRes->mutex);
32 break;
33 }
34 }
35 else {
36 pthread_mutex_unlock(&pShareRes->mutex);
37 break;
38 }
39 pthread_mutex_unlock(&pShareRes->mutex);
40 }
41 pthread_exit(NULL);
42 }
43
44 //线程入口函数:售票窗口2
45 void* sellTicket2(void* arg){
46 sleep(1);//确保addTicket能够持有锁并wait,因为wait操作必须先执行
47 shareRes_t* pShareRes = (shareRes_t*) arg;
48 while(1){
49 sleep(1);//避免卖太快看不清
50 pthread_mutex_lock(&pShareRes->mutex);
51 if(pShareRes->trainTicket > 0){
52 printf("before win2 sell,ticket = %d\n",pShareRes->trainTicket);
53 --pShareRes->trainTicket;
54 printf("after win2 sell,ticket = %d\n",pShareRes->trainTicket);
55 if(pShareRes->trainTicket == 0 && pShareRes->flag == 0){
56 //如果卖完票之后,没有票了,而且从来没放过票
57 //那就唤醒放票线程进行放票
58 pthread_cond_signal(&pShareRes->cond);
59 pShareRes->flag = 1;
60 }
61 else if(pShareRes->trainTicket == 0 && pShareRes->flag != 0){
62 //如果票卖完而且已经放过票了
63 //那就结束循环并退出线程,退出之前记得要释放锁嗷
64 pthread_mutex_unlock(&pShareRes->mutex);
65 break;
66 }
67 }
68 else {
69 pthread_mutex_unlock(&pShareRes->mutex);
70 break;
71 }
72 pthread_mutex_unlock(&pShareRes->mutex);
73 }
74 }
75
76 //线程入口函数:放票部门
77 void* addTicket(void* arg){
78 shareRes_t* pShareRes = (shareRes_t*)arg;
79 //加锁
80 pthread_mutex_lock(&pShareRes->mutex);
81 if(pShareRes->flag != 1){//说明现在不需要放票,就等待即可
82 pthread_cond_wait(&pShareRes->cond,&pShareRes->mutex);
83 }
84 printf("add ticket!\n");
85 pShareRes->trainTicket = 10;
86 //解锁
87 pthread_mutex_unlock(&pShareRes->mutex);
88 pthread_exit(NULL); //退出线程
89 }
90
91
92 int main(){
93
94 shareRes_t shareRes;
95 shareRes.trainTicket = 20; //初始票的数量是20
96 shareRes.flag = 0; //最开始不需要放票
97 pthread_mutex_init(&shareRes.mutex,NULL);
98 pthread_cond_init(&shareRes.cond,NULL);
99
100 pthread_t tid1,tid2,tid3; //三个线程,一个放票两个卖票
101 pthread_create(&tid1,NULL,sellTicket1,(void*)&shareRes);
102 pthread_create(&tid2,NULL,sellTicket2,(void*)&shareRes);
103 pthread_create(&tid3,NULL,addTicket,(void*)&shareRes);
104 pthread_join(tid1,NULL);
105 pthread_join(tid2,NULL);
106 pthread_join(tid3,NULL);
107
108 pthread_mutex_destroy(&shareRes.mutex);
109 pthread_cond_destroy(&shareRes.cond);
110
111 }
编译运行:
使用条件变量实现生产者消费者问题
这问题OS里应该都讲过,这里不再详述是什么了:
代码实现:
1 #include <43func.h>
2
3 #define NUM 10
4
5 //循环队列模拟生产者消费者问题中的商品队列
6 typedef struct queue_s{
7 int elem[NUM];
8 int size;
9 int front;//front表示队首下标
10 int rear;//rear表示下一个要入队的元素的下标
11 }queue_t;
12
13 //共享资源
14 typedef struct shareRes_s{
15 queue_t queue;
16 pthread_mutex_t mutex;
17 pthread_cond_t cond;
18 }shareRes_t;
19
20 int cnt = 0; //全局变量,用来做入队出队操作
21
22 //生产者线程
23 void* producer(void* arg){
24 shareRes_t* pShareRes = (shareRes_t*) arg;
25
26 while(1){
27 //访问共享资源之前,加锁
28 pthread_mutex_lock(&pShareRes->mutex);
29 if(pShareRes->queue.size == 10){//商品队列已经满了
30 //那就被阻塞
31 pthread_cond_wait(&pShareRes->cond,&pShareRes->mutex);
32 }
33 //商品队列没满那就继续生产商品到队列的队尾
34 pShareRes->queue.elem[pShareRes->queue.rear] = cnt++;
35 pShareRes->queue.rear = (pShareRes->queue.rear+1)%NUM;
36 ++pShareRes->queue.size;
37
38 printf("producer size = %d, front = %d , rear = %d\n",
39 pShareRes->queue.size,
40 pShareRes->queue.elem[pShareRes->queue.front],
41 pShareRes->queue.elem[(pShareRes->queue.rear-1+NUM)%NUM]);
42
43 if(pShareRes->queue.size == 1){//如果商品队列大小由空变成拥有一个元素
44 //那就说明之前消费者肯定被阻塞了,所以需要唤醒一下
45 pthread_cond_signal(&pShareRes->cond);
46 }
47 //共享资源访问完毕,解锁
48 pthread_mutex_unlock(&pShareRes->mutex);
49 //睡眠0.5s
50 usleep(500000);
51 }
52 }
53
54 //消费者线程
55 void* consumer(void* arg){
56
57 shareRes_t* pShareRes = (shareRes_t*) arg;
58
59 while(1){
60 //访问共享资源之前,加锁
61 pthread_mutex_lock(&pShareRes->mutex);
62 if(pShareRes->queue.size == 0){//商品队列为空
63 //那就被阻塞
64 pthread_cond_wait(&pShareRes->cond,&pShareRes->mutex);
65 }
66 //商品队列不空那就出队消费商品
67 pShareRes->queue.front = (pShareRes->queue.front+1)%NUM;
68 --pShareRes->queue.size;
69
70 printf("consumer size = %d, front = %d , rear = %d\n",
71 pShareRes->queue.size,
72 pShareRes->queue.elem[pShareRes->queue.front],
73 pShareRes->queue.elem[(pShareRes->queue.rear-1+NUM)%NUM]);
74
75 if(pShareRes->queue.size == 9){//如果商品队列大小由满变成拥有九个元素
76 //那就说明之前生产者肯定被阻塞了,所以需要唤醒一下
77 pthread_cond_signal(&pShareRes->cond);
78 }
79 //共享资源访问完毕,解锁
80 pthread_mutex_unlock(&pShareRes->mutex);
81 sleep(1);
82 }
83 }
84
85 int main(){
86
87 //初始化资源
88 shareRes_t shareRes;
89 bzero(&shareRes.queue,sizeof(shareRes.queue));//将队列所有元素清零
90 pthread_mutex_init(&shareRes.mutex,NULL);
91 pthread_cond_init(&shareRes.cond,NULL);
92
93 pthread_t tid1,tid2;
94
95 pthread_create(&tid1,NULL,producer,(void*)&shareRes);
96 pthread_create(&tid2,NULL,consumer,(void*)&shareRes);
97
98 pthread_join(tid1,NULL);
99 pthread_join(tid2,NULL);
100
101 pthread_mutex_destroy(&shareRes.mutex);
102 pthread_cond_destroy(&shareRes.cond);
103
104 }
运行结果:
一些杂项
之前提过的条件变量有很多接口,最重要的上面提到过的,剩下的了解一下就行。
timedwait有时间的等待
该函数的时间参数需要填一个绝对时间,意思是该函数会等待到某一个时间点为止自动被唤醒。
运行效果:
broadcast广播
这里重点不在broadcast是什么,而是要知道为了避免系统的虚假唤醒或者说避免广播唤醒导致的不应该的醒来,我们最好使用while循环来进行条件判断而不是使用if语句,这是因为当wait醒来之后如果是循环结构的话又会进行条件判断是否cnt == 0,而if结构则不会这样做从而可能产生并发错误。
线程安全问题
有一些库函数,因为各种原因(比如出现的比多线程机制还早)导致其在多线程的情况下可能运行会不正确,比如之前提到的perror函数,除此之外还有ctime:
那怎么解决这种问题呢?那就是使用线程安全的库函数。
使用线程安全版本的库函数
在每个库函数的man手册说明中,函数名后面跟_r的,都表示其为一个线程安全版本的库函数。
比如上面提到的ctime函数的线程安全版本的函数为ctime_r:
可重入不可重入的概念
总结
没啥好总结的,这章是绝对的重点,一定要掌握好!