目录
- Linux线程概念
- 什么是线程
- 线程的优点
- 线程的缺点
- 线程异常
- 线程用途
- Linux进程VS线程
- 进程和线程
- 总结
- Linux线程控制
- POSIX线程库
- 创建线程
- 线程ID及进程地址空间布局
- 进程和线程ID区别
- 内核层面:pid & tgid
- 线程终止
- 线程等待
- __thread 和 pthread_self()
- 分离线程
- Linux线程互斥
- 进程线程间的互斥相关背景概念
- 互斥量mutex
- 互斥量的接口
- 互斥量实现原理探究
- 可重入VS线程安全
- 概念
- 常见的线程不安全的情况
- 常见的线程安全的情况
- 常见不可重入的情况
- 常见可重入的情况
- 可重入与线程安全联系
- 可重入与线程安全区别
- 常见锁概念
- 死锁
- 死锁四个必要条件
- 避免死锁
- 避免死锁算法
- Linux线程同步
- 前菜理解
- 同步概念与竞态条件
- 条件变量函数
- 为什么 pthread_cond_wait 需要互斥量?(就是wait为什莫需要第二个参数)
- 条件变量使用规范
- 生产者消费者模型
- 为何要使用生产者消费者模型
- 生产者消费者模型优点
- 基于BlockingQueue的生产者消费者模型
- C++ queue模拟阻塞队列的生产消费模型
- ConProd.cc
- BlockQueue.hpp
- lockGuard.hpp
- Task.hpp
- POSIX信号量
- 理解
- 函数
- 基于环形队列的生产消费模型
- 理解
- 代码里面细节理解
- 代码实现
- ringQueue.hpp
- sem.hpp
- testMain.cc
- 线程池
- 理解
- 代码
- lockGuard.hpp
- thread.hpp
- threadPool.hpp
- Task.hpp
- testMain.cc
- log.hpp
- Makefile
- 日志函数
- 线程安全的单例模式
- 懒汉方式实现单例模式(线程安全版本)
- threadPool.hpp(改)
- testMain.cc(改)
- STL,智能指针和线程安全
- 其他常见的各种锁
- 读者写者问题
- 读写锁
- 接口
- 示例代码
Linux线程概念
什么是线程
- 在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
- 一切进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
线程的优点
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
线程的缺点
- 性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型
线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的
同步和调度开销,而可用的资源不变。 - 健壮性降低
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了
不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。 - 缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。 - 编程难度提高
编写与调试一个多线程程序比单线程程序困难得多
线程异常
- 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该
进程内的所有线程也就随即退出
线程用途
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是
多线程运行的一种表现)
Linux进程VS线程
进程和线程
- 进程是资源分配的基本单位
- 线程是调度的基本单位
- 线程共享进程数据,但也拥有自己的一部分数据:
线程ID
一组寄存器
栈
errno
信号屏蔽字
调度优先级
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程
中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境
- 文件描述符表
- 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
进程和线程的关系如下图:
总结
如何看待之前学习的单进程?具有一个线程执行流的进程
Linux线程控制
POSIX线程库
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
- 要使用这些函数库,要通过引入头文<pthread.h>
- 链接这些线程函数库时要使用编译器命令的“-lpthread”选项
创建线程
错误检查:
- 传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误。
- pthreads函数出错时不会设置全局变量errno(而大部分其他POSIX函数会这样做)。而是将错误代码通
过返回值返回 - pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,
建议通过返回值业判定,因为读取返回值要比读取线程内的errno变量的开销更小
#include <iostream>
#include <string>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int x = 100;
void show(const string& name)
{
cout << name << ", pid: " << getpid() << " " << x << "\n"
<< endl;
}
void* threadRun(void* args)
{
const string name = (char*)args;
while (true)
{
show(name);
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid[5];
char name[64];
for (int i = 0; i < 5; i++)
{
snprintf(name, sizeof name, "%s-%d", "thread", i);
pthread_create(tid + i, nullptr, threadRun, (void*)name);
sleep(1); // 缓解传参的bug
}
while (true)
{
cout << "main thread, pid: " << getpid() << endl;
sleep(3);
}
}
线程ID及进程地址空间布局
- thread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID
不是一回事。 - 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要
一个数值来唯一表示该线程。 - pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,
属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。 - 线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID:
pthread_t 到底是什么类型呢?取决于实现。对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质
就是一个进程地址空间上的一个地址。
进程和线程ID区别
内核层面:pid & tgid
内核有一个线程组(Thread Group)的概念。
先看看task_struct结构体的大致结构:
struct task_struct {
...
pid_t pid;
pid_t tgid;
...
struct task_struct *group_leader;
...
struct list_head thread_group;
...
}
属于同一个进程的线程,又被称为线程组,线程组内的每一个线程在内核之中都存在一个进程描述符(task_struct)与之对应。进程描述符结构体中的pid。表面上看对应的是进程ID,其实不然,它对应的是线程ID;进程描述符中的tgid,含义是Thread Group ID(线程组ID),该值对应的是用户层面的进程ID:
- pid即线程id
- tgid叫线程组id。也就是一个进程的id
-
getpid获得的是task_struct中的tgid,即线程组编号。
-
gettid获得的是task_struct中的pid,即线程编号。(也就是LWP/TID)
-
pthread_self获得的是线程在共享区数据的起始地址。
-
因此,在多线程中,使用getpid获取的全都相同,gettid获取的各不相同。
-
getpid和gettid是系统接口,获得的是内核数据;而pthread_self是库函数,获得的是用户级数据。
线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
- 线程可以调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
线程异常退出的问题
1. return 结束线程
2. pthread_exit() 线程库提供的,线程退出
3. 下面另一份代码
void* threadRoutine(void* args)
{
int i = 0;
while (true)
{
cout << "新线程: " << (char*)args << " running ..." << endl;
sleep(1);
if (i++ == 10) break;
}
cout << "new thread quit ..." << endl;
// exit(10); // // 不要调用exit,exit是终止进程的!
// return nullptr;
// pthread_exit((void*)13); // 13会放到pthread_join 中的ret中
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");
void* ret = nullptr;
// 将新线程返回内容放到ret中
pthread_join(tid, (void **)&ret); // 默认会阻塞等待新线程退出
cout << "main thread wait done ... main quit ...\n";
return 0;
}
3. pthread_cancel() 主线程中取消一个线程(需要保证线程已经运行起来了,当不需要这个线程后,主线程取消它)
void* threadRoutine(void* args)
{
while (true)
{
cout << "新线程: " << (char*)args << " running ..." << endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");
int count = 0;
while(true)
{
cout <<"main线程: " << " running ..." << endl;
sleep(1);
if (count++ == 10) break;
}
pthread_cancel(tid);
cout << "pthread cancel: " << tid << endl;
void* ret = nullptr;
pthread_join(tid, (void **)&ret); // 默认会阻塞等待新线程退出
// 线程被取消,join的时候,退出码是-1 #define PTHREAD_CANCELED ((void *) -1)
cout << "main thread wait done ... main quit ...: new thead quit : " << (long long)ret << "\n";
return 0;
}
线程等待
为什么需要线程等待?
- 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
- 创建新的线程不会复用刚才退出线程的地址空间。
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的
终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数
PTHREAD_ CANCELED。 - 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参
数。 - 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
下面代码现象,线程发生除零错误,main线程和新线程全部退出
1. 线程谁先运行与调度器相关
2. 线程一旦异常,都可能导致整个进程整体退出
void* threadRoutine(void* args)
{
while (true)
{
cout << "新线程: " << (char*)args << " running ..." << endl;
sleep(1);
int a = 100;
a /= 0;
}
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void *)"thread 1");
// 3. 线程在创建并执行的时候,线程也是需要进行等待的,如果主线程如果不等待,即会引起类似于进程的僵尸问题,导致内存泄漏
while(true)
{
cout <<"main线程: " << " running ..." << endl;
sleep(1);
}
return 0;
}
///
下面这份代码是为了解决上面线程退出,主线程也会推出的问题,解决办法: 线程等待
下面代码现象,新线程循环10次后新线程退出,主线程执行cout语句后退出
线程等待
void* threadRoutine(void* args)
{
int i = 0;
while (true)
{
cout << "新线程: " << (char*)args << " running ..." << endl;
sleep(1);
if (i++ == 10) break;
}
cout << "new thread quit ..." << endl;
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");
pthread_join(tid, nullptr); // 默认会阻塞等待新线程退出
cout << "main thread wait done ... main quit ...\n";
return 0;
}
///
下面代码主要为了演示使用join线程等待,接受返回内容
线程的输入和返回值问题
void* threadRoutine(void* args)
{
int* data = new int[10];
int i = 0;
while (true)
{
cout << "新线程: " << (char*)args << " running ..." << endl;
sleep(1);
data[i] = i;
if (i++ == 10) break;
}
cout << "new thread quit ..." << endl;
// return (void*)10; // 是返回给谁呢?一般是给main thread, main如何获取到呢?pthread_join
return (void*)data;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");
//void* ret = nullptr;
将新线程返回内容放到ret中
//pthread_join(tid, (void **)&ret); // 默认会阻塞等待新线程退出
//cout << "main thread wait done ... main quit ...: new thead quit : " << (long long)ret << "\n";
int* ret = nullptr;
// 将新线程放回内容放到ret中
pthread_join(tid, (void**)&ret); // 默认会阻塞等待新线程退出
for(int i = 0; i < 10; i++)
{
cout << ret[i] << endl;
}
return 0;
}
__thread 和 pthread_self()
void* threadRoutine(void* args)
{
// pthread_cancel(pthread_self()); // 不推荐: 自己取消自己
while (true)
{
cout << "新线程: " << (char*)args << " running ..." << pthread_self()<< endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");
while (true)
{
cout << "main线程: " << " running ..." << "main tid : " << pthread_self()<< endl;
sleep(1);
}
void* ret = nullptr;
pthread_join(tid, (void**)&ret); // 默认会阻塞等待新线程退出
cout << "main thread wait done ... main quit ...: new thead quit : " << (long long)ret << "\n";
return 0;
}
///
// 全局变量
int g_val = 0; // 现象: g_val 是主线程和新线程共享的,其打印的值和地址是一样的
// _ _thread : 修饰全局变量,带来的结果就是让每一个线程各自拥有一个全局的变量 -- 线程的局部存储
__thread int g_val = 0; // 现象:加了 __thread,主线程和新线程打印的的g_val的值和地址都不一样的了
void* threadRoutine(void* args)
{
///任何一个线程执行了程序替换,相当于这个进程被全部替换其进程内的任何一个线程都不在执行,只执行替换的程序
///所以 程序替换适用于进程
// execl("/bin/ls", "ls", nullptr);
while(true)
{
cout << (char*)args << " : " << g_val << " &: " << &g_val << endl;
g_val++;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");
while(true)
{
cout << "main thread" << " : " << g_val << " &: " << &g_val << endl;
sleep(1);
}
void* ret = nullptr;
pthread_join(tid, (void**)&ret); // 默认会阻塞等待新线程退出
cout << "main thread wait done ... main quit ...: new thead quit : " << (long long)ret << "\n";
return 0;
}
分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放
资源,从而造成系统泄漏。 - 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线
程资源。
__thread int g_val = 0;
void* threadRoutine(void* args)
{
pthread_detach(pthread_self());
while (true)
{
cout << (char*)args << " : " << g_val << " &: " << &g_val << endl;
g_val++;
sleep(1);
break;
}
pthread_exit((void*)11);
}
int main()
{
pthread_t tid; // 本质是一个地址!
pthread_create(&tid, nullptr, threadRoutine, (void *)"thread 1");
while(true)
{
cout << "main thread" << " : " << g_val << " &: " << &g_val << endl;
sleep(1);
break;
}
// 线程分离后,不需要join了(操作系统自己会回收), 如果下面执行了join,会报错
//int n = pthread_join(tid, nullptr);
//cout << "n :" << n << "errstring: " << strerror(n) << endl;
如果线程分离,可能会出现主线程先退出情况
如果主线程先退出,那么进程就会退出,所有没有退出的线程也会被退出
所以主线程通常最后退出,线程分离用于主线程一直不会退出,而其有很多线程,当其某些线程执行完其任务后,
不再需要了,然后可以自己退出
return 0;
}
Linux线程互斥
进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完
成
互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个
线程,其他线程无法获得这种变量。 - 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之
间的交互。 - 多个线程并发的操作共享变量,会带来一些问题。
#include <iostream>
#include <thread>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <cstdio>
using namespace std;
// 如果多线程访问同一个全局变量,并对它进行数据计算,多线程会互相影响吗?
int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题!
void* getTickets(void* args)
{
(void)args;
while (true)
{
if (tickets > 0)
{
usleep(1000);
printf("%p: %d\n", pthread_self(), tickets);
tickets--;
}
else {
break;
}
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3;
// 多线程抢票的逻辑
pthread_create(&t1, nullptr, getTickets, nullptr);
pthread_create(&t2, nullptr, getTickets, nullptr);
pthread_create(&t3, nullptr, getTickets, nullptr);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
}
为什么可能无法获得争取结果?
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- –ticket 操作本身就不是一个原子操作
– 操作并不是原子操作,而是对应三条汇编指令: - load :将共享变量ticket从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1操作
- store :将新值,从寄存器写回共享变量ticket的内存地址
要解决以上问题,需要做到三点: - 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临
界区。 - 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
互斥量的接口
初始化互斥量
初始化互斥量有两种方法:
#include <iostream>
#include <thread>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <cassert>
#include <cstdio>
// 如果多线程访问同一个全局变量,并对它进行数据计算,多线程会互相影响
// 加锁保护:加锁的时候,一定要保证加锁的粒度,越小越好!!(就是加锁和解锁内的代码越少越好,前提是保证程序正确)
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // pthread_mutex_t 就是原生线程库提供的一个数据类型
int tickets = 10000; // 票为临界资源, 加锁和解锁部分代码为临界区
void* getTickets(void* args)
{
(void)args;
while (true)
{
// 加锁
pthread_mutex_lock(&mtx);
if (tickets > 0) // // 1. 判断的本质也是计算的一种
{
usleep(rand() % 1500);
printf("%p: %d\n", pthread_self(), tickets);
tickets--; // // 2. 也可能出现问题
// 解锁
pthread_mutex_unlock(&mtx);
}
else
{
// 解锁
pthread_mutex_unlock(&mtx);
break;
}
}
// 抢完票,其实还需要后续的动作
usleep(rand() % 2000);// 如果没有这行代码,可能会看到一个线程将所有的票抢完
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);
pthread_t t1, t2, t3;
// 多线程抢票的逻辑
pthread_create(&t1, nullptr, getTickets, nullptr);
pthread_create(&t2, nullptr, getTickets, nullptr);
pthread_create(&t3, nullptr, getTickets, nullptr);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
}
销毁互斥量
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,
那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
int tickets = 10000;
#define THREAD_NUM 800
class ThreadData
{
public:
ThreadData(const std::string& n, pthread_mutex_t* pm) :tname(n), pmtx(pm)
{}
public:
std::string tname;
pthread_mutex_t* pmtx;
};
void* getTickets(void* args)
{
ThreadData* td = (ThreadData*)args;
while (true)
{
// 抢票逻辑
int n = pthread_mutex_lock(td->pmtx);
assert(n == 0);
// 临界区
if (tickets > 0)
{
usleep(rand() % 1500);
printf("%s: %d\n", td->tname.c_str(), tickets);
tickets--;
n = pthread_mutex_unlock(td->pmtx);
assert(n == 0);
}
else
{
n = pthread_mutex_unlock(td->pmtx);
assert(n == 0);
break;
}
// 抢完票,其实还需要后续的动作
usleep(rand() % 2000);
}
delete td;
return nullptr;
}
int main()
{
time_t start = time(nullptr);
pthread_mutex_t mtx; // 创建一个锁
pthread_mutex_init(&mtx, nullptr);
srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);
pthread_t t[THREAD_NUM];
// 多线程抢票的逻辑
for (int i = 0; i < THREAD_NUM; i++)
{
std::string name = "thread ";
name += std::to_string(i + 1);
ThreadData* td = new ThreadData(name, &mtx);
pthread_create(t + i, nullptr, getTickets, (void*)td);
}
for (int i = 0; i < THREAD_NUM; i++)
{
pthread_join(t[i], nullptr);
}
pthread_mutex_destroy(&mtx); // 需要销毁
return 0;
}
互斥量实现原理探究
加锁就是串行执行了吗?是的,执行临界区代码一定是串行的!
加锁了之后,线程在临界区中,是否会切换,会有问题吗?
会被切换!
第一次理解:当执行临界区代码【if else部分】时,虽然被切换了,但是不同的是,这时候是 持有一把锁后 被切换的
(锁还没有释放),
所以其他抢票线程要执行临界区代码,也必须先申请锁,锁它是无法申请成功的,所以,也不会让其他线程进入临界区,
就保证了临界区中数据一致性!
要访问临界资源,每一个线程都必须现申请锁,每一个线程都必须先看到同一把锁&&访问它,锁本身是不是就是一种共享资源?
谁来保证锁的安全呢??所以,为了保证锁的安全,申请和释放锁,必须是 原子的!!!自己保证
如何保证??锁是如何实现的?
- 经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单
元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一
个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪
代码改一下
- swap或exchange指令以一条汇编的方式,将内存和CPU内寄存区数据进行交换
如果我们在汇编的角度,只有一条汇编语句,我们就认为该汇编语句的执行是原子的! - 在执行流视角,是如何看待CPU上面的寄存器的?
CPU内部的寄存器,本质,叫做当前执行流的上下文! !寄存器们,空间是被所有的执行流共享的,但是寄存器的内容,是被每-个执行流私有的!上下文!
可重入VS线程安全
概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,
并且没有锁保护的情况下,会出现该问题。 - 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们
称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重
入函数,否则,是不可重入函数
常见的线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生
死锁,因此是不可重入的。
常见锁概念
死锁
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资
源而处于的一种永久等待状态。
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁算法
- 死锁检测算法(了解)
- 银行家算法(了解)
Linux线程同步
前菜理解
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问
题,叫做同步 - 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
条件变量函数
注意: 下面的代码只是演示一下条件变量相关函数如何使用,重在使用,对环境变量的真正理解和应用是在生产者消费者模型
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#define TNUM 4
typedef void (*func_t)(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;
// pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
class ThreadData
{
public:
ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
:name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
{}
public:
std::string name_;
func_t func_;
pthread_mutex_t *pmtx_;
pthread_cond_t *pcond_;
};
void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
// wait一定要在加锁和解锁之间进行wait!
// v2:
pthread_mutex_lock(pmtx);
// if(临界资源是否就绪-- 否) pthread_cond_wait
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " running -- 播放" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void func2(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
if(!quit) std::cout << name << " running -- 下载" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void func3(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " running -- 刷新" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void func4(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " running -- 扫码用户信息" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void *Entry(void *args)
{
ThreadData *td = (ThreadData*)args; //td在每一个线程自己私有的栈空间中保存
td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回!
delete td;
return nullptr;
}
int main()
{
pthread_mutex_t mtx;
pthread_cond_t cond;
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t tids[TNUM];
func_t funcs[TNUM] = {func1, func2, func3, func4};
for (int i = 0; i < TNUM; i++)
{
std::string name = "Thread ";
name += std::to_string(i+1);
ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);
pthread_create(tids + i, nullptr, Entry, (void*)td);
}
sleep(5);
// ctrl new thread
int cnt = 10;
while(cnt)
{
std::cout << "resume thread run code ...." << cnt-- << std::endl;
pthread_cond_signal(&cond); // 一次唤醒一个线程
// pthread_cond_broadcast(&cond); // 一次唤醒一批线程
sleep(1);
}
// 上面循环10次后,所有线程阻塞在pthread_cond_wait(pcond, pmtx);这里,
// 需要先将线程里面while()条件,改为false,然后再唤醒一次,当所有线程再进入while()时就结束
std::cout << "ctrl done" << std::endl;
quit = true;
pthread_cond_broadcast(&cond);
// 等待线程
for(int i = 0; i < TNUM; i++)
{
pthread_join(tids[i], nullptr);
std::cout << "thread: " << tids[i] << "quit" << std::endl;
}
// 释放锁和条件变量
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
为什么 pthread_cond_wait 需要互斥量?(就是wait为什莫需要第二个参数)
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
} p
thread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作。调用解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到
互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远
阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。 - int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后,
会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复
成原样。
条件变量使用规范
等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
pthread_cond_signal即可以放在pthread_mutex_unlock前面也可以放在后面
放前面
问题:当要A唤醒时B线程时,A自己还没有释放锁(unlock),对方B线程原来在条件变量下等,现在变成在锁上等,一但A自己解锁,
唤醒的B线程wait就可以获取锁了;但是又要担心如果多个线程,比如A解锁后,B线程wait没有抢到A释放的锁,没关系B线程继续等待锁
放后面:
问题: 比如A一解锁,在唤醒B线程时wait没有抢到A释放的锁,没关系B线程继续等待锁
综上: 不要担心放前面还是放后面,我们只关心生产和消费这两个行为,不用关心那个线程抢到锁,不管那个线程抢到锁,
反正线程不是执行生产就是执行消费,就是在完成上面两个中某一个动作
生产者消费者模型
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而
通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者
要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队
列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别
在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元
素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程
操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型
ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int myAdd(int x, int y)
{
return x + y;
}
void* consumer(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 获取任务
Task t;
bqueue->pop(&t);
// 完成任务
std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;
// sleep(1);
}
return nullptr;
}
void* productor(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
// int
// int a = 1;
while(true)
{
// 制作任务
int x = rand()%10 + 1;
usleep(rand()%1000);
int y = rand()%5 + 1;
// int x, y;
// std::cout << "Please Enter x: ";
// std::cin >> x;
// std::cout << "Please Enter y: ";
// std::cin >> y;
Task t(x, y, myAdd);
// 生产任务
bqueue->push(t);
// 输出消息
std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
BlockQueue<Task> *bqueue = new BlockQueue<Task>();
pthread_t c[2],p[2];
pthread_create(c, nullptr, consumer, bqueue);
pthread_create(c + 1, nullptr, consumer, bqueue);
pthread_create(p, nullptr, productor, bqueue);
pthread_create(p + 1, nullptr, productor, bqueue);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
delete bqueue;
return 0;
}
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
// #define INI_MTX(mtx) pthread_mutex_init(&mtx, nullptr)
// #define INI_COND(cond) pthread_cond_init()
const int gDefaultCap = 5;
template <class T>
class BlockQueue
{
private:
bool isQueueEmpty()
{
return bq_.size() == 0;
}
bool isQueueFull()
{
return bq_.size() == capacity_;
}
public:
BlockQueue(int capacity = gDefaultCap) : capacity_(capacity)
{
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&Empty_, nullptr);
pthread_cond_init(&Full_, nullptr);
}
void push(const T &in) // 生产者
{
// pthread_mutex_lock(&mtx_);
// //1. 先检测当前的临界资源是否能够满足访问条件
// // pthread_cond_wait: 我们竟然是在临界区中!我是持有锁的!如果我去等待了,锁该怎么办呢?
// // pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
// // 当我被唤醒时,我从哪里醒来呢??从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的啊
// // 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
// // pthread_cond_wait: 但是只要是一个函数,就可能调用失败
// // pthread_cond_wait: 可能存在 伪唤醒 的情况
// while(isQueueFull()) pthread_cond_wait(&Full_, &mtx_);
// //2. 访问临界资源,100%确定,资源是就绪的!
// bq_.push(in);
// // if(bq_.size() >= capacity_/2) pthread_cond_signal(&Empty_);
// pthread_cond_signal(&Empty_);
// pthread_mutex_unlock(&mtx_);
lockGuard lockgrard(&mtx_); // 自动调用构造函数
while (isQueueFull())
pthread_cond_wait(&Full_, &mtx_);
// 2. 访问临界资源,100%确定,资源是就绪的!
bq_.push(in);
pthread_cond_signal(&Empty_);
} // 自动调用lockgrard 析构函数
void pop(T *out)
{
lockGuard lockguard(&mtx_);
// pthread_mutex_lock(&mtx_);
while (isQueueEmpty())
pthread_cond_wait(&Empty_, &mtx_);
*out = bq_.front();
bq_.pop();
pthread_cond_signal(&Full_);
// pthread_mutex_unlock(&mtx_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&Empty_);
pthread_cond_destroy(&Full_);
}
private:
std::queue<T> bq_; // 阻塞队列
int capacity_; // 容量上限
pthread_mutex_t mtx_; // 通过互斥锁保证队列安全
pthread_cond_t Empty_; // 用它来表示bq 是否空的条件
pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
Task.hpp
#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
int operator ()()
{
return func_(x_, y_);
}
public:
int x_;
int y_;
// int type;
func_t func_;
};
POSIX信号量
理解
函数
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于
线程间同步。
基于环形队列的生产消费模型
理解
代码里面细节理解
代码实现
ringQueue.hpp
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
const int g_default_num = 5;
// 多生产多消费的意义在哪里??你不要狭隘的认为,把任务或者数据放在交易场所,就是生产和消费啦
// 将数据或者任务生产前和拿到之后处理,才是最耗费时间的。
// 生产的本质:私有的任务-> 公共空间中
// 消费的本质:公共空间中的任务-> 私有的
// 信号量本质是一把计数器-> 计数器的意义是什么??可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断!
// 申请锁 -> 判断与访问 -> 释放锁 --> 本质是我们并不清楚临界资源的情况!!
// 信号量要提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况!
//
// 多线程
template<class T>
class RingQueue
{
public:
RingQueue(int default_num = g_default_num)
: ring_queue_(default_num),
num_(default_num),
c_step(0),
p_step(0),
space_sem_(default_num),
data_sem_(0)
{
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&plock, nullptr);
}
~RingQueue()
{
pthread_mutex_destroy(&clock);
pthread_mutex_destroy(&plock);
}
// 生产者: 空间资源, 生产者们的临界资源是什么?下标
void push(const T &in)
{
// 先申请信号量(0)
space_sem_.p();
pthread_mutex_lock(&plock); // ?
// 一定是竞争成功的生产者线程 -- 就一个!
ring_queue_[p_step++] = in;
p_step %= num_;
pthread_mutex_unlock(&plock);
data_sem_.v();
}
// 消费者: 数据资源, 消费者们的临界资源是什么?下标
void pop(T *out)
{
data_sem_.p();
pthread_mutex_lock(&clock);
// 一定是竞争成功的消费者线程 -- 就一个!
*out = ring_queue_[c_step++];
c_step %= num_;
pthread_mutex_unlock(&clock);
space_sem_.v();
}
// void debug()
// {
// std::cerr << "size: " << ring_queue_.size() << " num: " << num << std::endl;
// }
private:
std::vector<T> ring_queue_;
int num_;
int c_step; // 消费下标
int p_step; // 生产下标
Sem space_sem_;
Sem data_sem_;
pthread_mutex_t clock;
pthread_mutex_t plock;
};
#endif
sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_
#include <iostream>
#include <semaphore.h>
class Sem
{
public:
Sem(int value)
{
sem_init(&sem_, 0, value);
}
void p()
{
sem_wait(&sem_);
}
void v()
{
sem_post(&sem_);
}
~Sem()
{
sem_destroy(&sem_);
}
private:
sem_t sem_;
};
#endif
testMain.cc
#include "ringQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
void *consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while(true)
{
sleep(1);
int x;
// 1. 从环形队列中获取任务或者数据
rq->pop(&x);
// 2. 进行一定的处理 -- 不要忽略它的时间消耗问题
std::cout << "消费: " << x << " [" << pthread_self() << "]" << std::endl;
}
}
void *productor(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while(true)
{
// sleep(1);
// 1. 构建数据或者任务对象 -- 一般是可以从外部来 -- 不要忽略它的时间消耗问题
int x = rand() % 100 + 1;
std::cout << "生产: " << x << " [" << pthread_self() << "]" << std::endl;
// 2. 推送到环形队列中
rq->push(x); // 完成生产的过程
}
}
// 改成任务派发的版本呢??
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
RingQueue<int> *rq = new RingQueue<int>();
// rq->debug();
pthread_t c[3],p[2];
pthread_create(c, nullptr, consumer, (void*)rq);
pthread_create(c+1, nullptr, consumer, (void*)rq);
pthread_create(c+2, nullptr, consumer, (void*)rq);
pthread_create(p, nullptr, productor, (void*)rq);
pthread_create(p+1, nullptr, productor, (void*)rq);
for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
return 0;
}
线程池
理解
线程池:
* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着
监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:
* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技
术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个
Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情
况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,
出现错误.
* 线程池的种类:
* 线程池示例:
* 1. 创建固定数量线程池,循环从任务队列中获取任务对象,
* 2. 获取到任务对象后,执行任务对象中的任务接口
代码
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
// std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
// std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// typedef std::function<void* (void*)> fun_t;
typedef void *(*fun_t)(void *);
class ThreadData
{
public:
void *args_;
std::string name_;
};
class Thread
{
public:
Thread(int num, fun_t callback, void *args) : func_(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
name_ = nameBuffer;
tdata_.args_ = args;
tdata_.name_ = name_;
}
void start()
{
pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
}
void join()
{
pthread_join(tid_, nullptr);
}
std::string name()
{
return name_;
}
~Thread()
{
}
private:
std::string name_;
fun_t func_;
ThreadData tdata_;
pthread_t tid_;
};
threadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num = 3;
// 本质是: 生产消费模型
template <class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
public:
ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, routine, this));
}
}
// 1. run()
void run()
{
for (auto &iter : threads_)
{
iter->start();
// std::cout << iter->name() << " 启动成功" << std::endl;
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
// 线程池本质也是一个生产消费模型
// void *routine(void *args)
// 消费过程
static void *routine(void *args)
{
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while(tp->isEmpty()) tp->waitCond();
// 读取任务
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
}
task(td->name_);
// lock
// while(task_queue_.empty()) wait();
// 获取任务
// unlock
// 处理任务
}
}
// 2. pushTask()
void pushTask(const T &task)
{
lockGuard lockguard(&lock);
task_queue_.push(task);
pthread_cond_signal(&cond);
}
// test func
// void joins()
// {
// for (auto &iter : threads_)
// {
// iter->join();
// }
// }
~ThreadPool()
{
for (auto &iter : threads_)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread *> threads_;
int num_;
std::queue<T> task_queue_;
//方案2:
// queue1,queue2
// std::queue<T> *p_queue, *c_queue
// p_queue->queue1
// c_queue->queue2
// p_queue -> 生产一批任务之后,swap(p_queue,c_queue),唤醒所有线程/一个线程
// 当消费者处理完毕的时候,你也可以进行swap(p_queue,c_queue)
// 因为我们生产和消费用的是不同的队列,未来我们要进行资源的处理的时候,仅仅是指针
pthread_mutex_t lock;
pthread_cond_t cond;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
void operator ()(const std::string &name)
{
// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
}
public:
int x_;
int y_;
// int type;
func_t func_;
};
testMain.cc
#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
int main()
{
// logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);
srand((unsigned long)time(nullptr) ^ getpid());
ThreadPool<Task> *tp = new ThreadPool<Task>();
tp->run();
while(true)
{
//生产的过程,制作任务的时候,要花时间
int x = rand()%100 + 1;
usleep(7721);
int y = rand()%30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
// std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
// 推送任务到线程池中
tp->pushTask(t);
sleep(1);
}
return 0;
}
log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
// printf("%s%s\n", stdBuffer, logBuffer);
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
Makefile
thread_pool:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:
rm -f thread_pool
日志函数
va_start
原型:void va_start (va_list ap, paramN);
功能:初始化一个可变参数列表
参数:第一个参数 ap 接受一个 va_list 变量,第二个参数 paramN 接受函数传入参数中的可变参数前的最后一个有名字的参数。
va_arg
原型:type va_arg (va_list ap, type);
功能:获取可变参数列表中的下一个参数
参数:第一个参数 ap 是 va_list 变量,第二个参数 type 指明下一个参数的类型。
va_end
原型:void va_end (va_list ap);
功能:结束使用可变参数列表
参数:ap 接受一个结束访问的 va_list 变量。
va_copy ( C++11 only )
原型:void va_copy (va_list dest, va_list src);
功能:产生一个可变参数列表的副本
参数:第一个参数 dest 为目标可变参数列表,第二个参数 src 是源可变参数列表。
#include <stdio.h>
#include <stdarg.h>
double average(int n,...)
{
double sum = 0;
va_list args; // 声明一个 va_list 类型的变量 args
//va_list v2;
va_start(args, n); // 通过调用 va_start 来初始化这个 args
//va_copy(v2, args); // 复制操作
for(int i = 0; i < n; i++)
{
sum += va_arg(args, int); // 通过调用 va_arg 获取下一个参数
}
va_end(args); // 释放 va_list 变量
return sum / n;
}
int main()
{
printf("%.2lf", average(4, 1.0, 2.0, 3.0, 4.0));
return 0;
}
void logMessage(int level, const char* format, ...)
{
char logBuffer[1024];
va_list args;
va_start(args, format);
// vprintf(format, args); // 打印到屏幕里面
vsnprintf(logBuffer, sizeof logBuffer, format, args); // 显示到logBuffer数组里面
va_end(args);
}
int main()
{
logMessage(1, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);
return 0;
}
线程安全的单例模式
转链接
懒汉方式实现单例模式(线程安全版本)
threadPool.hpp(改)
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num = 3;
// 本质是: 生产消费模型
template <class T>
class ThreadPool
{
public:
pthread_mutex_t* getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
private:
ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, routine, this));
}
}
ThreadPool(const ThreadPool<T>& other) = delete;
const ThreadPool<T>& operate = (const ThreadPool<T>& other) = delete;
// 但是未来任何一个线程想要获取单例,都必须调用getThreadPool接口,都需要先加锁,再判断
// 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费的,因为单例只会创建一次
// 为什莫外面又加了一个判断?
// 1. 只有new对象那个判断,当多线程进入存在互斥
// 2. 有多个线程会去调用getThreadPool接口,当单例还没有创建时,进入第一层if里面,线程抢锁,某个线程抢到锁,又进入第二层if里面,创建线程池
// 3. 后面再有线程去调用getThreadPool接口时,第一层if就进不去(因为线程池已经存在),直接就返回获取到对象,不会执行申请和释放锁的行为
public:
// 考虑一下多线程使用单例的过程
ThreadPool<T>* getThreadPool(int num = g_thread_num)
{
if (nullptr == thread_ptr)
{
lockGuard lockguard(&mutex); // // 加锁保证了线程安全
//pthread_mutex_t lock(&mutex);
if (nullptr == thread_ptr)
{
thread_ptr = new ThreadPool<T>(num);
}
//pthread_mutex_t unlock(&mutex);
}
return thread_ptr;
}
// 1. run()
void run()
{
for (auto& iter : threads_)
{
iter->start();
// std::cout << iter->name() << " 启动成功" << std::endl;
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
static void* routine(void* args)
{
ThreadData* td = (ThreadData*)args;
ThreadPool<T>* tp = (ThreadPool<T> *)td->args_;
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while (tp->isEmpty()) tp->waitCond();
// 读取任务
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
}
task(td->name_);
}
}
// 2. pushTask()
void pushTask(const T& task)
{
lockGuard lockguard(&lock);
task_queue_.push(task);
pthread_cond_signal(&cond);
}
~ThreadPool()
{
for (auto& iter : threads_)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread*> threads_;
int num_;
std::queue<T> task_queue_;
static ThreadPool<T>* thread_ptr;
static pthread_mutex_t mutex;
pthread_mutex_t lock;
pthread_cond_t cond;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::thread_ptr = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
注意事项:
- 加锁解锁的位置
- 双重 if 判定, 避免不必要的锁竞争
- volatile关键字防止过度优化
testMain.cc(改)
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
//ThreadPool<Task>* tp = ThreadPool<Task>::getThreadPool(5);
//tp->run();
// 如果单例本身也在被多线程使用呢?
ThreadPool<Task>::getThreadPool()->run();
while (true)
{
//生产的过程,制作任务的时候,要花时间
int x = rand() % 100 + 1;
usleep(7721);
int y = rand() % 30 + 1;
Task t(x, y, [](int x, int y)->int {
return x + y;
});
// std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
// 推送任务到线程池中
tp->pushTask(t);
sleep(1);
}
return 0;
}
STL,智能指针和线程安全
- STL中的容器是否是线程安全的?
不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全. - 智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这
个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行
锁等),当其他线程想要访问数据时,被阻塞挂起。 - 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,
会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。 - CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不
等则失败,失败则重试,一般是一个自旋的过程,即不断重试。 - 自旋锁,公平锁,非公平锁?
读者写者问题
读写锁
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的
机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地
降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
接口
示例代码
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void* reader(void* arg)
{
char* id = (char*)arg;
while (1)
{
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0)
{
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void* writer(void* arg)
{
char* id = (char*)arg;
while (1)
{
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0)
{
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i)
{
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void*)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i)
{
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void*)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const& vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend(); ++it) {
pthread_t const& tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
main : main.cpp
g++ - std = c++11 - Wall - Werror $ ^ -o $@ - lpthread