文章目录
- 一、线程
- 1.C++11线程库的概述
- 2.构造函数
- 3.线程启动: 线程入口函数的传递方式
- 4.线程终止
- 5.线程状态
- 6.获取线程id:get_id()
- 二、互斥锁
- 1.什么是互斥锁
- 2.头文件
- 3.常用函数接口
- 三、lockguard与unique_lock
- 1.lock_guard
- 2.unique_lock
- (1)概念
- (2)函数接口
- 3.原子数据类型:atomic
- 四、条件变量 condition_variable
- 1.条件变量的引入
- 2.头文件
- 3.成员函数
- (1)构造函数
- (2)条件变量的等待:wait
- (3)条件变量的通知
- 五、生产者与消费者模型
- 1.概述
- 2.原理图
- 3.类图
- 4.禁止复制
- 5.代码
- 6.注意事项
- 六、线程池
- 1.线程池基础概述
- 2.原理图
- 3.面向对象线程池类图
- (1)代码
- (2)问题
- ①问题一:任务执行不完,程序就退出
- ②问题二:如何让程序退出
- (3)序列图:画类的函数的调用顺序
- 4.基于对象线程池类图
一、线程
1.C++11线程库的概述
对于早期的C++语言而言,如果想使用线程,我们需要根据不同的平台使用不用的接口,比如:在Linux平台上,我们需要借助POSIX标准的线程库,在windows上需要借助windows线程库,因为C++自己没有独立的线程库。为了解决这个问题,在C++11标准中,做了完善,C++自己引入了与平台无关的线程库,这个库是语言层面的库,这就是C++11线程库,接下来我们就来学习一下C++11线程库的知识点。
2.构造函数
1.头文件
#include <thread>
2.thread线程的构造函数
//1.无参构造
thread() noexcept;
//2.移动构造
thread( thread&& other ) noexcept;
//3.可变参数,类似std::bind。主要用这种创建线程的方式 (会传递线程入口函数)
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
//4.删除了拷贝构造
thread(const thread&) = delete;
第一种形式,可以创建一个空的线程对象。但是线程创建出来之后,需要做任务,单独使用这种形式没有意义。
第二种形式,可以从另外一个线程对象转移过来。传右值:std::move()或临时对象。
第三种形式,传递任何可调用对象的形式,这种形式使用的最为通用;
第四种形式,表明线程对象不能进行复制。
3.线程启动: 线程入口函数的传递方式
1.传递普通函数 或 成员函数(必须加取地址&) 作为线程入口函数
thread th(threadFunc, 1);
//1.成员函数做参数被传递,需要加 &类名::
//2.线程入口函数(创建线程)的参数和bind一样,默认是值传递。
//若要引用传递(要传递的参数是引用),则需要引用包装器std::ref()
thread pro(&Producer::produce, &producer, std::ref(taskque));
//线程的创建1:传递普通函数
//get_id():获取线程id
#include <iostream>
#include <thread>
using std::cout;
using std::endl;
using std::thread;
void threadFunc(int x){
cout << "child thread id === " << std::this_thread::get_id() << endl;
cout << "threadFunc(int x)" << endl;
cout << "x = " << x << endl;
}
int main()
{
cout << "main thread id = " << std::this_thread::get_id() << endl;
thread th(threadFunc, 1); //th是子线程
cout << "child thread id = " << th.get_id() << endl;
th.join();
return 0;
}
2.传递函数指针
typedef void (*pFunc)(int);
pFunc f = threadFunc;
thread th(f, 2);
3.传递函数引用
typedef void (&pFunc)(int);
pFunc f = threadFunc;
thread th(f, 3);
4.传递函数对象
class Example
{
public:
void operator()(int x)
{
cout << "void Example::operator()" << endl;
cout << "x = " << x << endl;
}
};
void test()
{
Example ex;
thread th1(ex, 10);
}
5.传递lambda表达式
void test()
{
int a = 10;
thread th1([&a](int x){
a = 100;
cout << "a = " << a << endl;
cout << "x = " << x << endl;
}, 5);
}
6.传递function对象
(1)function接收lambda表达式
function<void(int)> f = [](int x){
cout << "child thread id === " << std::this_thread::get_id() << endl;
cout << "void threadFunc(int x)" << endl;
cout << "x = " << x << endl;
};
thread th(f, 6); //创建子线程
(2)function接收bind
void threadFunc(int x){
cout << "child thread id === " << std::this_thread::get_id() << endl;
cout << "void threadFunc(int x)" << endl;
cout << "x = " << x << endl << endl;
}
//2.用function接收bind
void test2()
{
cout << "main thread id = " << std::this_thread::get_id() << endl;
function<void()> f = bind(threadFunc, 7);
thread th(f); //创建子线程
thread th2(bind(threadFunc, 8));
cout << "child thread id = " << th.get_id() << endl << endl;
th.join();
th2.join();
}
7.传递bind
thread th2(bind(threadFunc, 7));
4.线程终止
线程的等待:主线程等待子线程的退出
1.join函数
void join(); //线程中的成员函数
thread th(threadFunc, 1);
th.join(); //在主线程中,由子线程的对象调用join()
2.如果主线程没有等待子线程,主线程比子线程先退出,编译会报错:
terminate called without an active exception
5.线程状态
线程类中有一成员函数joinable,可以用来检查线程的状态。如果该函数为true,表示可以使用join()或
者detach()函数来管理线程生命周期。
void test()
{
thread t([]{
cout << "Hello, world!" << endl;
});
if (t.joinable()) {
t.detach();
}
}
void test2()
{
thread th1([]{
cout << "Hello, world!" << endl;
});
if (t.joinable()) {
t.join();
}
}
6.获取线程id:get_id()
(1)通过线程对象调用,获得该线程的tid
thread th(threadFunc, 1);
th.get_id();
(2)获取当前线程的tid
std::this_thread::get_id();
本线程
std::this_thread::
举例:
#include <iostream>
#include <thread>
using std::cout;
using std::endl;
using std::thread;
void threadFunc(int x){
cout << "child thread id === " << std::this_thread::get_id() << endl;
cout << "threadFunc(int x)" << endl;
cout << "x = " << x << endl;
}
int main()
{
cout << "main thread id = " << std::this_thread::get_id() << endl;
thread th(threadFunc, 1); //th是子线程
cout << "child thread id = " << th.get_id() << endl;
th.join();
return 0;
}
二、互斥锁
1.什么是互斥锁
互斥锁是一种同步原语,用于协调多个线程对共享资源的访问。互斥锁的作用是保证同一时刻只有一个
线程可以访问共享资源,其他线程需要等待互斥锁释放后才能访问。在多线程编程中,多个线程可能同
时访问同一个共享资源,如果没有互斥锁的保护,就可能出现数据竞争等问题。
然而,互斥锁的概念并不陌生,在Linux下,POSIX标准中也有互斥锁的概念,这里我们说的互斥锁是
C++11语法层面提出来的概念,是C++语言自身的互斥锁std::mutex,互斥锁只有两种状态:上锁与解
锁。
2.头文件
#include <mutex>
using std::thread;
3.常用函数接口
1.构造函数
constexpr mutex() noexcept; //互斥锁只有无参构造
mutex( const mutex& ) = delete; //拷贝构造被删除
2.上锁
void lock();
3.尝试上锁
bool try_lock();
4.解锁
void unlock();
5.举例:
试下互斥锁。结果远远不到两千万。用两万一般情况下就是两万。
#include <iostream>
#include <thread>
#include <mutex>
using std::cout;
using std::endl;
using std::thread;
using std::mutex;
int gCnt = 0;
void threadFunc()
{
for(size_t idx = 0; idx < 10000; ++idx){
++gCnt;
}
}
int main()
{
thread th1(threadFunc);
thread th2(threadFunc);
th1.join();
th2.join();
cout << gCnt << endl;
return 0;
}
加锁后:执行速度会变慢,但是结果是正确的
#include <iostream>
#include <thread>
#include <mutex>
using std::cout;
using std::endl;
using std::thread;
using std::mutex;
int gCnt = 0;
mutex mtx; //创建互斥锁对象
void threadFunc()
{
for(size_t idx = 0; idx < 10000000; ++idx){
mtx.lock(); //对共享资源加锁
++gCnt;
mtx.unlock(); //解锁
}
}
int main()
{
thread th1(threadFunc);
thread th2(threadFunc);
th1.join();
th2.join();
cout << gCnt << endl;
return 0;
}
6.缺陷:
锁必须成对出现。如果忘记unlock(),否则下次lock()时会造成程序卡住,造成死锁。
7.优化
RAII思想:实现MutexLockGuard类
//RAII思想:实现MutexLockGuard类
#include <iostream>
#include <thread>
#include <mutex>
using std::cout;
using std::endl;
using std::thread;
using std::mutex;
int gCnt = 0;
mutex mtx;//创建互斥锁的对象
/* mutex mtx2; */
//利用RAII的思想:利用栈对象的生命周期管理资源
class MutexLockGuard
{
public:
MutexLockGuard(mutex &metx) //引用
: _mtx(metx)
{
_mtx.lock();//在构造函数中上锁
}
~MutexLockGuard()
{
_mtx.unlock();//在析构函数中进行解锁
}
private:
mutex &_mtx; //引用
};
void threadFunc()
{
for(size_t idx = 0; idx < 10000000; ++idx)
{
//创建MutexLockGuard类的栈对象,生命周期结束时自动析构,解锁
MutexLockGuard autoLock(mtx);
++gCnt;
}
}
int main()
{
thread th1(threadFunc);
thread th2(threadFunc);
th1.join();
th2.join();
cout << "gCnt = " << gCnt << endl;
return 0;
}
实际上,C++11的库中,已经实现了这种思想。也就是下文的lockguard与unique_lock。
三、lockguard与unique_lock
对于std::mutex互斥锁而言,必须手动上锁与解锁且必须成对出现,如果上锁了,但是由于某些原因没
有解锁,就会导致程序一直处于锁定状态而无法解锁,所以C++11中使用的C++之父提出来的思想RAII设计了两种锁std::lock_guard与std::unique_lock,下面就来看看这两种锁的使用
RAII思想:利用栈对象的生命周期来管理资源。构造函数中上锁,析构函数中解锁。
1.lock_guard
1.头文件
#include <mutex>
template< class Mutex >
class lock_guard;
using
2.构造函数
explicit lock_guard( mutex_type& m );
lock_guard(mutex_type &m, std::adopt_lock_t t );
lock_guard( const lock_guard &) = delete;
3.举例
3_guard.cc
4.缺点
解锁比较机械,只能在生命周期结束的时候才能解锁。
不能自己进行解锁。不能自由控制锁的粒度(锁的范围)
2.unique_lock
(1)概念
1.特点:
可以进行手动的解锁,不必像lock_guard那样必须等到生命周期结束时自动回收。就算不写解锁,在离开作用域时也会自动解锁。
2.优点:可以自由控制锁的粒度(锁的范围),提供了更多的功能,更加灵活。
3.缺点:开销比lock_guard更大。
(2)函数接口
1.头文件
#include <mutex>
using std::unique_lock;
2.构造函数
unique_lock() noexcept;//(1)
unique_lock( unique_lock&& other ) noexcept; //(2)主要用这种
explicit unique_lock( mutex_type& m );//(3)
unique_lock( mutex_type& m, std::defer_lock_t t ) noexcept;//(4)
unique_lock( mutex_type& m, std::try_to_lock_t t );//(5)
unique_lock( mutex_type& m, std::adopt_lock_t t );//(6)
3.上锁与解锁
(1)上锁
void lock();
(2)尝试上锁
bool try_lock();
(3)解锁
void unlock();
4.举例
利用了RAII思想,相比较lock_guard而言,更加灵活,可以手动的加锁与解锁,可以配合条件变量进行使用。但是耗费资源。
//RAII:构造函数中上锁,析构函数中解锁
unique_lock<mutex> ul(mtx);
//也可以手动解锁和上锁
ul.unlock();
ul.lock();
3.原子数据类型:atomic
C++11提出了原子数据类型。
这个原子数据类型,虽然底层也是加锁(总线层面)。但是效率比手动加锁要快。
#include <atomic>
using std::atomic;
atomic<int> gCnt(0); //把数据改为原子类型
检测是否加上了锁。内置类型一定可以原子操作,但是自定义类型不一定,需要检测。
true,是原子的。false,不是原子的。
is_lock_free();
atomic底层原理:用的是CAS机制。
先比较,再改值。这两步也是原子的。
(和预期值不相等,说明存在其他线程对该值进行修改)
CAS(Compare-And-Swap,比较并交换)是一种常见的原子操作机制,主要用于实现无锁算法和数据结构。它是一种硬件级别的原子操作,能够在多线程环境下确保对共享数据的安全访问。
《C++并发编程实战》:讲的就是无锁编程。
四、条件变量 condition_variable
1.条件变量的引入
条件变量类是一个同步原语,它可以在同一时间阻塞一个线程或者多个线程,直到其他线程改变了共享变量(条件)并通知。它必须跟互斥锁一起配合使用,条件变量之所以要和互斥锁一起使用,主要是因为互斥锁的一个明显的特点就是它只有两种状态:锁定和非锁定,而条件变量可以通过允许线程阻塞和等待另一个线程发送信号来弥补互斥锁的不足,所以互斥锁和条件变量通常一起使用。条件变量本身不是锁,但它也可以造成线程阻塞,通常与互斥锁配合使用,给多线程提供一个会合的场所。
2.头文件
#include <condition_variable>
class condition_variable;
3.成员函数
(1)构造函数
condition_variable(); //条件变量只有无参构造
condition_variable(const condition_variable&) = delete; //拷贝构造被删除
(2)条件变量的等待:wait
1.函数原型
wait只能用unique_lock作为参数
void wait( std::unique_lock<std::mutex>& lock );
void wait( std::unique_lock<std::mutex>& lock, Predicate pred);
2.wait时的两个步骤
①先释放拿到的锁(解锁),再进行睡眠。
②被唤醒后,先尝试上锁。若失败,wait内部会有循环不断尝试获得锁。直至成功才返回。
3.举例:
//判断任务队列是不满的
while(full()){
//如果是满的,生产者线程需要在对应的条件变量上阻塞等待
//上半部:解锁、阻塞等待
//下半部:被唤醒、不断尝试上锁、上锁成功后返回
_notFull.wait(ul);
}
(3)条件变量的通知
//通知(唤醒)一个等待线程
void notify_one() noexcept;
//通知(唤醒)所有等待线程
void notify_all() noexcept;
对于POSIX线程库的:pthread_cond_siginal是至少唤醒一个
五、生产者与消费者模型
生产者、消费者模型(用面向对象的方式重写)
1.概述
生产者与消费者模型,生活中的典例:排队点饭。
生产者与消费者问题,是一个经典的常规问题,其实也是线程问题。可以把生产者看成是一类线程,消费者看成是另一类线程,也就是C++11线程库中的std::thread。因为生产者与消费者需要从共享的仓库中存数据或者取数据,涉及到对仓库的互斥访问,所以需要加锁,也就是std::mutex。这里我们把仓库用一个任务队列TaskQueue进行封装,提供互斥锁、条件变量的基本数据成员,当然任务队列里面也就是基本操作,队列是不是满的,是不是空的,存数据与取数据等基本操作。
2.原理图
3.类图
TaskQueue任务队列,是用来存放任务的类结构,作为仓库(缓冲区)。
容量是_capacity(_queSize)。
任务队列中,用队列std::queue来存放int型数据,先进先出,先存放进来的任务先执行。
生产者与消费者都需要互斥地访问仓库,所以需要加锁,就需要mutex类的对象。
在生产过程中,如果生产者的生产速度比较快,当queue.size()达到仓库最大容量_capacity时,生产者就要在对应的条件变量_notFull上阻塞等待(睡眠)。
同理,如果消费者消费的速度过快,导致queue.size() == 0,仓库为空,则消费者也要在条件变量_notEmpty上阻塞等待(睡眠)。
4.禁止复制
在C++中,我们讲过两种语义,值语义与对象语义,特别是对于对象语义,表示的是不能进行复制或者赋值,所以我们可以将类的拷贝构造函数与赋值运算符函数删除或者设置为私有。
但因为具有对象语义的有很多,不可能每个类中都将拷贝构造函数与赋值运算符函数都设置为私有,这样写起来比较麻烦。所以可以想象其他的办法,比如使用继承。写一个禁止复制类,让需要删除拷贝构造和赋值运算的类继承禁止赋值类。
class NoCopyable
{
protected:
NoCopyable() { }
~NoCopyable() { }
NoCopyable(const NoCopyable &) = delete;
NoCopyable &operator=(const NoCopyable &) = delete;
};
//继承 禁止复制类,自动删除了拷贝构造和复制
class Derived
: public NoCopyable
{
};
void test()
{
Derived d1;
Derived d2(d1);//error
Derived d3;
d3 = d1;//error
}
5.代码
代码链接:https://github.com/WangEdward1027/Cpp-thread/blob/main/生产者消费者模型/producerConsumer.cpp
//生产者消费者模型
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdlib.h> //随机种子
#include <time.h> //时间
#include <unistd.h>
using std::cout;
using std::endl;
using std::thread;
using std::queue;
using std::mutex;
using std::unique_lock;
using std::condition_variable;
class TaskQueue
{
public:
TaskQueue(size_t capacity);
~TaskQueue();
void push(const int &value);
int pop();
bool empty() const;
bool full() const;
private:
size_t _capacity; //仓库容量
queue<int> _que;
mutex _mutex;
condition_variable _notEmpty;
condition_variable _notFull;
};
TaskQueue::TaskQueue(size_t capacity)
: _capacity(capacity)
, _que()
, _mutex()
, _notEmpty()
, _notFull()
{
}
TaskQueue::~TaskQueue()
{
}
//添加任务与获取任务
void TaskQueue::push(const int &value)
{
unique_lock<mutex> ul(_mutex);
while(full()){
_notFull.wait(ul);
}
_que.push(value);
ul.unlock(); //解锁:控制锁的粒度,只在判满和queue.push()之间加锁
_notEmpty.notify_one();//唤醒消费者
}
int TaskQueue::pop()
{
unique_lock<mutex> ul(_mutex);
while(empty()){
_notEmpty.wait(ul);
}
int tmp = _que.front();
_que.pop();
ul.unlock(); //解锁:控制锁的粒度,只在判空和queue.pop()之间加锁
_notFull.notify_one();//唤醒生产者
return tmp;
}
//判断空还是满
bool TaskQueue::empty() const
{
return 0 == _que.size();
}
bool TaskQueue::full() const
{
return _capacity == _que.size();
}
//生产者:生产商品
class Producer
{
public:
Producer(){}
~Producer(){}
void produce(TaskQueue &taskQueue)
{
//全局作用域解析运算符:确保调用的是全局命名空间中的srand和clock
::srand(::clock());
int cnt = 20;
while(cnt--){
int number = ::rand() % 100; //产生随机数
cout << "produce : " << number << endl;
taskQueue.push(number);
sleep(1);
}
}
};
//消费者:消费商品
class Consumer
{
public:
Consumer(){}
~Consumer(){}
void consume(TaskQueue &taskQueue)
{
int cnt = 10;
while(cnt--){
int number = taskQueue.pop();
cout << "consume : " << number << endl;
sleep(1);
}
}
};
int main()
{
TaskQueue taskque(20);
Producer producer; //生产者
Consumer consumer1; //消费者1
Consumer consumer2; //消费者2
thread pro(&Producer::produce, &producer, std::ref(taskque));
thread con1(&Consumer::consume, &consumer1, std::ref(taskque));
thread con2(&Consumer::consume, &consumer2, std::ref(taskque));
pro.join();
con1.join();
con2.join();
return 0;
}
6.注意事项
1.Producer.h和TaskQueue.h里的类,Producer和TaskQueue如果需要用到对方类的内容:
(1)只需要指针或引用:头文件的循环引用,会出现问题。改为类的前向声明。
(2)成员子对象:只能 #include “那个类的头文件”
2.什么时候用类的前向声明,什么时候引入头文件?
①A类中仅需用到B 类的指针或引用,使用B 类的前向声明。因为指针大小固定为8,不需要知道类的结构。
②A类中有B 类的对象,则必须包含B 类定义的头文件。因为只有知道类的结构,才能确定对象的大小。(对象的大小即类的大小)
六、线程池
线程池代码(用面向对象的方式重写)
1.线程池基础概述
为什么要有线程池?假设没有使用线程池时,一个请求用一个子线程来处理。每来一个请求,都得创建子线程,子线程执行请求,关闭子线程。当请求量(并发)比较大的时候,频繁地创建和关闭子线程,也是有开销的。因此提出线程池,提前开辟好N个子线程,当有任务过来的时候,先放到任务队列中,之后N个子线程从任务队列中获取任务,并执行,这样能大大提高程序的执行效率。其实当任务数大于线程池中子线程的数目的时候,就需要将任务放到缓冲区(队列)里面,所以本质上还是一个生产者消费者模型。
查看线程的状态的命令
ps -elLf | grep xxx
2.原理图
3.面向对象线程池类图
ThreadPool类:
①用 size_t _threadNum表示线程的数目
②线程创建出来后,需要用容器进行存储。考虑使用vector存储:vector<thread> _threads
③任务队列作为仓库,用来存放任务。因为该缓冲区是临界区共享资源,所以需要加锁。并用size_t _queSize表示任务队列的大小。
④用标志位_isExit表示线程池是否结束
⑤doTask:线程池交给工作线程需要执行的任务,具体就是getTask()获取任务。只要任务队列非空就一直获取任务,并执行任务。
(1)代码
代码链接:https://github.com/WangEdward1027/Cpp-thread/tree/main/线程池_面向对象
(2)问题
①问题一:任务执行不完,程序就退出
主线程执行的快。解决,让主线程等待任务队列为空。
多线程编程,同样的代码,每次执行的结果可能不同。
问题一的原因:主线程和子线程都在执行,主线程先执行完,就执行stop函数了,第一步将isExit标志位改为true,导致子线程的while(!isExit)进不去,无法执行任务。导致子线程任务没有执行完。
解决方案:在stop函数中添加代码,只有任务执行不完,就让主线程睡眠。
需要保证任务执行完毕后,主线程才能继续向下执行,否则就卡在此处。
卡在这里,由sleep(1); 改为C++11 thread库的睡觉函数:sleep_for()
②问题二:如何让程序退出
问题二:
process()
让子进程做的慢一点
但是sleep不是个事,想一下更好的优化方案:
ps -elLf | grep ./a.out
会发现主线程和子线程在sleep,阻塞等待。
解决方案一:使用_flag标志和wakeup(里面是notify_all)
为了防止唤醒后,继续在while里出不来。再加一个_flag标志位,跳出while循环。
解决方案二:有同学的方法:使用假任务,往队列里放入nullptr,就不要flag了。
(3)序列图:画类的函数的调用顺序
StarUML,Sequence
4.基于对象线程池类图
1.基于对象的概念
①基于对象编程(OBP,Object based programming):不包括继承和多态
②基于对象的写法,开销比面向对象少,适用于简单的项目
2.该线程池中,基于对象与面向对象的区别
①基于对象,不用继承+纯虚函数,改用bind+function。
②传指针,改为传右值引用。
3.类图
4.代码链接:https://github.com/WangEdward1027/Cpp-thread/tree/main/线程池/线程池_基于对象