thread
- 1.什么是线程
- 1.1 线程优缺点
- 1.2 线程异常
- 1.3 线程用途
- 2. 进程和线程区别
- 3. 线程控制
- 3.1 POSIX线程库
- 3.2 pthread_create()
- 3.3 线程ID
- 3.4 线程ID地址空间布局
- pthread_self()
- 3.5 线程终止
- pthread_exit函数
- pthread_cancle函数
- 3.6 线程等待
- 3.7 分离线程
- __thread修饰全局变量
- 4.线程互斥
- 4.1 mutex
- 4.2mutex接口
- 4.2.1初始化互斥量
- 4.2.2销毁互斥量
- 4.2.3互斥量加锁和解锁
- 4.2.4 互斥量原子性的原理
- 4.3 可重入和线程安全
- 4.3.1 概念
- 常见线程不安全的情况
- 常见线程安全的情况
- 常见不可重入的情况
- 常见可重入的情况
- 可重入与线程安全联系
- 可重入与线程安全的区别
- 5.锁
- 5.1 死锁
- 5.2 死锁4个必要条件
- 5.3 避免死锁
- 6. 线程同步
- 6.1 同步与静态条件
- 6.2 条件变量
- 6.3 条件变量函数
- 初始化与销毁
- 等待和唤醒
- demo示例
- 7.生产消费模型
- 模型优点
- 基于阻塞队列的生产消费模型
- 基于环形队列的生产消费者模型
- 8.读写者问题
1.什么是线程
- 一个程序里的一个执行路线就叫做线程(thread)。或者说:线程是一个进程内部的控制序列。
- 一个进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在linux系统中,在CPU严重,看到的PCB要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每一个执行流,就形成了线程执行流。
解释:把起初的一个进程叫做主线程,蓝色的是主线程创建的新线程。
- 为什么线程是轻量化的呢?
因为进程创建和切换,是需要将数据加载到内存,并且刷新页表同时需要管理上下文,IO非常耗时;而线程被创建仅仅是复制主线程的PCB即可,不需要进行IO。并且线程切换时,因为所属于一同一进程(不需要IO),仅仅维护自己的上下文就可以,所以线程调度是轻量化的。 - 在linux下,CPU不关心当前是进程还是线程,它只关注task_struct(PCB),因此linux下没有真正意义的线程,linux的线程使用进程pcb模拟的线程。所以linux下的进程统一被称为轻量级进程!
//makefile
text:text.cc
g++ -o $@ $^ -std=c++11 -pthread
.PHONY:clean
clean:
rm -f text
// text.cc
#include<iostream>
#include<pthread.h>
#include<stdio.h>
#include <unistd.h>
using namespace std;
void * threadRun(void* args)
{
const string name = (char *)args;
while(true)
{
cout<<name<<", pid:"<<getpid()<<endl;
sleep(1);
}
}
int main()
{
pthread_t tid[5];
char name[64];
for(int i = 0;i<5;i++)
{
snprintf(name, sizeof name, "%s-%d", "thread", i);
pthread_create(tid+i, nullptr, threadRun, (void *)name);
sleep(1);
}
while(true)
{
cout<<"main pthread: pid :"<<getpid()<<endl;
sleep(1);
}
return 0;
}
由该demo代码,可以看出主线程pid和新线程的pid是相同的!
1.1 线程优缺点
优点:
- 创建一个新线程的代价比新进程小的多。
- 线程切换比进程切换小得多
- 线程占用资源比进程更少
- 能充分利用多处理器并行性
- 在等待慢速的IO同时,程序可执行其他的计算任务(不会被阻塞)
- 计算密集型应用,能在多处理器系统上运行,将计算分解到多个线程中实现
- IO密集型应用,为了提高性能,将IO操作重叠,线程可以同时等待不同的IO操作
因为在传统的IO操作中,当一个线程等待某个IO操作完成时,它通常会被挂起,处于空闲状态,直到该IO操作完成。这段时间内,该线程无法执行其他任务,造成了资源的浪费和系统的低效利用。而通过允许线程在等待一个IO操作完成的同时,同时等待其他IO操作,可以最大限度地利用线程的等待时间。这样做可以确保线程在等待某个IO操作完成时不会空闲,而是可以继续执行其他IO操作或计算任务,从而提高了系统的利用率和性能。
缺点:
- 性能损失
一个很少被外部时间阻塞的计算密集型线程往往无法与其他线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。 - 健壮性降低
编写多线程需要更全面深度的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性还是很大的,换句话说线程之间是缺乏保护的。 - 编程难度提高
调试多线程的程序更困难。
1.2 线程异常
-
单个线程如果出现除0,野指针问题导致线程崩溃,进程也会崩溃。
-
线程是进程的执行分支,线程出现异常,就类似进程出现异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出。
-
在线程里面进行程序替换时,如果替换成功,把上下文、页表等全部替换(不会报错,其他线程都会停止),并且执行程序替换的代码。
1.3 线程用途
- 合理使用多线程,能提高CPU密集型程序的执行效率。
- 合理的使用多线程,能提高IO密集型程序的用户体验(如我们一边写代码,一边下载东西)。
2. 进程和线程区别
- 进程是资源分配的基本单位。
- 线程是调度的基本单位
- 线程共享进程数据,但也拥有自己的一部分数据:
线程ID、一组寄存器、栈、errno、信号屏蔽字、调度优先级等 - 进程的多个线程共享同一地址空间(代码段、数据段都是共享的),各线程还共享以下进程资源和环境:
函数、全局变量、文件描述符表、每种信号的处理方式、当前工作目录、用户id和组id
3. 线程控制
3.1 POSIX线程库
- 头文件为<pthread.h>
- 链接这些线程函数时需加上"-lpthread"选项
3.2 pthread_create()
功能:创建一个新线程
参数:
- thread:返回线程id
- attr:设置线程的属性,attr为nullptr表示使用默认属性
- start_routine:返回值为void * 参数为void *的函数指针
- arg:传给函数指针的参数
返回值:成功返回0,失败返回错误码
注意:
- 传统的函数一般是成功返回0,失败返回-1,并对全局变量errno赋值以示错误。
- pthreads函数出错时不会设置全局变量errno,而是将错误码通过返回值返回。
- pthreads函数同样提供了线程内的errno变量,以支持其他使用errno的代码。对于pthreads的错误,建议通过返回值获取,开销更小。
文章开头的demo展示了create函数的使用。可以使用ldd命令,查看可执行程序的链接库。
通过ls -l命令,还可以查看它的链接文件:
3.3 线程ID
在linux中,目前的线程实现时Native POSIX Thread Libaray,简称NPTL。在这种实现下,线程又被称为轻量级进程(Light Weighted Process),每一个用户态的线程,在内核中都对应一个调度实体,也拥有自己的进程描述符(task_struct结构体)。
- 在引入线程后,linxu引入了线程组的概念,也产生了线程id
struct task_struct
{
...
pid_t pid;
pid_t tgid;
...
struct task_struct *group_leader;
...
struct list_head thread_group;
...
}
- 多线程的进程,又称为线程组,线程组内的每一个线程在内核之中都存在一个进程描述符(task_struct)与之对应。进程描述符结构体中的pid,表面上看对应的是进程ID,其实对应的是线程ID;进程描述符中的tgid,含义是Thread Group ID,该值对应的用户层面的进程ID。(有点绕,但是系统调用不饶!!)
#include<iostream>
#include<pthread.h>
#include<stdio.h>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
void * threadRun(void* args)
{
const string name = (char *)args;
while(true)
{
cout<<name<<", pid:"<<getpid()<<", tid:"<<pthread_self()<<endl;
sleep(1);
}
}
int main()
{
pthread_t tid[5];
char name[64];
for(int i = 0;i<5;i++)
{
snprintf(name, sizeof name, "%s-%d", "thread", i);
pthread_create(tid+i, nullptr, threadRun, (void *)name);
sleep(1);
}
while(true)
{
cout<<"main pthread: pid :"<<getpid()<<", tid:"<<pthread_self()<<endl;
sleep(1);
}
return 0;
}
将上面的demo稍微改正一下,我们就可以验证到不同线程的tid是不一样的,而os其实是用线程id进行线程调度的!
注意:上面我们是使用pthread_self()获取线程id的,如果需要使用gettid(),请参考该文章:Linux上线程ID以及pthread_self()和gettid()函数的使用与对比
使用脚本监控上面的进程:
while :; do ps -aefL | head -1 && ps -aefL | grep text ; sleep 1; done
LWP:线程ID,即gettid()的返回值。
NLWP:线程组内线程的个数。
根据这个消息,我们就可以猜测,在OS底层,其实调度的信息是根据LWP调度的!
注意到:上面图片第一个线程的PID和LWP都是21342,这是巧合吗?
- 这是因为线程组内的第一个线程,在用户态被称为主线程(main thread),在内核中被称为group leader,内核在创建第一个线程时,会将线程组的ID的值设置成第一个线程的线程ID,group_leader指针则指向自身,即主线程的进程描述符。所以线程组内存在一个线程ID等于进程ID,而该线程就是线程组的主线程。
- 线程和进程不一样,进程有父进程的概念,但在线程组里面,所有的线程都是对等关系。
3.4 线程ID地址空间布局
- 前面说的线程ID(gettid)属于进程调度的范围。因为线程是轻量级进程,是OS系统调度器的最小单位,所以需要一个数值来唯一表示该进程。
- pthread_create函数会产生一个线程ID,存在thread所指向的内容。并且以后线程库的后续操作,就是根据该线程ID来操作线程的。
pthread_self()
功能:获取线程自身ID
pthread_t pthread_self(void);
在Linux目前实现的NPTL而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址,里面的内容是线程ID。
并且该地址在共享区!也就是说,主线程用栈区,新线程用共享区的地址空间。如下图可看出,线程在mmap区域,也是互相独立的
3.5 线程终止
只终止某个线程,不终止整个进程,有三种方法:
- 线程里return。对主函数不适用,因为main函数return相当于调用exit。会导致进程终止!
- 线程可以调用pthread_exit终止自己
- 一个线程可以调用pthread_exit终止统一进程中的另一个线程。
pthread_exit函数
功能:线程终止自己
void pthread_exit( void *retval);
参数:如果线程被join了,那么就会把返回值存到retval里面。
注意:pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回指针时线程函数已经退出了。
pthread_cancle函数
功能:取消一个执行中的线程
int pthread_cancle(pthread_t thread);
参数:线程ID
返回值:成功返回0,失败返回错误码。
3.6 线程等待
- 已经退出的线程,其空间没有被释放,仍在进程的地址空间内。
- 创建新的线程不会复用刚才退出线程的地址空间。
功能:等待指定线程结束
int pthread_join(pthread_t thread, void **retval)
参数:thread:线程ID
retval:指向一个指针,后者指向线程的返回值。
返回值:成功返回0,失败返回错误码
调用该函数的线程将挂起等待,知道id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,retval所指向的单元里存放的是线程函数的返回值。
- 如果thread线程被别的线程调用pthread_cancel异常终止掉,retval所指向的单元里存放的是常数PTHREAD_CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,retval所指向的单元存放的是传给phtread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给retval参数。
以下三个线程demo示例分别验证了123三点。
#include<iostream>
#include<pthread.h>
#include<stdio.h>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
void *thread1(void *arg)
{
printf("thread 1 returning ..,\n");
int *p = (int *)malloc(sizeof(int));
*p = 1;
return (void*)p;
}
void *thread2(void *arg)
{
printf("thread 2 exiting ..,\n");
int *p = (int *)malloc(sizeof(int));
*p = 2;
pthread_exit((void *)p); //该值会返回给主线程ret
}
void *thread3(void * arg)
{
while (1)
{
printf("thread 3 is running ..\n");
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid;
void *ret;
//thread 1 return
pthread_create(&tid, nullptr, thread1, nullptr);
pthread_join(tid, &ret); //相当于 二级指针&ret 指向 void*p , 那么 *(&ret) 其实就相当于ret,又相当于是 ret = p;
printf("thread 1 return, thread id %X, return code:%d\n", tid, *(int *)ret); //%X 大写打印16进制
free(ret);
//thread 2 exit
pthread_create(&tid, nullptr, thread2, nullptr);
pthread_join(tid, &ret); //跟thread 1 一样,只不过换了一种方式
printf("thread 2 exit, thread id %X, return code:%d\n", tid, *(int *)ret);
free(ret);
// thread 3 cancle by other
pthread_create(&tid, nullptr, thread3, nullptr);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &ret);
if(ret == PTHREAD_CANCELED)
printf("thread 3 cancled by other, thread id %X, return code:PTHREAD_CANCELED\n", tid);
else
printf("thread 3 cancled by other, thread id %X, return code:nullptr\n", tid);
return 0;
}
3.7 分离线程
- 默认情况下,新常见的线程是joinable的,线程退出后,需要对其进行pthrerad_join操作,否则无法释放资源,从而造成系统资源泄露。
- 如果不关心线程的返回值,join是一种负担,这个时候,可以告诉OS当线程退出时,自动释放线程资源。
int pthread_detach(pthread_t thread);
成功返回0,失败返回错误码
可以是线程组内其他线程对目标线程进行分离,也可以线程自己分离:
pthread_detach(pthread_self());
请看下面的demo代码:
#include<iostream>
#include<pthread.h>
#include<stdio.h>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
void * thread(void * arg)
{
pthread_detach(pthread_self()); //自己解绑自己
printf("%s\n", (char*)arg);
return NULL;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread, (void *)"thread running...");
int ret=0;
sleep(1); //等一下,让thread运行完
if(pthread_join(tid, nullptr) == 0)
{
printf("thread wait success\n");
ret = 0;
}
else{
printf("thread wait failed\n");
ret = 1;
}
return ret;
}
__thread修饰全局变量
__thread修饰全局变量,带来的结果就是让每一个线程各自拥有一个全局的变量 ,线程的局部存储。
#include<iostream>
#include<pthread.h>
#include<stdio.h>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
__thread int g_val = 0;
void * thread(void * arg)
{
while(1)
{
cout<<(char*)arg<<" : "<<g_val<<" &:"<<&g_val<<endl;
g_val++;
sleep(1);
}
return NULL;
}
int main()
{
pthread_t tid1,tid2,tid3;
pthread_create(&tid1, nullptr, thread, (void *)"thread 1 running...");
pthread_create(&tid2, nullptr, thread, (void *)"thread 2 running...");
pthread_create(&tid3, nullptr, thread, (void *)"thread 3 running...");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
加了__thread后,结果:
不加__thread,结果:
4.线程互斥
- 临界资源:多线程执行流共享的资源
- 临界区:访问临界资源的那段代码
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区。
- 原子性:动作是一气呵成的,不会被打断。
4.1 mutex
问题引入:demo展示了,4个线程进行抢票,但是最后ticket出问题了。
#include<iostream>
#include<pthread.h>
#include <unistd.h>
using namespace std;
int ticket = 10000;
void * func(void* args)
{
char *id = (char *)args;
while(true)
{
if(ticket>0)
{
usleep(1000);
ticket--;
cout<<id<<" 强票, ticket:"<<ticket<<endl;
}
else
{
break;
}
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1,nullptr, func,(void *)"thread 1");
pthread_create(&t2,nullptr, func,(void *)"thread 2");
pthread_create(&t3,nullptr, func,(void *)"thread 3");
pthread_create(&t4,nullptr, func,(void *)"thread 4");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
出现这种情况是什么原因呢?
- if语句判断过后,代码可以并发的切换到其他线程。
- ticket–本事就不是一个原子操作,它是分三步进行的。
所以要保证,这部分的代码,同一时刻只能有一个线程执行,并且是一气喝成,不能被中断。
因此我们需要一把锁,在linxu上这把锁叫做互斥量。
4.2mutex接口
4.2.1初始化互斥量
初始化互斥量有两种方法:
- 方法1,静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- 方法2,动态分配
参数:mutex:要初始化的互斥量
attr:锁的属性,一般都填nullptr
4.2.2销毁互斥量
- 使用静态分配的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,确保后面不会有线程再使用锁。
int pthread_mutex_destroy(pthread_mutex_t *mutex);
4.2.3互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t mutex);
调用pthread_lock时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_lock调用会陷入阻塞,等待互斥量解锁。
将上面的demo改进,即可解决问题:
#include<iostream>
#include<pthread.h>
#include <unistd.h>
using namespace std;
int ticket = 1000;
//pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; //静态分配,不需要销毁
pthread_mutex_t mtx_d; //动态锁,需要用函数初始化
void * func(void* args)
{
char *id = (char *)args;
while(true)
{
//抢票前加锁
//pthread_mutex_lock(&mtx);
pthread_mutex_lock(&mtx_d);
if(ticket>0)
{
//usleep(1000);
ticket--;
cout<<id<<" 抢票, ticket:"<<ticket<<endl;
//抢完票解锁
//pthread_mutex_unlock(&mtx);
pthread_mutex_unlock(&mtx_d);
}
else
{
//抢完票解锁
//pthread_mutex_unlock(&mtx);
pthread_mutex_unlock(&mtx_d);
break;
}
usleep(1000); //抢完票干点别的
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_mutex_init(&mtx_d,nullptr);
pthread_create(&t1,nullptr, func,(void *)"thread 1");
pthread_create(&t2,nullptr, func,(void *)"thread 2");
pthread_create(&t3,nullptr, func,(void *)"thread 3");
pthread_create(&t4,nullptr, func,(void *)"thread 4");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
pthread_mutex_destroy(&mtx_d); //动态分配要销毁锁
return 0;
}
可以观察到,最终进程正常结束!
4.2.4 互斥量原子性的原理
上面的例子已经看到,++和–操作不是原子的,有可能会引发数据一致性问题。
- 为了实现互斥锁,大多体系结构提供了swap或exchange指令,该指令作用是把寄存器和内存单元的数据相交换。由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。下面是lock()和unlock()的伪代码。
4.3 可重入和线程安全
4.3.1 概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他执行流再次进入,称为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则函数叫做可重入函数。否则就是不可重入函数。
常见线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全的函数
常见线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程时安全的。
- 类或者接口对于线程来说都是原子操作。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的。
- 调用了I/O库函数, 标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态数据结构
常见可重入的情况
- 不使用全局变量或静态变量
- 不使用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都由函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
可重入与线程安全联系
- 函数是可重入的,那就是线程安全的。
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全的区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数一定是线程安全的。
- 如果将临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
5.锁
5.1 死锁
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因各个进程互相申请被其他进程所占用不会释放的资源而处于一种永久等待状态。
5.2 死锁4个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
- 不可剥夺条件:一个执行流已获得的资源,在未使用完之前,不能强行剥夺。
- 循环等待条件:若干个执行流之间形成一种头尾相接的循环等待资源的关系。
5.3 避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁的算法:死锁检测算法、银行家算法。
6. 线程同步
6.1 同步与静态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
- 竞态条件:因为时序问题,而导致程序异常,这种情况叫静态条件。
6.2 条件变量
- 当一个线程互斥的访问某个变量时,如果其他线程加锁了,他可能除了等待什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,直到其他线程将一个节点添加到队列中。这种情况就需要用到条件变量。
条件变量作用:
- 当没有临界资源的时候,不要让线程自己频繁检测有没有资源。而是将线程挂起,当对应的条件就绪的时候,通知对应的线程,让他来进行资源申请和访问!
6.3 条件变量函数
初始化与销毁
cond :是控制的条件变量
attr:是条件变量的属性,一般设置为nullptr
注意:第三个框证明了条件变量也能初始化!
等待和唤醒
cond:要在该条件变量上等待
mutx:互斥量,控制互斥访问的锁。
cond:唤醒该条件变量
demo示例
解释:该demo实现了主线程创建新线程去执行用户自定义任务,同时使用互斥同步锁保护临界资源和线程执行的顺序。
#include<iostream>
#include<pthread.h>
#include<iostream>
#include <unistd.h>
#define TNUM 4
typedef void(*func_t)(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;
// //静态初始化方法,不需要destroy
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
// pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
class ThreadData
{
public:
ThreadData(const std::string &name, func_t func,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
:_name(name),_func(func),_pmtx(pmtx), _pcond(pcond)
{}
public:
std::string _name;
func_t _func;
pthread_mutex_t *_pmtx;
pthread_cond_t *_pcond;
};
void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
//记住wait一定要在加锁和解锁之间
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行好的时候,wait代码被执行,当前线程会被立即阻塞
std::cout<<name<<"running ...."<<std::endl;
sleep(1);
pthread_mutex_unlock(pmtx);
}
}
void func2(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
//记住wait一定要在加锁和解锁之间
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行好的时候,wait代码被执行,当前线程会被立即阻塞
std::cout<<name<<"running ...."<<std::endl;
sleep(1);
pthread_mutex_unlock(pmtx);
}
}
void func3(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
//记住wait一定要在加锁和解锁之间
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行好的时候,wait代码被执行,当前线程会被立即阻塞
std::cout<<name<<"running ...."<<std::endl;
sleep(1);
pthread_mutex_unlock(pmtx);
}
}
void func4(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
//记住wait一定要在加锁和解锁之间
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行好的时候,wait代码被执行,当前线程会被立即阻塞
std::cout<<name<<"running ...."<<std::endl;
sleep(1);
pthread_mutex_unlock(pmtx);
}
}
void *Entry(void* args)
{
ThreadData* td = (ThreadData*)args; // td 在每个线程自己私有的栈空间中保存
td->_func(td->_name, td->_pmtx, td->_pcond); //他是一个函数,调用完成就返回
delete td;
return nullptr;
}
int main()
{
pthread_t tids[TNUM];
func_t func[TNUM] = {func1, func2, func3, func4}; //定义一个函数指针数组
pthread_mutex_t mtx;
pthread_cond_t cond;
pthread_mutex_init(&mtx, nullptr); //动态初始化,得销毁
pthread_cond_init(&cond, nullptr);
for(int i = 0;i<TNUM;i++)
{
std::string name = "Thread ";
name+=std::to_string(i+1);
ThreadData *td = new ThreadData(name, func[i], &mtx, &cond);
pthread_create(tids+i, nullptr, Entry, (void*)td); //创建线程
}
sleep(5);
//控制新线程
int cnt = 10;
while(cnt)
{
std::cout<<" making thread run code...."<<std::endl;
pthread_cond_signal(&cond); //唤醒其中一个线程
sleep(1);
cnt--;
}
std::cout<<"控制结束"<<std::endl;
quit = true;
pthread_cond_broadcast(&cond);
//线程等待
for(int i = 0;i<TNUM;i++)
{
pthread_join(tids[i], nullptr);
std::cout<<" thread: "<<tids[i]<<" quit "<<std::endl;
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
wait这里还有一个小细节就是:我们通过获得锁进来了之后,如果发现cond不满足,那么这个函数会自动释放锁,然后将该线程阻塞挂起,等待唤醒。防止造成死锁。
注意:为什么pthread_cond_wait需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变成满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁,就无法安全的获取和修改共享数据。
7.生产消费模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里面取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
基于321原则:
3种关系:生产者和生产者(竞争互斥关系)、消费者和消费者(竞争互斥关系)、生产者和消费者(互斥同步关系)
2种角色:生产者和消费者
1种场所:缓冲区。
模型优点
- 解耦
- 支持并发(生产消费同时进行,不需要等待)
- 支持忙闲不均
基于阻塞队列的生产消费模型
下面的代码一共五个文件:完成的功能是基于线程池的线程任务派发功能。
- lockGuard.hpp是锁模块,是基于RAII实现的自动加锁解锁模块,可用来控制临界资源的访问!
- log.hpp 是日志功能模块,将程序的运行状态和输出情况标准化,并且输出再threadpool.log文件
- Task.hpp:模拟任务模块,简单的设计了一下求和的任务。
- thread.hpp:是线程模块,用于记录单个线程的详细信息。
- threadPool.hpp:是线程池模块,这个是该demo的核心,有着存储线程的队列,包括派发和回收任务。(该实现是基于单例模式的饿汉实现!)
- Main.cc:模拟网络中来的任务,让程序跑起来
//1.makefile
thread_pool:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread #-D DEBUG_SHOW
.PHONY:clean
clean:
rm -f thread_pool
//2.lockGuard.hpp
#pragma once
#include<iostream>
#include<pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):_pmtx(mtx){}
void lock()
{
std::cout<<"要进行加锁"<<std::endl;
pthread_mutex_lock(_pmtx);
}
void unlock()
{
std::cout<<"要进行解锁"<<std::endl;
pthread_mutex_unlock(_pmtx);
}
~Mutex(){}
private:
pthread_mutex_t *_pmtx;
};
//RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):_mtx(mtx)
{
_mtx.lock();
}
~lockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
//3.log.hpp
#pragma once
#include<iostream>
#include<stdarg.h>
//日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char* gLevelMap[] ={
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
//完整的日志功能,至少:日志等级 时间 支持用户自定义(日志内容,文件行,文件名)
void logMessage(int level, const char *format,...)
{
#ifndef DEBUG_SHOW
if(level == DEBUG) return;
#endif
//va_list ap;
//va_start(ap, format);
//while()
//int x = va_arg(ap, int);
//va_end(ap); //ap = nullptr;
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
//struct tm* localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof(stdBuffer), "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
//vprintf(format, args);
vsnprintf(logBuffer, sizeof(logBuffer), format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
//printf("%s %s\n", stdBuffer, logBuffer);
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
//4.Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>
#include "log.hpp"
//#include "thread.hpp"
typedef std::function<int(int, int)> func_t; //这个typedef 如果定义全局变量,两个文件都取了相同的名字,
//可能就会产生影响,但是这个是局部变量,不会对其他的文件产生影响!!
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):_x(x),_y(y),_func(func){}
void operator()(const std::string &name)
{
// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
logMessage(WARNING, "%s 处理完成:%d+%d = %d | %s | %d",
name.c_str(), _x, _y, _func(_x, _y), __FILE__, __LINE__);
}
private:
int _x;
int _y;
func_t _func;
};
//5.thread.hpp
#pragma once
#include<string>
#include<iostream>
#include<cstdio>
typedef void*(*fun_t)(void *);
class ThreadData
{
public:
void *_args;
std::string _name;
};
class Thread
{
public:
Thread(int num, fun_t callback, void *args):_func(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d",num);
_name = nameBuffer;
_tdata._args = args;
_tdata._name = _name;
}
void start()
{
pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
}
void join()
{
pthread_join(_tid, nullptr);
}
std::string name()
{
return _name;
}
~Thread(){}
private:
std::string _name;
fun_t _func;
ThreadData _tdata;
pthread_t _tid;
};
//6.threadPool.hpp
#pragma once
#include<iostream>
#include<vector>
#include<queue>
// #include<string>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
//本质是生产消费模型
const int g_thread_num = 3;
template<class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &lock;
}
bool isEmpty()
{
return _task_queue.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
private:
ThreadPool(int thread_num = g_thread_num):_num(thread_num)
{
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&cond, nullptr);
for(int i = 1;i<=_num;i++)
{
_threads.push_back(new Thread(i, routine, this)); //注意这个this指针,这样线程就能够访问线程池了!!
}
}
//禁止线程池进行拷贝和赋值
ThreadPool(const ThreadPool<T> &other) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
public:
//考虑一下多线程使用单例的过程
static ThreadPool<T>* getThreadPool(int num = g_thread_num)
{
//可以有效减少未来必定要进行加锁检测的问题
//拦截大量的已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为
if(nullptr == _thread_ptr)
{
lockGuard lockguard(&mutex);
//但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口,因为构造私有化了,所以必须也得static,使用类域访问
if(nullptr == _thread_ptr)
{
_thread_ptr = new ThreadPool<T>(num); //.....
}
}
return _thread_ptr;
}
// 1. run
void run()
{
for(auto &it:_threads)
{
it->start();
//std::cout << iter->name() << " 启动成功" << std::endl;
logMessage(NORMAL, "%s %s", it->name().c_str(), "启动成功");
}
}
// 线程池本质也是一个生产消费模型
// void* routine(void *args)
// 消费过程
static void *routine(void *args) //因为线程传进的函数,不能有this参数(加上就是两个),所以需要设置为static,这里的routine的args是thread里的_data,
//而_data的类型是ThreadData,并且Data里面存的args是线程池的指针
{
ThreadData *td = (ThreadData*)args; //先获得threadData
ThreadPool<T> *tp = (ThreadPool<T> *)td->_args; //获得了this指针
while(true)
{
T task;
{
lockGuard lockguard(tp->getMutex()); //静态的成员函数因为没有this指针,不能直接访问类的非静态数据,但是因为tp就是this指针,所以不用这个函数也可以完成任务
while(tp->isEmpty()) tp->waitCond(); // 如果任务队列(缓冲区为空,挂起)
//读取任务
task = tp->getTask(); //队列是共享的->将任务从共享,拿到自己的私有空间
}
task(td->_name); //打印出自己已经完成了
}
}
void pushTask(const T&task)
{
lockGuard lockguard(&lock);
_task_queue.push(task);
pthread_cond_signal(&cond); //有任务了就唤醒
}
~ThreadPool()
{
for(auto &it:_threads)
{
it->join();
delete it;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread *> _threads; //指向线程
int _num; //线程池的线程数
std::queue<T> _task_queue;
static ThreadPool<T> *_thread_ptr; //指向线程池的指针
static pthread_mutex_t mutex; //创建线程池的互斥锁
pthread_mutex_t lock; //是信号量的锁和信号
pthread_cond_t cond;
};
// 静态变量需要在类外初始化,线程池只需要一个,指向线程池的指针初始化为nullptr
// 线程池的锁也需要初始化
template<typename T>
ThreadPool<T> *ThreadPool<T>::_thread_ptr = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
//7.Main.cc
#include "Task.hpp" //把他放在前面,因为threadPool要用
#include "threadPool.hpp"
#include "thread.hpp"
#include <ctime>
#include <unistd.h>
#include<typeinfo> //查看类型的头文件
int main()
{
srand((unsigned long)time(nullptr) ^ getpid()); // 增加种子随机性
// fun_t a;
// std::cout<<typeid(a).name()<<std::endl; 检查typedef重定义来着
// 如果单例本身也在被多线程申请使用呢? 那么单例构造需要加锁!
ThreadPool<Task>::getThreadPool()->run();
while (true)
{
// 生产过程中,制作任务要花费时间
int x = rand() % 100 + 1;
usleep(5210); // 模拟花费的时间
int y = rand() % 30 + 1;
Task t(x, y, [](int x, int y)->int{
return x+y;
});
logMessage(DEBUG, "制作完成: %d+%d = ?",x,y);
logMessage(DEBUG, "制作完成: %d+%d = ?",x,y);
logMessage(DEBUG, "制作完成: %d+%d = ?",x,y);
logMessage(DEBUG, "制作完成: %d+%d = ?",x,y);
//推送任务到线程池中
ThreadPool<Task>::getThreadPool()->pushTask(t);
sleep(1);
}
return 0;
}
运行结果部分展示:
基于环形队列的生产消费者模型
逻辑结构是环形的,但是物理结果还是线性模型
该demo实现了:基于环形队列的生产消费模型,采用信号量pv的代码风格,实现了多线程互斥协同工作
// makefile
ring_queue:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ring_queue
//sem.hpp
#ifndef __SEM_HPP_
#define SEM_HPP_
#include<iostream>
#include<semaphore.h>
class Sem
{
public:
Sem(int val)
{
sem_init(&_sem, 0, val);
}
void p()
{
sem_wait(&_sem);
}
void v()
{
sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};
#endif
//ringQueue.hpp
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_
#include <iostream>
#include <vector>
#include<pthread.h>
#include "sem.hpp"
const int g_default_num = 5;
// 信号量本质是一把计数器->计数器又是什么呢??可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断!
// 申请锁->判断与访问->释放锁-->本质是我们并不清楚临界资源的情况 !!!
// 信号量要提前预设资源的情况,而且在pv变化的过程中,我们可以在外部就能知晓临界资源的情况。
// 多线程
template <class T>
class RingQueue
{
public:
RingQueue(int default_num = g_default_num)
:_ring_queue(default_num),
_num(default_num),
c_step(0),
p_step(0),
_space_sem(default_num),
_data_sem(0)
{
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&pclock, nullptr);
}
~RingQueue()
{
pthread_mutex_destroy(&clock);
pthread_mutex_destroy(&pclock);
}
//生产者
void push(const T&in)
{
//先申请信号量(0)
_space_sem.p();
pthread_mutex_lock(&pclock); //生产者需要互斥
_ring_queue[p_step++] = in;
p_step%=_num;
pthread_mutex_unlock(&pclock);
_data_sem.v(); //通知取数据
}
//消费者
void pop(T *out)
{
_data_sem.p();
pthread_mutex_lock(&clock); //消费者需要互斥
*out = _ring_queue[c_step++];
c_step%=_num;
pthread_mutex_unlock(&clock);
_space_sem.v();
}
private:
std::vector<T> _ring_queue;
int _num;
int c_step; //消费下标
int p_step; //生产下标 用于控制唤醒队列的队头和队尾
Sem _space_sem; //空间多大的信号量
Sem _data_sem; //数据多少的信号量
pthread_mutex_t clock; //消费者的锁
pthread_mutex_t pclock; //生产者的锁
};
#endif
//testMain.cc
#include "ringQueue.hpp"
#include <unistd.h>
#include <ctime>
#include<sys/types.h>
#include<unistd.h>
void *consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (true)
{
sleep(1);
int x;
//1. 从环形队列中取数据
rq->pop(&x);
//2.进行一定的处理
std::cout<<" 消费: "<< x <<" ["<<pthread_self()<<" ]"<<std::endl;
}
}
void *productor(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (true)
{
//sleep(1);
//1.构建数据或者对象
int x = rand()%100+1;
std::cout<<" 生产: "<<x<<" ["<<pthread_self()<<"] "<<std::endl;
//2.推送到环形队列中
rq->push(x);
}
}
int main()
{
srand((uint64_t)time(nullptr)^getpid());
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c[3],p[2];
pthread_create(c, nullptr, consumer, (void *)rq);
pthread_create(c+1, nullptr, consumer, (void *)rq);
pthread_create(c+2, nullptr, consumer, (void *)rq);
pthread_create(p, nullptr, productor, (void *)rq);
pthread_create(p+1, nullptr, productor, (void *)rq);
for(int i = 0; i<3;i++) pthread_join(c[i], nullptr);
for(int i = 0;i<2;i++) pthread_join(p[i], nullptr);
return 0;
}
结果展示:
8.读写者问题
后续会慢慢更新!