文章目录
- 线程
- 线程互斥
- 锁
- 死锁
- 线程同步
- 生产者消费者模型
- POSIX信号量
- 基于环形队列的生产消费模型
- 线程池
线程
线程是进程内部可以独立运行的最小单位
进程是资源分配的基本单位,线程是调度器调度的基本单位
线程在进程的地址空间内运行
进程内的大部分资源线程是共享的,但也有属于线程自己的独立资源,主要是寄存器和栈(位于共享区)
为什么引入线程?
创建新线程的工作要比创建新进程的工作少得多
线程的切换要比进程的切换所作的工作少得多
线程异常
单个线程出现崩溃会导致整个进程奔溃,因为线程是在进程内部运行的
Linux下的线程
Linux将线程视为一种特殊类型的进程,称作轻量级进程(Lightweight Process,LWP)
Linux并没有提供系统调用来对线程进行操作,对线程操作使用的是第三方库
来段代码感受一下
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
void *routine(void *args)
{
while (true)
{
printf("sub process running ... pid:%d\n", getpid());
sleep(1);
}
}
int main()
{
pthread_t t;
pthread_create(&t, nullptr, routine, nullptr);
while (true)
{
printf("main process running ... pid:%d\n", getpid());
sleep(3);
}
pthread_join(t, nullptr);
return 0;
}
运行结果如下
程序运行起来之后检测可以看到,有两个线程,其中一个pid和lwp相等,这就是主线程,下面的就是新线程
每个进程内部至少有一个线程(主线程)
线程互斥
锁
多个线程在对同一份资源进行访问时,很可能会造成数据紊乱的问题,来看个例子
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
int num = 10000;
void *routine(void *args)
{
while (true)
{
if (num > 0)
{
usleep(100);
printf("%d\n", num);
num--;
}
else
{
break;
}
usleep(1000);
}
}
int main()
{
const int n = 100;
pthread_t t[n];
for (size_t i = 0; i < n; i++)
{
pthread_create(t + i, nullptr, routine, nullptr);
}
for (size_t i = 0; i < n; i++)
{
pthread_join(t[i], nullptr);
}
return 0;
}
创建两个新线程,让这两个线程对同一个数字进行争夺,类似于抢票,没有票就退出
只看最后的几个结果
这里n==0时明显不能进入if语句,但是后面居然打印出负数
这就是多线程对于同一份资源在进行访问时造成的数据紊乱
临界区
临界区是指一段代码,当一个线程(或进程)进入这段代码并开始执行时,其他线程(或进程)不能同时进入执行该段代码的区域。这是为了确保共享资源在同一时间只能被一个线程访问,避免数据竞争和不一致的状态。
临界资源
临界资源是指在多线程或多进程环境中被共享访问的数据、对象或资源。因为多个线程或进程可能同时访问这些资源,所以需要在访问它们时确保数据的一致性和正确性。
如何对临界资源进行保护?------‘锁’
锁的使用可以确保当一个线程在访问共享资源时,其他线程无法同时访问该资源,从而保证数据的一致性和正确性。
对刚才代码稍作修改
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int num = 10000;
void *routine(void *args)
{
while (true)
{
pthread_mutex_lock(&mutex);
if (num > 0)
{
usleep(100);
printf("%d\n", num);
num--;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
break;
}
usleep(1000);
}
}
int main()
{
const int n = 100;
pthread_t t[n];
for (size_t i = 0; i < n; i++)
{
pthread_create(t + i, nullptr, routine, nullptr);
}
for (size_t i = 0; i < n; i++)
{
pthread_join(t[i], nullptr);
}
pthread_mutex_destroy(&mutex);
return 0;
}
这次运行结果就正常了
有了锁就能保证每次只有一个线程能够访问到临界资源
死锁
死锁是指两个或多个线程互相等待对方持有的资源而无法继续执行的情况。
死锁的4个必要条件
- 互斥:一个资源每次只能被一个执行流使用
- 请求与保持:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- …
线程同步
在多个线程互斥的访问临界资源时,只有一个能获得临界资源,其他线程只能忙等待,什么也做不了
两个问题:
- 存在多个线程竞争同一个临界资源的情况,每次都是其中一个线程获得临界资源,造成其他线程的饥饿问题
- 在临界资源没有就绪的时候,线程不停的申请锁—判断临界资源事否就绪—释放锁,一直重复,不合理
如何解决?条件变量
条件变量允许一个或多个线程等待满足某些条件时继续执行。
void push(const T &val)
{
pthread_mutex_lock(&_mtx);
while (is_full())
{
pthread_cond_wait(&_full, &_mtx);
}
_q.push(val);
cout << "Producer Thread [ " << pthread_self() << " ] Produced " << val << endl;
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_empty);
}
截取一段代码举例
要点
- 等待时线程自动挂起,并且释放手中的锁,如果收到唤醒信号,再在被阻塞的位置唤醒,重新去申请锁
- 为什么使用while循环,不用if。要避免伪唤醒,if出现伪唤醒的情况时会直接向下运行,而while循环会再次检查资源事否就绪,防止伪唤醒
生产者消费者模型
核心点
- 生产者在阻塞队列已满时不能继续放入数据
- 消费者在阻塞队列为空时不能继续取出数据
- 生产者与生产者之间互斥
- 消费者与消费者之间互斥
阻塞队列
为了代码的健壮性,使用泛型编程
字段
阻塞队列是个队列,所以封装STL的队列
阻塞队列属于临界区,对临界区访问应该似乎互斥的,所以需要一把锁来控制生产者和消费者的互斥访问
条件变量,不让临界区外的资源忙等待
队列容量
函数
构造、析构
基本的入队列和出队列
判断是否空,是否满
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
#define BLOCK_SIZE 10
template <class T>
class BlockQueue
{
public:
BlockQueue(size_t capacity = BLOCK_SIZE)
: _capacity(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
void push(const T &val)
{
pthread_mutex_lock(&_mtx);
while (is_full())
{
pthread_cond_wait(&_full, &_mtx);
}
_q.push(val);
cout << "Producer Thread [ " << pthread_self() << " ] Produced " << val << endl;
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_empty);
}
void pop()
{
pthread_mutex_lock(&_mtx);
while (isEmpty())
{
pthread_cond_wait(&_empty, &_mtx);
}
T s = _q.front();
_q.pop();
cout << "Consumer Thread [ " << pthread_self() << " ] Consumed " << s << endl;
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_full);
}
~BlockQueue()
{
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
pthread_mutex_destroy(&_mtx);
}
private:
bool isEmpty()
{
return _q.empty();
}
bool is_full()
{
return _capacity == _q.size();
}
private:
queue<T> _q;
size_t _capacity;
pthread_mutex_t _mtx;
pthread_cond_t _full;
pthread_cond_t _empty;
};
POSIX信号量
信号量的两个操作
- p操作:申请资源
- v操作:释放资源
基于环形队列的生产消费模型
允许生产者和消费者在同一个数据结构上进行操作
核心点
- 生产者不能套圈消费者
- 消费者不能超过生产者
- 生产者与生产者之间互斥
- 消费者与消费者之间互斥
环形队列
字段
封装vector
容量
生产者在队列的位置
消费者在队列的位置
消费信号
生产信号
消费锁
生产锁
函数
构造(这里没有实现析构,因为封装的锁和信号量各自的析构已经实现,析构时会自动调用)
入队列和出队列
#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"
#define SIZE 10
template <class T>
class RingQueue
{
public:
RingQueue(size_t size = SIZE)
: _size(size),
_rq(size),
_p_sem(size),
_c_sem(0),
_c_pos(0),
_p_pos(0)
{
}
void push(const T &val)
{
_p_sem.p();
_p_mtx.lock();
_rq[_p_pos++] = val;
_p_pos %= _size;
_p_mtx.unlock();
_c_sem.v();
}
void pop(T *pv)
{
_c_sem.p();
_c_mtx.lock();
*pv = _rq[_c_pos++];
_c_pos %= _size;
_c_mtx.unlock();
_p_sem.v();
}
private:
std::vector<T> _rq;
size_t _size;
size_t _c_pos;
size_t _p_pos;
Sem _c_sem;
Sem _p_sem;
Mutex _c_mtx;
Mutex _p_mtx;
};
要点
- 为什么是先申请资源(p操作),再加锁?申请信号量实际上一种“预定”,先买票后入座,这样可以确保每个进入临界区的线程要访问的资源已经就绪,可以提升效率。
对锁进行封装
#pragma once
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mtx, nullptr);
}
~Mutex()
{
pthread_mutex_destroy(&_mtx);
}
void lock()
{
pthread_mutex_lock(&_mtx);
}
void unlock()
{
pthread_mutex_unlock(&_mtx);
}
private:
pthread_mutex_t _mtx;
};
对信号量进行封装
#pragma once
#include <semaphore.h>
class Sem
{
public:
Sem(int value)
{
sem_init(&_sem, 0, value);
}
~Sem()
{
sem_destroy(&_sem);
}
void p()
{
sem_wait(&_sem);
}
void v()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
线程池
详情代码见:实现简易线程池