目录
一.线程概念
1.什么是线程
2.页表
(1)页表结构
(2)好处
3.线程优点
4.线程缺点
5.线程异常
6.线程用途
7.进程和线程的
8.简单使用线程
二.线程控制
1.使用线程
2.线程栈和pthread_t
3.线程的局部存储
4.分离线程
三.线程互斥
1.数据不一致问题
2.锁
四.重入、线程安全
1.概念
2.可重入情况
3.不可重入情况
4.线程安全情况
5.线程不安全情况
6.可重入与线程安全联系
7.可重入与线程安全区别
五.死锁
1.概念
2.死锁四个必要条件
3.避免死锁
六.线程同步
1.概念
2.条件变量
(1)函数
(2)为什么pthread_cond_wait需要互斥量
(3)条件变量使用规范
七.生产者消费者模型
八.POSIX信号量
1. 概念
2.初始化信号量
3.销毁信号量
4.等待信号量
5.发布信号量
九.基于环形队列的生产消费模型
十.线程池
十一.STL、智能指针和线程安全
1.STL中的容器不是线程安全的。
2.智能指针是否是线程安全的
十二.读者写者问题
前言:这一篇的内容很多,包括多线程的各种操作,生产者消费者模型等。
一.线程概念
1.什么是线程
①在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
② 一切进程至少都有一个执行线程
③ 线程在进程内部运行,本质是在进程地址空间内运行
④ 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
⑤ 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
2.页表
地址空间的单位是2^32 * 1字节 = 4GB,这时如果要做地址映射,每个虚拟地址都要有对应的物理地址,如果页表只要一张,要多少条目(页表项)呢?
2^32个条目!一个条目可不是1个字节,保守估计有8个字节,2^32 * 8字节 = 32GB,如果页表这么大,那内存都没了。所以页表一定不是只要一个的。
物理内存是按4KB为单位进行划分的(这里的4KB叫做页框),可执行程序是虚拟地址编译,也划分好了4KB(这里的4KB叫做页帧)
IO的基本单位是块,块:一般是4KB
如果物理内存有4GB,大概有100万(2^20)个页。
页框就也是有2^20个,那么OS就要管理它们,就要先描述,在组织。
因此用一个结构
struct page
{
};
来描述
通过struct page name[1024*1024]来组织
因此,对内存的管理,就变成了对数组的增删查改。
(1)页表结构
虚拟地址在被转化的过程,不是直接转化的,是按10 + 10 + 12的形式的。
如果页表只有一张,要占2^32 / 2^12 = 2^20条目
即使一个条目10字节,最大也就10M到20M
如果把整个页表旋转一下,把页目录放上面,就相当于一棵多叉树
上面的这些动作都是由硬件MMU完成的,所以是软(页表)硬件(MMU)结合的方式。
(2)好处
① 进程虚拟地址管理和内存管理,通过页表+page进行了解耦
② 页表分离了,可以实现页表的按需创建
③ 分页机制+按需创建页表 = 节省空间
3.线程优点
① 创建一个新线程的代价要比创建一个新进程小得多
② 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
③ 线程占用的资源要比进程少很多
④ 能充分利用多处理器的可并行数量
⑤ 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
⑥ 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
⑦ I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
4.线程缺点
①性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与其它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
② 健壮性降低
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
③ 缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
④ 编程难度提高
编写与调试一个多线程程序比单线程程序困难得多
5.线程异常
单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
6.线程用途
合理的使用多线程,能提高CPU密集型程序的执行效率
合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
7.进程和线程的
进程是资源分配(向系统申请资源)的基本单位
线程是调度的基本单位
线程是在进程内部运行的执行流
线程比进程粒度更细,调度成本更低
线程共享进程数据,但也拥有自己的一部分数据:
① 一组寄存器
② 栈
③ 线程ID
④ errno
⑤ 信号屏蔽器
⑥ 调度优先级
进程的多个线程共享同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
① 文件描述符表
② 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
③ 当前工作目录
④ 用户id和组id
线程:进程 = n : 1
Linux认为:是没有进程、没有线程在概念上的区分的,只有一个 叫做执行流。Linux的线程是用进程 (PCB)模拟的。
进程和线程在执行流层面是不一样的
CPU看到的所有的take_struct都是一个执行流(线程)
CPU视角:take_struct <= 传统的进程PCB
没有真正意义上的线程,而是用进程take_struct模拟实现的
linux下的"进程" <= 其它OS的进程概念
进程 = 内核数据结构 + 进程对应的代码和数据
进程 = 内核视角:承担分配系统资源的基本实体(进程的基座属性)
内部只有一个执行流的进程 ---- 单执行流进程
内部有多个执行流的进程 ---- 多执行流进程
8.简单使用线程
下面简单介绍一下线程要用到的2个函数:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start routine)(void*), void *arg);
作用:创建线程
第一个参数*thread:线程id,输出型参数
第二个参数*attr:线程属性
第三个参数:返回值为void*,参数为void*的函数指针,线程调用函数的一个入口地址
第四个参数*arg:是第三个参数想传的参数
int pthread_join(pthread_t thread, void** retval);
作用:等待一个终止的线程
我们先来看看线程的运行:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void *callback1(void *args)
{
string name = (char*)args;
while(true)
{
cout << name << endl;
sleep(1);
}
}
void *callback2(void *args)
{
string name = (char*)args;
while(true)
{
cout << name << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, nullptr, callback1, (void*)"thread 1");
pthread_create(&tid2, nullptr, callback2, (void*)"thread 2");
while(true)
{
cout << "我是主线程..." << endl;
sleep(1);
}
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
return 0;
}
这里可以看到,3个线程是同时运行的,但是我们看进程是只能看到一个的。我们在用指令ps -aL,就可以查看到线程了,LWP就是轻量级进程ID。
3个执行流的PID相同,说明3个属于一个进程,而那个PID和LWP都相同的就是主线程。
下面我们看一下pid:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void *callback1(void *args)
{
string name = (char*)args;
while(true)
{
cout << name << ": " << ::getpid() << endl;
sleep(1);
}
}
void *callback2(void *args)
{
string name = (char*)args;
while(true)
{
cout << name << ": " << ::getpid() << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, nullptr, callback1, (void*)"thread 1");
pthread_create(&tid2, nullptr, callback2, (void*)"thread 2");
while(true)
{
cout << "我是主线程...: " << ::getpid() << endl;
sleep(1);
}
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
return 0;
}
这里可以看到,3个线程的pid都是相同的,说明他们同属一个进程。
再用一下C++的线程:
#include <iostream>
#include <string>
#include <unistd.h>
#include <thread>
using namespace std;
int main()
{
std::thread t([](){
while(true)
{
cout << "线程运行起来了" << endl;
sleep(1);
}
});
t.join();
return 0;
}
如果我们不加-lpthread,就会出现上面的错误,因为C++的线程在LInux中就是封装的Linux的线程,所以一定要加上-lpthread
加上以后结果如下:
成功运行线程。
二.线程控制
1.使用线程
下面正式使用线程:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start routine)(void*), void *arg);
作用:创建线程
第一个参数*thread:线程id,输出型参数
第二个参数*attr:线程属性
第三个参数:返回值为void*,参数为void*的函数指针,线程调用函数的一个入口地址(回调方法)
第四个参数*arg:传给线程启动函数的参数(是第三个参数想传的参数)
int pthread_join(pthread_t thread, void** retval);
作用:等待一个终止的线程
第一个参数*thread:线程id
第二个参数**retval:线程退出时的退出码(二级指针),输出型参数
注意:线程退出的时候,一般必须要进程join,如果不进行join,就会造成类似于进程那样的内存泄漏问题
我们来看一下新的线程的ID:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void *startRoutine(void *args)
{
while(true)
{
cout << "线程正在运行..." << endl;
sleep(1);
}
}
int main()
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, startRoutine, (void*)"thread1");
cout << "new thread id : " << tid << endl; // 线程ID
while(true)
{
cout << "main thread 正在运行..." << endl;
sleep(1);
}
return 0;
}
这里可以看到这个线程ID很大。
两个线程以16进制打印出来的id:
下面看一下信号对于线程的控制:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void printTid(const char *name, const pthread_t &tid)
{
printf("%s 正在运行, thread id: 0x%x\n", name, tid);
}
void *startRoutine(void *args)
{
const char *name = static_cast<const char*>(args);
int cnt = 500;
while(true)
{
printTid(name, pthread_self());
sleep(1);
if(!(cnt--))
{
break;
}
}
cout << "线程退出了..." << endl;
return nullptr;
}
int main()
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, startRoutine, (void*)"thread1");
sleep(10000);
pthread_join(tid, nullptr);
while(true)
{
printTid("main thread", pthread_self());
sleep(1);
}
return 0;
}
这里可以发现,暂停了一个线程时,所有线程都暂停了。运行一个线程时,也都同时运行了。因为它们是在一个进程的。
下面来看看线程异常的样子:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void printTid(const char *name, const pthread_t &tid)
{
printf("%s 正在运行, thread id: 0x%x\n", name, tid);
}
void *startRoutine(void *args)
{
const char *name = static_cast<const char*>(args);
int cnt = 5;
while(true)
{
printTid(name, pthread_self());
sleep(1);
if(!(cnt--))
{
int *p = nullptr;
*p = 100; // 野指针问题·
}
}
cout << "线程退出了..." << endl;
return nullptr;
}
int main()
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, startRoutine, (void*)"thread1");
sleep(10);
pthread_join(tid, nullptr);
cout << "main thread join success" << endl;
sleep(10);
while(true)
{
printTid("main thread", pthread_self());
sleep(1);
}
return 0;
}
这里可以看到,待线程出现野指针问题时,左边会显示段错误,右边的线程直接就没了。
说明整个进程整体异常退出,线程异常 == 进程异常。
所以线程会影响其它线程的运行 ---- 线程的健壮性较低,鲁棒性
我们来真正的测试一下join函数的第二个参数的作用:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void printTid(const char *name, const pthread_t &tid)
{
printf("%s 正在运行, thread id: 0x%x\n", name, tid);
}
void *startRoutine(void *args)
{
const char *name = static_cast<const char*>(args);
int cnt = 5;
while(true)
{
printTid(name, pthread_self());
sleep(1);
if(!(cnt--))
{
break;
}
}
cout << "线程退出了..." << endl;
// 1. 线程退出的方式,return
return (void*)111;
}
int main()
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, startRoutine, (void*)"thread1");
(void)n;
void *ret = nullptr;
pthread_join(tid, &ret); // void **retval是一个输出型参数
cout << "main thread join success, *ret: " << (long long)(ret) << endl;
sleep(10);
while(true)
{
printTid("main thread", pthread_self());
sleep(1);
}
return 0;
}
这里我们就得到了新线程退出时的退出码 111。
join的第二个参数就是一个输出型参数,获取新线程退出时的退出码
在线程退出时,代码跑完,结果不正确和结果正确都可以得到退出码,但是线程异常时并不会出现退出码。
那么为什么异常时主线程没有获取新线程退出时的退出码呢?
因为线程出异常就不再是线程的问题,而是进程的问题,应该让父进程获取退出码,知道它什么时候退出的。
因此线程终止时,只需考虑正常终止。
线程终止有3种方式:
方式一:
直接return:return (void*)111;
方式二:
void pthread_exit(void *retval);
作用:终止线程
参数*retval:线程终止时的退出码
下面来使用一下pthread_exit:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void printTid(const char *name, const pthread_t &tid)
{
printf("%s 正在运行, thread id: 0x%x\n", name, tid);
}
void *startRoutine(void *args)
{
const char *name = static_cast<const char*>(args);
int cnt = 5;
while(true)
{
// cout << "线程正在运行..." << endl;
printTid(name, pthread_self());
sleep(1);
if(!(cnt--))
{
break;
}
}
cout << "线程退出了..." << endl;
// 2. 线程退出的方式,pthread_exit
pthread_exit((void*)1111);
}
int main()
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, startRoutine, (void*)"thread1");
(void)n;
void *ret = nullptr;
pthread_join(tid, &ret); // void **retval是一个输出型参数
cout << "main thread join success, *ret: " << (long long)(ret) << endl;
sleep(10);
while(true)
{
printTid("main thread", pthread_self());
sleep(1);
}
return 0;
}
这里为什么使用的是pthread_eixt,不是exit呢?
因为exit退出的是进程,任何一个线程调用exit,都标识整个进程退出。
方式三:
int pthread_cancel(pthread_t thread)
作用:取消一个线程
这个一般不常用,但我们依旧使用一下它:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void printTid(const char *name, const pthread_t &tid)
{
printf("%s 正在运行, thread id: 0x%x\n", name, tid);
}
void *startRoutine(void *args)
{
const char *name = static_cast<const char*>(args);
int cnt = 5;
while(true)
{
// cout << "线程正在运行..." << endl;
printTid(name, pthread_self());
sleep(1);
if(!(cnt--))
{
}
}
}
int main()
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, startRoutine, (void*)"thread1");
(void)n;
sleep(3); // 代表main thread对应的工作
pthread_cancel(tid);
cout << "new thread been canceled" << endl;
void *ret = nullptr;
pthread_join(tid, &ret); // void **retval是一个输出型参数
cout << "main thread join success, *ret: " << (long long)(ret) << endl;
sleep(10);
while(true)
{
printTid("main thread", pthread_self());
sleep(1);
}
return 0;
}
成功退出,退出的结果为-1
结果是-1的原因:线程和进程一样,用的都是PCB,退出时都有自己的退出码,调用return或exit就是自己修改PCB中的退出结果(退出码),取消这个线程时,是OS取消的,就直接向退出码中写-1.
2.线程栈和pthread_t
我们使用的线程库,是用户级线程库:pthread
pthread就是地址
① 线程是一个独立的执行流
② 线程一定会在自己的运行过程中,产生临时数据(调用函数、定义局部变量等)
③ 线程一定需要有自己的独立的栈空间
所以在代码区中,有3个,①自己写的 ②库的 ③系统的。
所有的代码执行,都是在进程的地址空间当中进行执行
pthread_t究竟是什么:
线程的全部实现,并没有全部体现在OS内,而是OS提供执行流,具体的线程结构由库来进行管理。
因此,库可以创建多个线程,所有库就要管理线程(先描述,在组织)。
struct thread_info
{
pthread_t tid;
void *stack; // 私有栈
}
3.线程的局部存储
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int global_value = 100;
void *startRoutine(void *args)
{
while(true)
{
cout << "thread " << pthread_self() << " global_value: " << global_value
<< " &global_value: " << &global_value << " Inc: " << global_value++ << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*) "thread 1");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 1");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
这里的global_value的地址都是相同的。
下面在int global_value前加__thread:
加了这个__thread就会默认把这个global_value再拷一份给每一个进程
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
__thread int global_value = 100;
void *startRoutine(void *args)
{
while(true)
{
cout << "thread " << pthread_self() << " global_value: " << global_value
<< " &global_value: " << &global_value << " Inc: " << global_value++ << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*) "thread 1");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 1");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
这里的global_value的地址则不一样。
int syscall(int number , ...)
作用:拿到线程的lwp
用一下syscall:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
using namespace std;
__thread int global_value = 100;
void *startRoutine(void *args)
{
while(true)
{
cout << "thread " << pthread_self() << " global_value: "
<< global_value << " &global_value: " << &global_value
<< " Inc: " << global_value++ << " lwp: " << syscall(SYS_gettid) << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*) "thread 1");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 1");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
4.分离线程
① 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
② 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
joinable和分离是冲突的,一个线程不能既是joinable又是分离的。
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离,最好是让主线程,分离其它进程。
线程分离了,意味着,不在关心这个线程的死活。所以这也相当于线程退出的第4种方式,延后退出。
立即分离或者延后分离都可以,但是要保证线程活着。
新线程分离,但是主线程先退出(进程退出),所有线程就都退了。
一般分离线程,对应的主线程不退出(常驻内存的进程)
int pthread_detach(pthread_t thread);
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
using namespace std;
__thread int global_value = 100;
void *startRoutine(void *args)
{
cout << "线程分离..." << endl;
while(true)
{
cout << "thread " << pthread_self() << " global_value: "
<< global_value << " &global_value: " << &global_value
<< " Inc: " << global_value++ << " lwp: " << syscall(SYS_gettid) << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*) "thread 1");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 1");
sleep(1);
pthread_detach(tid1);
pthread_detach(tid2);
pthread_detach(tid3);
return 0;
}
三.线程互斥
1.数据不一致问题
临界资源:多个执行流都能看到访问的资源
临界区:多个执行流,有不同的代码,但是访问临界资源的代码,称之为临界区
数据不一致问题:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
using namespace std;
// int 票数计数器
int tickets = 10000; // 临界资源,可能会因为共同访问,造成数据不一致问题
void *getTickets(void *args)
{
const char *name = static_cast<const char*>(args);
while(true)
{
if(tickets > 0)
{
usleep(1000);
cout << name << " 抢到了票,票的编号: " << tickets << endl;
tickets--;
}
else
{
cout << name << "] 已经放弃抢票了,因为没有了..." << endl;
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*) "thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*) "thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*) "thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*) "thread 4");
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid4, nullptr);
cout << n << ":" << strerror(n) << endl;
return 0;
}
这里就出现了cuo
tickets--:不是由一条语句完成的,而是3条:
① 从内存中取数据到CPU的寄存器中(load tickets to reg)
② 在内部对其进行--操作(reg--)
③ 将计算完成的结果写回内存中(write reg to tickets)
在执行语句的任何地方,线程都可能被切换走。
CPU内的寄存器是被所有的执行流共享的,但是寄存器里面的数据是属于当前执行流的上下文数据
线程被切换的时候,需要保存上下文
线程被换回的时候,需要恢复上下文
就可能A线程保存了数据之后,轮到B线程,待到tickets很多次--后,又轮到A线程,--了一次,这时返回到内存中的值是A线程原来保存的值--了1次的值,而不是真正的数据。
因为线程切换,多线程之间并发访问临界资源就会出现数据不一致的问题
上面的不只有--会出现数据不一致的问题,在判断tickets > 0时也同样会出现数据不一致。
为了防止出现数据不一致的问题就要保证--的行为是原子的。
当我们访问某种资源的时候,任何时候都只要一个执行流在进行访问,这个就叫做:互斥特性
不想让线程之间互相打扰,就要加锁,加锁就可以解决数据不一致的问题:
2.锁
初始化的两种方式:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
静态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict
attr);
动态分配
销毁:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
注意:
使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
不要销毁一个已经加锁的互斥量
已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
阻塞式加锁,如果其它线程正在使用,那么这个线程将会被阻塞住,放入等待队列,等带其它线程用完。
int pthread_mutex_trylock(pthread_mutex_t *mutex);
非阻塞式加锁,如果其它线程正在使用,那么这个线程就会返回。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
解锁。
使用接口:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
using namespace std;
// int 票数计数器
int tickets = 10000; // 临界资源,可能会因为共同访问,造成数据不一致问题
pthread_mutex_t mutex;
void *getTickets(void *args)
{
const char *name = static_cast<const char*>(args);
while(true)
{
// 临界区,只需要对临界区加锁,而且加锁的粒度越细越好
pthread_mutex_lock(&mutex);
if(tickets > 0)
{
usleep(1000);
cout << name << " 抢到了票,票的编号: " << tickets << endl;
tickets--;
pthread_mutex_unlock(&mutex);
// other code
usleep(100); // 模拟其它业务逻辑的执行
}
else
{
cout << name << "] 已经放弃抢票了,因为没有了..." << endl;
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
pthread_mutex_init(&mutex, nullptr);
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*) "thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*) "thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*) "thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*) "thread 4");
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid4, nullptr);
cout << n << ":" << strerror(n) << endl;
// 释放锁
pthread_mutex_destroy(&mutex);
return 0;
}
临界区,只需要对临界区加锁,而且加锁的粒度越细越好
加锁的本质式让线程执行临界区代码串行化
加锁是一套规范,通过临界区对临界资源进行访问的时候,要加锁就都要加
锁保护的是临界区,任何线程执行临界区代码访问临界资源,都必须先申请锁,前提是都必须先看到锁
那么这把锁,本身不就是也是临界资源吗?那谁来保护这把锁呢?
pthread_mutex_lock: 竞争和申请锁的过程,就是原子的,不会被影响。
在临界资源对应的临界区中加了锁,但是依旧是多行代码,那既然是多行代码,就不可以被切换了吗?
是可以切换的,因为线程执行的加锁解锁等对应的也是代码,线程在任意代码处都可以被切换,但是线程加锁是原子的(要么拿到了锁,要么没有),不存在拿了一半锁的情况,所以加锁的过程是安全的。
在被切走的时候,是绝对不会有线程进入临界区的,因为每个线程进入临界区都必须先申请锁。当前的锁,被线程A申请走了,即便当前的线程A没有被调度,但是它是被切走了,是带着锁走的。一旦一个线程持有了锁,该线程根本就不担心任何的切换问题。对于其它线程而言,线程A访问临界区也具有一定的原子性,只有没有进入和使用完毕两种状态才对其他线程有意义。因此,尽量不要在临界区内做耗时的事情。
线程加锁和解锁具有原子性,是如何实现的呢?
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
凡是在寄存器中的数据,全都是线程的内部上下文。
多个线程看起来同时在访问寄存器,但是互不影响,
将数据从内存读入寄存器,本质是将数据从共享变成线程私有。
初始化锁有好几种方式:
(1)全局变量静态分配
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *startRoutine(void *args)
{
const char *name = static_cast<const char*>(args);
while(true)
{
// 如果申请不到,就会阻塞进程
pthread_mutex_lock(&mutex);
if(tickets > 0)
{
usleep(10000);
cout << name << " get a ticker: " << tickets << endl;
tickets--;
pthread_mutex_unlock(&mutex);
// 做其它事情
usleep(500);
}
else
{
pthread_mutex_unlock(&mutex);
break;
}
}
}
int main()
{
static pthread_t t1, t2, t3, t4;
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&t2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&t3, nullptr, startRoutine, (void*)"thread 3");
pthread_create(&t4, nullptr, startRoutine, (void*)"thread 4");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
(2)局部变量静态分配
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int tickets = 1000;
void *startRoutine(void *args)
{
pthread_mutex_t *mutex_p = static_cast<pthread_mutex_t*>(args);
while(true)
{
// 如果申请不到,就会阻塞进程
pthread_mutex_lock(mutex_p);
if(tickets > 0)
{
usleep(10000);
cout << "thread: " << pthread_self() << " get a ticket: " << tickets << endl;
tickets--;
pthread_mutex_unlock(mutex_p);
// 做其它事情
usleep(500);
}
else
{
pthread_mutex_unlock(mutex_p);
break;
}
}
}
int main()
{
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t t1, t2, t3, t4;
pthread_create(&t1, nullptr, startRoutine, (void*)&mutex);
pthread_create(&t2, nullptr, startRoutine, (void*)&mutex);
pthread_create(&t3, nullptr, startRoutine, (void*)&mutex);
pthread_create(&t4, nullptr, startRoutine, (void*)&mutex);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
(3)传多种参数初始化
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int tickets = 1000;
#define NAMESIZE 64
typedef struct threadData
{
char name[NAMESIZE];
pthread_mutex_t *mutexp;
}threadData;
void *startRoutine(void *args)
{
threadData *td = static_cast<threadData *>(args);
while(true)
{
// 如果申请不到,就会阻塞进程
pthread_mutex_lock(td->mutexp);
if(tickets > 0)
{
usleep(10000);
cout << td->name << " get a ticket: " << tickets << endl;
tickets--;
pthread_mutex_unlock(td->mutexp);
// 做其它事情
usleep(500);
}
else
{
pthread_mutex_unlock(td->mutexp);
break;
}
}
}
int main()
{
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t t1, t2, t3, t4;
threadData *td1 = new threadData();
strcpy(td1->name, "thread 1");
td1->mutexp = &mutex;
threadData *td2 = new threadData();
strcpy(td2->name, "thread 2");
td2->mutexp = &mutex;
threadData *td3 = new threadData();
strcpy(td3->name, "thread 3");
td3->mutexp = &mutex;
threadData *td4 = new threadData();
strcpy(td4->name, "thread 4");
td4->mutexp = &mutex;
pthread_create(&t1, nullptr, startRoutine, td1);
pthread_create(&t2, nullptr, startRoutine, td2);
pthread_create(&t3, nullptr, startRoutine, td3);
pthread_create(&t4, nullptr, startRoutine, td4);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
delete td1;
delete td2;
delete td3;
delete td4;
return 0;
}
(4)使用init,还有用destroy
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
using namespace std;
// int 票数计数器
int tickets = 10000; // 临界资源,可能会因为共同访问,造成数据不一致问题
pthread_mutex_t mutex;
void *getTickets(void *args)
{
const char *name = static_cast<const char*>(args);
while(true)
{
// 临界区,只需要对临界区加锁,而且加锁的粒度越细越好
pthread_mutex_lock(&mutex);
if(tickets > 0)
{
usleep(1000);
cout << name << " 抢到了票,票的编号: " << tickets << endl;
tickets--;
pthread_mutex_unlock(&mutex);
// other code
usleep(100); // 模拟其它业务逻辑的执行
}
else
{
cout << name << "] 已经放弃抢票了,因为没有了..." << endl;
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
pthread_mutex_init(&mutex, nullptr);
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*) "thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*) "thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*) "thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*) "thread 4");
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid4, nullptr);
cout << n << ":" << strerror(n) << endl;
// 释放锁
pthread_mutex_destroy(&mutex);
return 0;
}
封装锁来实现卖票:
Lock.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_lock, nullptr);
}
void lock()
{
pthread_mutex_lock(&_lock);
}
void unlock()
{
pthread_mutex_unlock(&_lock);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
LockGuard(Mutex *mutex)
: _mutex(mutex)
{
_mutex->lock();
std::cout << "加锁成功..." << std::endl;
}
~LockGuard()
{
_mutex->unlock();
std::cout << "解锁成功..." << std::endl;
}
private:
Mutex *_mutex;
};
mythread.cc:
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include "Lock.hpp"
using namespace std;
int tickets = 1000;
Mutex mymutex;
bool getTickets()
{
bool ret = false;
LockGuard lockGuard(&mymutex); // 局部对象的生命周期是随代码块的
if (tickets > 0)
{
usleep(10000);
cout << "thread: " << pthread_self() << " get a ticket: " << tickets << endl;
tickets--;
ret = true;
}
return ret;
}
void *startRoutine(void *args)
{
const char *name = static_cast<const char *>(args);
while(true)
{
if(!getTickets())
{
break;
}
cout << name << "get tickets success" << endl;
// 做其他事情
usleep(100);
}
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&t2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&t3, nullptr, startRoutine, (void*)"thread 3");
pthread_create(&t4, nullptr, startRoutine, (void*)"thread 4");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
四.重入、线程安全
1.概念
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
2.可重入情况
① 不使用全局变量或静态变量
② 不使用用malloc或者new开辟出的空间
③ 不调用不可重入函数
④ 不返回静态或全局数据,所有数据都有函数的调用者提供
⑤ 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
3.不可重入情况
① 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
② 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
③ 可重入函数体内使用了静态的数据结构
4.线程安全情况
① 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
② 类或者接口对于线程来说都是原子操作
③ 多个线程之间的切换不会导致该接口的执行结果存在二义性
5.线程不安全情况
① 不保护共享变量的函数
② 函数状态随着被调用,状态发生变化的函数
③ 返回指向静态变量指针的函数
④ 调用线程不安全函数的函数
6.可重入与线程安全联系
① 函数是可重入的,那就是线程安全的
② 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
③ 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
7.可重入与线程安全区别
① 可重入函数是线程安全函数的一种
② 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
③ 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
五.死锁
1.概念
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
实现一个出现死锁的代码:
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include "Lock.hpp"
using namespace std;
pthread_mutex_t mutexA = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutexB = PTHREAD_MUTEX_INITIALIZER;
void *startRoutine1(void *args)
{
while (true)
{
pthread_mutex_lock(&mutexA);
sleep(1);
pthread_mutex_lock(&mutexB);
cout << "我是线程1,我的tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexA);
pthread_mutex_unlock(&mutexB);
}
}
void *startRoutine2(void *args)
{
while (true)
{
pthread_mutex_lock(&mutexB);
sleep(1);
pthread_mutex_lock(&mutexA);
cout << "我是线程2, 我的tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexB);
pthread_mutex_unlock(&mutexA);
}
}
int main()
{
pthread_t t1, t2;
pthread_create(&t1, nullptr, startRoutine1, nullptr);
pthread_create(&t2, nullptr, startRoutine2, nullptr);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
上面就出现了死锁问题,线程A和线程B都想要对方的锁,但是对方已经拿到了自己的锁,没法得到对方的锁了,就出现了死锁。所有虽然线程在运行,但是无法打印出结果。
一个锁也可能出现死锁问题:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int cnt = 100;
void *startRoutine(void *args)
{
string name = static_cast<char *>(args);
while(true)
{
pthread_mutex_lock(&mutex);
cout << name << "count : " << cnt-- << endl;
pthread_mutex_lock(&mutex);
sleep(1);
}
}
int main()
{
pthread_t t1;
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 1");
pthread_t t2;
pthread_create(&t2, nullptr, startRoutine, (void*)"thread 2");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
这里不小心将解锁unlock写成了lock,加了两次锁,就造成了死锁。如果加了两次锁,写了个解锁的依旧是死锁,只要加了两次以上就是死锁。
2.死锁四个必要条件
① 互斥条件:一个资源每次只能被一个执行流使用
② 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
③ 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
④ 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
3.避免死锁
① 破坏死锁的四个必要条件
② 加锁顺序一致
③ 避免锁未释放的场景
④ 资源一次性分配
六.线程同步
线程互斥,它是对的,但是它不合理,互斥有可能导致饥饿问题,某些执行流,长时间得不到某种资源。
1.概念
同步:在保证临界资源安全的前提下(互斥等),让线程访问某种资源,具有一定的顺序性,称之为同步。
竞态条件:因为时序问题,而导致程序异常,称之为竞态条件。
2.条件变量
条件变量使 唤醒线程由系统唤醒 -> 让程序员自己唤醒线程
(1)函数
初始化:
① 局部变量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict
attr);
参数一 cond:要初始化的条件变量
参数二 attr:NULL
② 静态全局变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁:
int pthread_cond_destroy(pthread_cond_t *cond)
等待:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
可设置等待时间:
int pthread_cond_timewait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
唤醒等待:
唤醒所有等待的线程:
int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒一个等待的线程:
int pthread_cond_signal(pthread_cond_t *cond);
先简单使用一下上面的函数:
#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// 定义一个条件变量
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
vector<function<void()>> funcs;
void show()
{
cout << "hello show" << endl;
}
void print()
{
cout << "hello print" << endl;
}
// 定义全局退出变量
volatile bool quit = false;
void *waitCommand(void *args)
{
pthread_detach(pthread_self());
while(!quit)
{
// 执行了下面的代码,就证明了某一种条件不就绪,要线程等待
// 让对应的线程进行等待,等待被唤醒
// 三个线程,都会在条件变量下进行排队
pthread_cond_wait(&cond, &mutex);
for(auto &f : funcs)
{
f();
}
}
cout << "thread id: " << pthread_self() << " end..." << endl;
return nullptr;
}
int main()
{
funcs.push_back(show);
funcs.push_back(print);
funcs.push_back([](){
cout << "hello world!" << endl;
});
pthread_cond_init(&cond, nullptr);
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, waitCommand, nullptr);
pthread_create(&t2, nullptr, waitCommand, nullptr);
pthread_create(&t3, nullptr, waitCommand, nullptr);
while(true)
{
char n = 'a';
cout << "请输入你的command(n/q): ";
cin >> n;
if(n == 'n')
{
pthread_cond_signal(&cond);
}
else
{
break;
}
sleep(1);
}
cout << "main thread quit" << endl;
pthread_cond_destroy(&cond);
return 0;
}
(2)为什么pthread_cond_wait需要互斥量
条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
由于解锁和等待不是原子操作。调用解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。
int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后,
会去看条件量是否等于0。等于,就把互斥量变成1,直到cond_ wait返回,并把条件量改成1,把互斥量恢复成原样。
(3)条件变量使用规范
① 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假){
pthread_cond_wait(cond, mutex);}
修改条件
pthread_mutex_unlock(&mutex);
② 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
七.生产者消费者模型
① 消费者有多个时,消费者之间是什么关系呢?
竞争关系 ---- 互斥
② 供应商有多个,供应商之间是什么关系呢?
竞争关系 ---- 互斥
③ 消费者和供应商之间是什么关系呢?
互斥关系,同步关系
生产者要生产就生产完,这样在消费者读取的数据的时候才具有确定性的结果
生产完了再消费,消费完了再生产。
如果进行临界资源的保护,可以提高访问效率
3种关系、2种角色、1个交易场所:
3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥 / 同步)
2种角色:生产者、消费者(线程承担)
1个交易场所:超市:内存种特定的一种内存结构。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型代码实现:
BlockQueue.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
const uint32_t gDefaultCap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap) :cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
public:
// 生产接口
void push(const T &in) // const&: 纯输入
{
// 加锁
// 判断->是否适合生产->bq是否为满->程序员视角的条件->1.满(不生产) 2.不满(生产)
// if(满) 不生产,休眠
// else if(不满) 生产,唤醒消费者
// 解锁
lockQueue();
// 必须使用while,因为下面wait被唤醒后不一定条件是满足的
while (isFull())
{
// before: 等待的时候,会自动释放mutex_
proBlockWait(); // 阻塞等待,等待被唤醒 被唤醒 != 条件被满足
// after: 醒来的时候,是在临界区里醒来的
}
// 条件满足,可以生产
pushCore(in); // 生产完成
// wakeupCon() // 写在这唤醒消费者也可以
unlockQueue();
wakeupCon(); // 唤醒消费者
}
// 消费接口
T pop()
{
// 加锁
// 判断->是否适合消费->bq是否为空->程序员视角的条件->1.空(不消费) 2.有(消费)
// if(空) 不消费,休眠
// else if(有) 消费,唤醒生产者
// 解锁
lockQueue();
if(isEmpty())
{
conBlockWait(); // 阻塞等待,等待被唤醒
}
// 条件满足,可以消费
T tmp = popCore();
unlockQueue();
wakeupPro(); // 唤醒生产者
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
void proBlockWait() // 生产者一定是在临界区中的
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&proCond_, &mutex_);
// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回
}
void conBlockWait() // 阻塞等待,等待被唤醒
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&conCond_, &mutex_);
// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回
}
void wakeupPro() // 唤醒生产者
{
pthread_cond_signal(&proCond_);
}
void wakeupCon() // 唤醒消费者
{
pthread_cond_signal(&conCond_);
}
void pushCore(const T &in)
{
bq_.push(in); // 生产完成
}
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
uint32_t cap_; // 容量
queue<T> bq_; // blockqueue
pthread_mutex_t mutex_; // 保护阻塞队列的互斥锁
pthread_cond_t conCond_; // 让消费者等待的条件变量
pthread_cond_t proCond_; // 让生产者等待的条件变量
};
Task.cpp:
#pragma once
#include <iostream>
#include <string>
using namespace std;
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0')
{}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
BlockTest.cc:
#include "Task.hpp"
#include "BlockQueue.hpp"
#include <ctime>
const std::string ops = "+-*/%" ;
// 并发,一般并不是在临界区中并发的,而是生产前(before blockqueue)和消费后(after blockqueue)对应的并发
void *consumer(void* args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while(true)
{
Task t = bqp->pop(); // 消费任务
int result = t(); // 处理任务 -- 也是要花时间的
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr)
<< " 消费了一个任务: " << one << op << two << "=" << result << endl;
}
}
void *productor(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1. 制作任务 -- 也要花时间
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 2. 生产任务
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr)
<< " 生产了一个任务: " << one << op << two << "=?" << endl;
sleep(1);
}
}
int main()
{
// 定义一个阻塞队列
// 创建两个线程, productor, consumer
// productor consumer
srand((unsigned long)time(nullptr) ^ getpid());
BlockQueue<Task> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
运行结果:
八.POSIX信号量
1. 概念
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
信号量就是一个计数器,描述临界资源1数量的计数器
--P->原子的->申请资源
++V->原子的->归还资源
信号量申请成功了,就一定保证会拥有一部分临界资源吗?
只要信号量申请成功,就一定会获得指定的资源。
申请mutex,只要拿到了锁,就可以获得临界资源,并且不担心被切换。
临界资源可以当成整体,可不可以看成一小部分一小部分呢?
结合场景,一般是可以的
信号量:1
--p:1->0 ---- 加锁
++v: 0->1 ---- 释放锁
这样的叫做二元信号量 == 互斥锁
2.初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
3.销毁信号量
int sem_destroy(sem_t *sem);
4.等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
5.发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
九.基于环形队列的生产消费模型
环形队列中的生产和消费什么时候会访问同一个位置时
当这两个同时指向同一个位置的时候,只有满or空的时候(互斥and同步)
其它时候都指向的是不同的位置(并发)
因此,操作的基本原则:
① 空:消费者不能超过生产者 -->生产者先运行
② 满:生产者不能把消费者套一个圈里,继续再往后写入 -->消费者先运行
谁来保证这个基本原则呢?
信号量来保证。
生产者最关心的是什么资源?
空间
消费者最关心的是什么资源?
数据
怎么保证,不同的线程,访问的是临界资源中不同的区域呢?
通过程序员编码保证。
环形队列的生产消费模型代码实现:
RingQueue.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 10;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gCap) : ringqueue_(cap), pIndex_(0), cIndex_(0)
{
// 生产
sem_init(&roomSem_, 0, ringqueue_.size());
// 消费
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_, nullptr);
pthread_mutex_init(&cmutex_, nullptr);
}
// 生产
void push(const T &in)
{
sem_wait(&roomSem_); //无法被多次的申请
pthread_mutex_lock(&pmutex_);
ringqueue_[pIndex_] = in; //生产的过程
pIndex_++; // 写入位置后移
pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
pthread_mutex_unlock(&pmutex_);
sem_post(&dataSem_);
}
// 消费
T pop()
{
sem_wait(&dataSem_);
pthread_mutex_lock(&cmutex_);
T temp = ringqueue_[cIndex_];
cIndex_++;
cIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
pthread_mutex_unlock(&cmutex_);
sem_post(&roomSem_);
return temp;
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
vector<T> ringqueue_; // 唤醒队列
sem_t roomSem_; // 衡量空间计数器,productor
sem_t dataSem_; // 衡量数据计数器,consumer
uint32_t pIndex_; // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源
uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
RingQueueTest.cc:
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
void *productor(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = rand() % 10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << "生产了一个数据: " << data << endl;
sleep(1);
}
}
void *consumer(void *args)
{
RingQueue<int> *rpq = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = rpq->pop();
cout << "pthread[" << pthread_self() << "]" << "消费一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
RingQueue<int> rq;
pthread_t c1, c2, c3, p1, p2, p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
结果如下:
十.线程池
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
① 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
② 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
③ 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
线程池示例:
1. 创建固定数量线程池,循环从任务队列中获取任务对象
2. 获取到任务对象后,执行任务对象中的任务接口
代码:
ThreadPool.hpp:
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Log.hpp"
#include "Lock.hpp"
using namespace std;
int gThreadNum = 5;
template <class T>
class ThreadPool
{
private:
ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *getInstance()
{
static Mutex mutex;
if (nullptr == instance) //仅仅是过滤重复的判断
{
LockGuard lockguard(&mutex); //进入代码块,加锁。退出代码块,自动解锁
if (nullptr == instance)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
//类内成员, 成员函数,都有默认参数this
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
prctl(PR_SET_NAME, "follower");
while (1)
{
tp->lockQueue();
while (!tp->haveTask())
{
tp->waitForTask();
}
//这个任务就被拿到了线程的上下文中
T t = tp->pop();
tp->unlockQueue();
// for debug
int one, two;
char oper;
t.get(&one, &two, &oper);
//规定,所有的任务都必须有一个run方法
Log() << "新线程完成计算任务: " << one << oper << two << "=" << t.run() << "\n";
}
}
void start()
{
assert(!isStart_);
for (int i = 0; i < threadNum_; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
void push(const T &in)
{
lockQueue();
taskQueue_.push(in);
choiceThreadForHandler();
unlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool haveTask()
{
return !taskQueue_.empty();
}
void waitForTask()
{
pthread_cond_wait(&cond_, &mutex_);
}
void choiceThreadForHandler()
{
pthread_cond_signal(&cond_);
}
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
private:
bool isStart_;
int threadNum_;
queue<T> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *instance;
// const static int a = 100;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
ThreadPoolTest.cc:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <thread>
int main()
{
prctl(PR_SET_NAME, "master");
const string operators = "+/*/%";
unique_ptr<ThreadPool<Task>> tp(ThreadPool<Task>::getInstance());
tp->start();
srand((unsigned long)time(nullptr) ^getpid() ^ pthread_self());
// 派发任务的线程
while(true)
{
int one = rand() % 50;
int two = rand() % 10;
char oper = operators[rand() % operators.size()];
Log() << "主线程派发计算任务: " << one << oper << two << "=?" << "\n";
Task t(one, two, oper);
tp->push(t);
sleep(1);
}
}
Task.hpp:
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0')
{
}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
Lock.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&lock_, nullptr);
}
void lock()
{
pthread_mutex_lock(&lock_);
}
void unlock()
{
pthread_mutex_unlock(&lock_);
}
~Mutex()
{
pthread_mutex_destroy(&lock_);
}
private:
pthread_mutex_t lock_;
};
class LockGuard
{
public:
LockGuard(Mutex *mutex) : mutex_(mutex)
{
mutex_->lock();
std::cout << "加锁成功..." << std::endl;
}
~LockGuard()
{
mutex_->unlock();
std::cout << "解锁成功...." << std::endl;
}
private:
Mutex *mutex_;
};
Log.hpp:
#pragma once
#include <iostream>
#include <ctime>
#include <pthread.h>
std::ostream &Log()
{
std::cout << "Fot Debug |" << " timestamp: " << (uint64_t)time(nullptr) << " | " << " Thread[" << pthread_self() << "] | ";
return std::cout;
}
运行结果:
上面的线程池也使用了单例模式。
十一.STL、智能指针和线程安全
1.STL中的容器不是线程安全的。
原因:
STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶)。
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全,智能指针是否是线程安全的。
2.智能指针是否是线程安全的
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
挂起等待锁:当某个线程没有申请到锁的时候,此时该线程会被挂起,即加入到等待队列等待。当锁被释放的时候,就会被唤醒,重新竞争锁。当临界区运行的时间较长时,我们一般使用挂起等待锁。我们先让线程PCB加入到等待队列中等待,等锁被释放时,再重新申请锁。
之前所学的互斥锁就是挂起等待锁
自旋锁:当某个线程没有申请到锁的时候,该线程不会被挂起,而是每隔一段时间检测锁是否被释放。如果锁被释放了,那就竞争锁;如果没有释放,过一会儿再来检测。如果这里使用挂起等待锁,可能线程刚加入等待队列,锁就被释放了,因此,当临界区运行的时间较短时,我们一般使用自旋锁。
pthread_spin_lock
自旋锁只需要把mutex变成spin
十二.读者写者问题
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
3种关系:写者和写者(互斥),读者和读者(没有关系),读者和写者(互斥关系)
2种角色:读者、写者
1个交易场所:读写场所
读者写者 vs 生产者消费者 本质区别:消费者会把数据拿走,而读者不会
初始化:
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
销毁:
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁:
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 读者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 写者加锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 读写者解锁
设置读写优先:
分为读者优先和写者优先。
读者写者进行操作的时候,读者非常多,频率个别高,写者比较少,频率不高
存在写者饥饿的情况
代码实现的较为简单的读者写者问题:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
int board = 0;
pthread_rwlock_t rw;
using namespace std;
void *reader(void *args)
{
const char *name = static_cast<const char*>(args);
cout << "run..." << endl;
while(true)
{
pthread_rwlock_rdlock(&rw);
cout << "reader read : " << board << "tid: " << pthread_self() << endl;
sleep(10);
pthread_rwlock_unlock(&rw);
}
}
void *writer(void *args)
{
const char *name = static_cast<const char*>(args);
sleep(1);
while(true)
{
pthread_rwlock_wrlock(&rw);
board++;
cout << "I am writer" << endl;
sleep(10);
pthread_rwlock_unlock(&rw);
}
}
int main()
{
pthread_rwlock_init(&rw, nullptr);
pthread_t r1,r2,r3,r4,r5,r6, w;
pthread_create(&r1, nullptr, reader, (void*)"reader");
pthread_create(&r2, nullptr, reader, (void*)"reader");
pthread_create(&r3, nullptr, reader, (void*)"reader");
pthread_create(&r4, nullptr, reader, (void*)"reader");
pthread_create(&r5, nullptr, reader, (void*)"reader");
pthread_create(&r6, nullptr, reader, (void*)"reader");
pthread_create(&w, nullptr, writer, (void*)"writer");
pthread_join(r1, nullptr);
pthread_join(r2, nullptr);
pthread_join(r3, nullptr);
pthread_join(r4, nullptr);
pthread_join(r5, nullptr);
pthread_join(r6, nullptr);
pthread_join(w, nullptr);
pthread_rwlock_destroy(&rw);
return 0;
}
struct rwlock_t
{
int readers = 0;
int who;
mutex_t mutex;
}
读者:
ptjread_rwlock_rdlock()
lock(mutex)
readers++
unlock)mutex
read操作
lock(mutex)
readers--;
unlock(mutex)
写者:
pthread_rwlock_wrlock()
lock(mutex)
while(readers > 0) 释放锁, wait
write操作
unlock(mutex)