C++11线程
创建线程
创建线程需要包含头文件<thread>
,使用线程类std::thread
构造函数
-
默认构造函数
thread() noexcept;
默认构造函数,构造一个线程对象,但它不会启动任何实际的线程执行。 -
任务函数构造函数
template< class Function, class... Args > explicit thread(Function&& fx, Args&&... args);
这是最常用的构造函数。它允许你指定一个任务函数(和其他参数),并创建和启动一个新的线程来执行它。这个任务函数可以是:
- 普通函数
- 类的非静态成员函数
- 类的静态成员函数
- lambda函数
- 仿函数
-
拷贝构造函数:
thread(const thread&) = delete;
线程对象不能被复制。所以这个构造函数被删除。 -
移动构造函数:
thread(thread&& other) noexcept;
允许你将一个线程对象的所有权转移到另一个线程对象。
赋值操作
线程中的资源不能被复制,但是你可以通过移动语义转移其资源。
注意事项
- 先创建的子程序不一定跑的最快(程序运行的速度有很大的偶然性)
- 线程的任务函数返回后,子程序将终止
- 如果主程序(主线程)结束,全部的子线程都会被立即强制终止,不论他们是否完成任务。
下面是使用五种任务函数同时执行线程任务的示例:
#include<iostream>
#include<thread>
#include<string>
#include<windows.h> // sleep
using namespace std;
// 1. 普通函数
void func1(const string& str){
for(int i=0; i<5; i++) {
cout << "func1: " << str << endl;
Sleep(1000);
}
}
//2. 类的非静态成员函数
//3. 类的静态成员函数
class A{
public:
void func2(const string& str){
for(int i=0; i<5; i++) {
cout << "func2: " << str << endl;
Sleep(1000);
}
}
static void func3(const string& str){
for(int i=0; i<5; i++) {
cout << "func3: " << str << endl;
Sleep(1000);
}
}
};
//4. lambda函数
auto func4 = [](const string& str){
for(int i=0; i<5; i++) {
cout << "func4: " << str << endl;
Sleep(1000);
}
};
//5. 仿函数
class B{
public:
void operator()(const string& str){
for(int i=0; i<5; i++){
cout<< "func5: " << str << endl;
Sleep(1000);
}
}
};
int main(){
// 1. 普通函数
thread t1(func1, "普通函数");
// 2. 类的非静态成员函数
A a;
thread t2(&A::func2, &a, "类的非静态成员函数");
// 3. 类的静态成员函数
thread t3(&A::func3, "类的静态成员函数");
// 4. lambda函数
thread t4(func4, "lambda函数");
// 5. 仿函数
B b;
thread t5(b, "仿函数");
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
return 0;
}
线程资源的回收
当我们在C++中创建线程时,有两个非常重要的操作需要注意:等待线程完成 和 释放线程资源。这些操作不仅确保线程正确完成它们的任务,而且确保不浪费系统资源。
等待线程完成:join()
- 使用
join()
时,你告诉主线程或调用线程等待指定的线程完成执行。这样做的一个很好的理由是,当你需要确保线程完成其工作并且所有计算都已完成时,主线程才能继续。 - 如果在子线程运行时不使用
join()
,主线程可能在子线程完成之前结束。这会导致程序的不确定行为。 - 如果尝试对一个线程调用多次
join()
或对已经调用了detach()
的线程使用join()
,程序会抛出异常。
2. 分离线程:detach()
- 有时,你可能希望子线程独立于主线程运行,并允许它自己完成。在这种情况下,你可以使用
detach()
。这告诉C++运行时,当线程完成时,自动清理它的资源。 - 一旦线程被
detach()
,你不能再重新连接它(即不能再次join()
它)。它现在完全独立于创建它的线程。 - 需要注意的是,分离线程可能会导致某些不确定的行为,如果主线程结束,尚未完成的分离线程可能会突然被终止。
3. 判断线程状态:joinable()
- 在某些情况下,你可能想知道是否可以对线程调用
join()
或detach()
。joinable()
就是为此目的而生的。 - 如果
joinable()
返回true
,这意味着线程是活动的,并且可以被join()
或detach()
。如果它返回false
,这通常意味着线程已经被join()
或detach()
了。
示例
考虑一个简单的场景,你希望在一个线程中执行一项任务,并在完成后立即收集其结果。
#include <iostream>
#include <thread>
using namespace std;
void task() {
cout << "Task is running..." << std::endl;
}
int main() {
thread th(task);
if (th.joinable()) {
th.join(); // 等待线程完成
cout << "Task completed!" << std::endl;
}
return 0;
}
但如果你的线程只是进行一些后台操作,你需要使用 detach()
:
thread th(task);
th.detach(); // 使线程独立运行
this::thread的全局函数
在多线程编程中,有时我们需要对正在执行的线程本身进行某些操作或获取一些信息。C++11为此提供了this::thread
命名空间,其中包含一系列实用函数。
1. 获取线程ID: get_id()
每个线程都有一个唯一的ID,这是线程在操作系统中的标识符。get_id()
函数允许我们获取当前正在执行的线程的ID。它返回一个thread::id
类型的对象,这个对象可以用来识别或比较线程。
2. 让线程休眠:sleep_for() 和 sleep_until()
-
sleep_for():此函数用于让当前线程休眠指定的时长。例如,要让线程休眠3秒,可以使用
this_thread::sleep_for(chrono::seconds(3))
。 -
sleep_until():此函数用于让当前线程休眠至指定的时间点。例如,如果你想让线程在某个特定时间(例如午夜)醒来,可以使用这个函数。它接受一个
chrono::time_point
对象作为参数,表示预期的唤醒时间。这两个函数都使用了
chrono
库,它为时间和日期操作提供了强大的工具。
3. 让出CPU时间片:yield()
线程调度是操作系统的职责。当多个线程都尝试访问CPU时,操作系统会决定哪个线程应该先运行。yield()
函数允许线程主动放弃当前的CPU时间片,这给了其他线程机会执行。这可以用于实现更加友好的线程协同操作。
4. 其它线程相关的操作
线程类(std::thread
)还包括一些其他的实用成员函数。例如:
swap()
:此成员函数允许两个线程对象之间交换状态。hardware_concurrency()
: 返回可以并发执行的线程数的建议值。通常,这是系统上的物理或逻辑核心数。
示例:
-
获取线程ID:
#include <iostream> #include <thread> using namespace std; void showThreadId() { this_thread::sleep_for(chrono::seconds(1)); cout << "minor thread ID: " << this_thread::get_id() << endl; } int main() { thread t(showThreadId); cout<< "Main thread ID: " << this_thread::get_id()<< endl; t.join(); return 0; }
-
休眠线程:
#include <iostream> #include <thread> #include <chrono> using namespace std; int main() { cout << "Sleeping for 2 seconds..." << endl; this_thread::sleep_for(chrono::seconds(2)); cout << "Awake now!" << endl; return 0; }
-
主动放弃时间片:
#include <iostream> #include <thread> using namespace std; void work() { for(int i = 0; i < 5; ++i) { this_thread::yield(); cout << "Working..." << endl; } } int main() { thread t(work); t.join(); return 0; }
call_once函数
在多线程编程中,一个经常出现的问题是如何确保某个操作只被执行一次,即使有多个线程试图并发地执行这个操作。这种情况在某些初始化任务中尤为常见,比如当你要确保一个全局对象只被初始化一次时。call_once
函数提供了这样的能力。
基本原理:
call_once
它能确保给定的函数只被调用一次,不论有多少线程尝试去调用。
once_flag:这是 call_once
的关键组件。你可以将其视为一个特殊的标记,它确保关联的函数只被执行一次。尽管名为“标记”,但在内部,它更像是一个轻量级的锁。
call_once 函数:它是如何工作的?
- 当第一个线程尝试调用与
once_flag
关联的函数时,这个线程会执行这个函数,并更新标记的状态。 - 任何后续尝试调用此函数的线程都会看到标记的更新状态,并决定跳过函数调用。
- 这确保了函数不论被多少线程访问,都只执行一次。
#include <iostream>
#include <thread>
#include <mutex> //std::once_flag和std::call_once()函数需要包含这个头文件。
using namespace std;
// once_flag全局变量。本质是取值为0和1的锁。
once_flag globalFlag;
// 这是一个我们只想在多线程环境中执行一次的函数
void uniqueInitialization(int id, const string& message) {
cout << "Thread " << id << ": " << message << endl;
}
// 这是每个线程将执行的函数
void threadFunction(int id) {
// 使用call_once确保uniqueInitialization只被一个线程执行
call_once(globalFlag, uniqueInitialization, id, "I'm the chosen one!");
for (int i = 1; i <= 3; i++) {
cout << "Thread " << id << ": Regular operation #" << i << endl;
this_thread::sleep_for(chrono::milliseconds(1000)); // 暂停1秒
}
}
int main() {
// 启动两个线程
thread t1(threadFunction, 1);
thread t2(threadFunction, 2);
t1.join(); // 等待第一个线程完成
t2.join(); // 等待第二个线程完成
return 0;
}
补充:
call_once
与互斥锁相比有什么优势?- 互斥锁(mutex)是确保在给定时间只有一个线程访问某些代码的方法。但每次访问都需要加锁和解锁,这有性能成本。
- 而
call_once
与once_flag
结合,仅确保代码只执行一次,且通常比反复使用互斥锁更高效。
- 为什么我们不只用全局变量来检查函数是否已执行?
- 简单地检查全局变量在多线程环境中可能不是线程安全的。多个线程可能会看到变量的不同状态,并尝试同时执行函数。
std::call_once
内部使用了合适的同步机制,确保了线程安全。
- 简单地检查全局变量在多线程环境中可能不是线程安全的。多个线程可能会看到变量的不同状态,并尝试同时执行函数。
native_handle函数
C++11定义了线程标准,旨在为开发者提供一种跨平台的方式来使用线程。这样,无论在哪个操作系统或平台上,C++代码都能使用统一的API来进行多线程编程。然而,这种统一的API是建立在各种操作系统的线程实现之上的,而不同的操作系统往往有其特定的线程功能和特性。
为了不失去对底层操作系统线程特性的访问能力,thread
提供了一个名为native_handle()
的函数。这个函数返回的是一个与特定平台或操作系统相关的线程句柄(或标识符)。开发者可以使用这个句柄来调用那些不在C++11线程库里的特定平台的线程功能。
例如,在Linux下,native_handle()
返回的是一个pthread_t
类型的值,这个实际上是POSIX线程的句柄。有了这个句柄,你就可以直接调用POSIX线程API,比如设置线程的调度策略、调整线程的堆栈大小等,这些是C++11线程标准库没有提供的功能。
示例:
#include <iostream>
#include <thread>
#include <pthread.h>
using namespace std;
void func() // 线程任务函数。
{
for (int ii = 1; ii <= 10; ii++)
{
cout << "ii=" << ii << endl;
this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒。
}
}
int main()
{
thread tt(func); // 创建线程。
pthread_t thid = tt.native_handle(); // 获取Linux操作系统原生的线程句柄。
// 将线程设置为分离状态,这意味着当线程结束时,它的资源会被自动回收。
pthread_detach(thid);
tt.join(); // 注意:因为线程已经设置为分离状态,这里调用join会失败。
}
注意事项:
- 使用
native_handle()
意味着你的代码将不再是跨平台的。因为使用native_handle()
返回的句柄进行任何操作都是特性于平台或操作系统的。这就意味者,如果你的代码依赖于native_handle()
它可能无法在其他平台上运行或需要进行修改。 - 一些原生线程操作可能需要额外的操作系统权限。例如更改线程的优先级或调度测罗可能需要高级权限。
线程安全
关于线程安全,我们从基本的概念开始,然后逐步深入其复杂性,并探索如何确保线程安全。
线程安全:线程安全是指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,是程序功能正确完成。
线程不安全:与线程安全相反,线程不安全意味着代码在并发执行时可能不会正常工作。竞态条件(两个或更多线程读取和写入某些共享数据,并且最终的结果取决于线程的执行方式)是线程不安全性的一个常见原因。
首先,我们来看一个线程不安全的例子:
#include <iostream>
#include <thread> // 线程类头文件。
using namespace std;
int aa = 0; // 定义全局变量。
// 普通函数,把全局变量aa加1000000次。
void func() {
for (int ii = 1; ii <= 1000000; ii++)
aa++;
}
int main()
{
// 用普通函数创建线程。
thread t1(func); // 创建线程t1,把全局变量aa加1000000次。
thread t2(func); // 创建线程t2,把全局变量aa加1000000次。
t1.join(); // 回收线程t1的资源。
t2.join(); // 回收线程t2的资源。
cout << "aa=" << aa << endl; // 显示全局变量aa的值。
}
尝试运行上述代码,理论上,如果func
是线程安全的,aa
的值应该是2000000(因为两个线程各自增加了1000000次)。但由于线程安全的问题,实际输出的aa
的值可能小于2000000。
1. 线程共享资源的本质
在一个线程内,所有线程共享该进程的内存空间和其他系统资源。这意味着他们都可以访问和修改同一块内存,例如全局变量、堆上分配的的对象等。
2. 线程冲突与顺序性
当多个线程尝试访问或修改相同的内存位置时,如果不采取合适的措施,就可能会发生冲突,导致不可预知的结果。这就涉及到顺序性的问题。
- 顺序性问题:虽然我们期望程序按照编写的代码顺序执行,但由于多种原因,如编译器优化、处理器优化等,实际执行的顺序可能与编写的代码顺序不同。尽管处理器会确保这种重排不会影响单线程的执行结果,但在多线程环境下可能会导致问题。
3. 数据的可见性
每个处理器都有自己的缓存,当一个线程修改了某个变量的值后,该值首先被存储在这个处理器的缓存中。如果其他线程运行在另一个处理器上并尝试读取这个变量,它可能看到的是过时的值。
- 可见性问题:当一个线程修改了某个共享变量的值,其他线程可能看不到这个修改。这个不同步的现象可能会导致不可预测的行为。
4. 原子性
CPU执行指令:读取指令、读取内存、执行指令、写回内存。
例如:i++
,首先从内存读取i
的值,然后把i
加上1,再把结果写回内存。
原子性是指一个操作或一系列操作要么全部执行且执行的过程不会被任何因素打断,要不就都不执行。
- 原子性问题:如上述
i++
为例,这个操作涉及到三个子步骤,如果在多线程环境下,不同线程在这三个步骤之间交替执行,就可能导致数据不一致。
回到func
函数和aa
变量的例子,为什么func
函数是非线程安全的呢?因为两个线程都在修改同一个全局变量aa
,但没有采取任何措施来确保这个修改的原子性。这就意味着,当一个线程读取aa
的值并试图增加它时,另一个线程也可能正在读取或修改它。这就可能导致所谓的“竞争条件“,最终的结果是不确定的。
5. 如何保证线程安全
确保线程安全的方法有很多,这里我们讨论几种主要的方法:
volatile
关键字: 在C++中,volatile
关键字可以用来防止编译器对变量进行优化,从而确保对该变量的每次读写都是直接从内存中进行的。但它本身并不能确保复杂操作的原子性。- 原子操作和原子类型: C++提供了一系列原子类型(如
std::atomic
),这些类型提供了一种机制,使得对它们的基本操作是原子的。 - 线程同步: 线程同步通常涉及到使用互斥量(mutexes)或其他同步原语,如信号量、条件变量等,来保护对共享资源的访问。例如,
std::mutex
可以确保在同一时间内只有一个线程可以执行被保护的代码块。
线程同步
线程同步并不是指所有线程都同时启动或同时停止。而是指在并发环境下,如何协调多个线程的行为,使它们能够顺利、有效地完成工作,而不出现资源冲突、数据不一致或其他意外问题。
它的主要目的是确保多个线程尝试访问或修改共享资源时,资源的完整性和一致性得以维护。
为什么需要线程同步?考虑一个例子:你和你的朋友正在一起做饭。你负责炒菜,你的朋友负责煮饭。但你们只有一个炉子。显然,你和你的朋友不能同时使用这个炉子。你们需要某种方法来协调,确保一次只有一个人使用炉子。
在计算机中,类似的情况也经常发生。多个线程可能需要访问共享的数据结构、文件或其他资源。如果没有适当的协调机制,就可能出现不一致或错误的结果。
线程同步的工具
为了帮助开发者协调线程的行为,现代编程语言和框架提供了多种同步原语和工具。
- 互斥锁 (互斥量)
- 条件变量
- 生产/消费者模型
下面我们来详细讲解维护线程同步的这三个工具。
1. 互斥锁
互斥锁(Mutex)是一个同步原语,用于解决多线程中的竞态条件问题。其基本操作包括:加锁和解锁。
C++提供了一个互斥库,其中包括了多种互斥锁类型:
- mutex:标准的互斥锁。它只能被锁定一次。使用
lock()
进行加锁,unlock()
进行解锁。 - timed_mutex(带超时机制的互斥锁。):这是一个特殊类型的互斥锁,允许在尝试获取锁时设置超时。
try_lock_for()
和try_lock_until()
是两个特殊的成员函数,它们尝试获取锁,但会在指定的时间段或时间点后超时。 - recursive_mutex(递归互斥锁。):这种互斥锁允许同一个线程多次对它进行加锁。这是有用的,特别是在递归函数中,一个线程可能多次尝试加锁。但必须注意,加锁多少次,就要解锁多少次。
- recursive_timed_mutex(带超时机制的递归互斥锁。):结合了timed_mutex和recursive_mutex的特性。
mutex类
1. 加锁 - lock() 一个互斥锁在任何给定的时间点都处于两种状态之一:锁定或未锁定。
- 当一个线程尝试对未锁定的互斥锁进行锁定时,它会成功,获得该锁并继续执行。
- 如果锁已经被另一个线程锁定,那么尝试锁定该锁的线程将被阻塞,直到锁被释放。
2. 解锁 - unlock() 一旦线程完成对共享资源的访问,它必须释放锁,允许其他线程获得它。只有当前锁的持有者可以释放锁。
3. 尝试加锁 - try_lock() 有时,我们不想让线程在等待锁时被无限期地阻塞。try_lock()
方法提供了一种非阻塞的方式来尝试获取锁。
- 如果锁处于未锁定状态,线程会获得锁并返回true。
- 如果锁已经被另一个线程锁定,那么函数立即返回false。
示例:
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
mutex protectionMutex; // 使用protectionMutex作为互斥锁的名称
void displayMessage(int id, const string& message) {
for (int i = 0; i < 10; i++) {
protectionMutex.lock();
cout << "Message " << i + 1 << " from Thread-" << id << ": " << message << endl;
protectionMutex.unlock();
this_thread::sleep_for(chrono::milliseconds(1000)); // 让线程休眠1秒
}
}
int main() {
thread th1(displayMessage, 1, "Hello from thread 1");
thread th2(displayMessage, 2, "Greetings from thread 2");
thread th3(displayMessage, 3, "Hi from thread 3");
thread th4(displayMessage, 4, "Salutations from thread 4");
thread th5(displayMessage, 5, "Hey from thread 5");
th1.join();
th2.join();
th3.join();
th4.join();
th5.join();
return 0;
}
示例代码中,所有线程都共享资源——标准输出流cout
,为了确保同一时间只有一个线程能够写入到cout
,我们使用互斥锁mutex
。
timed_mutex类
timed_mutex
与常规的mutex
非常相似。但它有一个独特的特性:它允许线程尝试在一定的时间段内或直到某个特定时间点来获取锁。如果在这段时间内线程成功获取了锁,它就可以执行保护的代码段;如果时间过去了而线程还未能获得锁,则线程会继续执行其他任务,而不是无限制地等待。
timed_mutex
的主要成员函数
try_lock_for(duration)
:- 这个函数允许线程尝试获取锁,并在尝试获取时等待一个指定的时间段。
- 如果在这段时间内锁可用(未被其他线程持有或锁定),线程会获取该锁并返回
true
。 - 如果时间段过去了,线程仍然没有获取到锁,则函数返回
false
。
try_lock_until(time_point)
:(意义不大)- 这个函数允许线程尝试获取锁,直到达到指定的时间点。
- 如果在那个时间点之前锁变得可用,线程会获取该锁并返回
true
。 - 如果指定的时间点已到,线程仍然没有获取到锁,则函数返回
false
。
示例:
#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
using namespace std;
timed_mutex tm1;
void task(int id) {
// 尝试在3秒内获取锁
if(tm1.try_lock_for(chrono::seconds(3))) {
cout << "Thread " << id << " acquired the lock." << endl;
this_thread::sleep_for(chrono::seconds(10)); // 模拟工作
//this_thread::sleep_for(chrono::seconds(3)); // 模拟工作
tm1.unlock();
cout << "Thread " << id << " released the lock." << endl;
} else {
cout << "Thread " << id << " failed to acquire the lock in 3 seconds." << endl;
}
}
int main() {
thread t1(task, 1);
thread t2(task, 2);
t1.join();
t2.join();
return 0;
}
recursive_mutex类
当我们在多线程编程中使用互斥锁(mutex)时,一个常见的问题是死锁。死锁是一种情况,当两个或更多的线程都在等待其他线程释放资源,因此它们都永远地被阻塞。
考虑这样一个场景:一个线程已经锁定了一个互斥量,然后它试图再次锁定相同的互斥量。对于普通的mutex,这将导致死锁,因为线程会永远等待自己释放锁——这显然是不可能的。
这正是recursive_mutex
出现的原因。与常规的互斥锁不同,recursive_mutex
允许同一线程多次获取锁。这意味着,如果一个线程已经拥有锁,它可以再次获取锁而不会被阻塞。
示例
#include <iostream>
#include <mutex>
using namespace std;
class MyClass {
recursive_mutex m_recursiveMutex;
// mutex m_recursiveMutex;
public:
void method1() {
m_recursiveMutex.lock();
cout << "Inside method1" << endl;
m_recursiveMutex.unlock();
}
void method2() {
m_recursiveMutex.lock();
cout << "Inside method2" << endl;
method1();
m_recursiveMutex.unlock();
}
};
int main() {
MyClass obj;
// Directly call method1
obj.method1();
// Call method2 which internally calls method1
obj.method2();
return 0;
}
lock_guard类
lock_guard
是一个简化了互斥锁使用的模板类,它提供了更安全、更方便的方法来处理锁定。其主要工作原理是利用了C++的RAII(Resource Acquisition Is Initialization)思想,这意味着我们在对象的生命周期中管理资源。具体到lock_guard
,这意味着在lock_guard
对象的生命周期开始时获取锁,并在其生命周期结束时自动释放锁。
如何工作
- 构造函数:当你创建一个
lock_guard
对象时,它的构造函数会立即锁定给定的互斥锁。这意味着,一旦你创建了这个对象,锁就会被持有,直到该对象被销毁。 - 析构函数:当
lock_guard
对象离开其作用域或被明确销毁时,其析构函数会自动解锁先前锁定的互斥锁。这确保了无论如何(即使在异常情况下)都会释放锁。
示例:
#include <iostream>
#include <thread> // 线程类头文件。
#include<mutex>
using namespace std;
int aa = 0; // 定义全局变量。
mutex m;
// 普通函数,把全局变量aa加1000000次。
void func() {
lock_guard<mutex> lock(m);
for (int ii = 1; ii <= 1000000; ii++)
aa++;
}
int main()
{
// 用普通函数创建线程。
thread t1(func); // 创建线程t1,把全局变量aa加1000000次。
thread t2(func); // 创建线程t2,把全局变量aa加1000000次。
t1.join(); // 回收线程t1的资源。
t2.join(); // 回收线程t2的资源。
cout << "aa=" << aa << endl; // 显示全局变量aa的值。
}
条件变量
条件变量主要被用于线程间的同步。具体来说,它允许一个或多个线程在某个条件为真时进行等待。当这个条件被满足(通常由另一个线程触发),等待的线程会被唤醒并继续它们的执行。
关键点:
- 当某个条件不满足时(如队列为空),线程会被阻塞在条件变量上。
- 当另一个线程改变了该条件(如往队列中添加了数据),它可以使用条件变量来唤醒一个或多个正在等待的线程。
与互斥锁的结合
条件变量并不直接提供资源的互斥访问。为了保证数据的一致性和比米娜竞态条件,条件变量通常与互斥锁结合使用。
流程通常如下:
- 线程锁定一个互斥锁。
- 检查某个条件是否满足,如果不满足则等待条件变量。
- 当被唤醒后,线程会重新检查条件(因为可能存在虚假唤醒)。
- 执行相关的操作。
- 解锁互斥锁。
生产/消费者模型
生产/消费者模型是一个常见的并发模式,其中生产者线程负责生成数据并放入共享缓存(如队列),而消费者线程从队列中提取并处理数据。
关键点:
- 当队列为空时,消费者线程需要等待,直到有数据可用。
- 当队列已满或达到某个阈值时,生产者线程可能需要等待,直到队列中有空间。
条件变量在此模型中扮演着关键角色,因为它们允许消费者线程在没有数据时进行等待,并允许生产者线程在队列已满时进行等待。当生产者生产了新数据,它可以使用条件变量来唤醒等待的消费者线程;同样,当消费者从队列中消费了数据,它可以唤醒等待的生产者线程。
这种模型确保了队列中的数据被有效、及时地生产和消费,同时避免了线程的忙等待,提高了整体的效率。
C++为条件变量提供了两个类:
condition_variable
:支持与普通mutex搭配,效率更高。condition_variable_any
:是一种通用的条件变量,可以与任意mutex搭配(包括用户自定义的锁类型)。
包含头文件<condition_variable
condition_variable类
condition_variable()
这是condition_variable
的默认构造函数,用于创建一个新的条件变量实例。
2 & 3. 拷贝和赋值
这两个函数声明确保condition_variable
对象不能被拷贝或赋值。这是为了保证多线程之间的同步机制不会被意外复制或破坏。
notify_one()
这个函数用于唤醒正在等待此条件变量的一个线程。如果没有线程正在等待,此函数不做任何操作。
notify_all()
此函数唤醒所有正在等待此条件变量的线程。如果没有线程正在等待,此函数不做任何操作。
wait(unique_lock<mutex> lock)
该函数使调用线程阻塞并释放所提供的互斥锁,直到其他线程调用notify_one()
或notify_all()
。一旦得到通知或者发生虚假唤醒,线程再次获取锁并继续执行。
wait(unique_lock<mutex> lock, Pred pred)
这是带有谓词的wait
版本。线程会持续等待,直到谓词返回true
或者条件变量被通知。这对于处理虚假唤醒非常有用。
8 & 9. wait_for(...)
这两个函数允许线程等待一个指定的时间长度。如果在此时间内条件变量被通知或谓词变为真(如果提供了谓词的话),线程就会被唤醒。否则,线程会在指定时间后被唤醒。
10 & 11. wait_until(...)
这两个函数允许线程等待到某个指定的时间点。如果在此时间点之前条件变量被通知或谓词变为真(如果提供了谓词的话),线程就会被唤醒。否则,线程会在指定时间点被唤醒。
unique_lock类
template<class Mutex> class unique_lock
是模板类,模板类参数为互斥锁类型。
unique_lock
和 lock_guard
都是管理锁的辅助类,都是RAII风格(在构造时获得锁,在析构时释放锁)。他们的区别在于:为了为了配合condition_variable
,unique_lock
还有lock()
和unlock()
成员函数。
示例
#include<iostream>
#include<thread>
#include<mutex>
#include<queue>
#include<deque>
#include<string>
#include<condition_variable>
using namespace std;
// 模拟书店的情境,其中有一个生产者线程负责进货,而有多个消费者线程模拟客户购买图书。
// 当书店没有书时,消费者线程等待,当书店有书时,消费者线程购买图书。
// 书店类
class Bookstore{
mutex m_mtx;
condition_variable m_cv;
queue<string> m_books;
public:
// 进货
void stockBooks(int count){
lock_guard<mutex> lock(m_mtx);
for(int i = 0; i < count; ++i){
string book = "Book" + to_string(i+1);
m_books.push(book);
}
m_cv.notify_all(); // 通知所有等待的线程
}
// 购买图书
void purchaseBook(){
while(true){
string book;
{
unique_lock<mutex> lock(m_mtx);
while(m_books.empty()){
m_cv.wait(lock); // 等待条件变量
}
book = m_books.front();
m_books.pop();
}
cout<<"Customer "<<this_thread::get_id()<<" purchase book: "<<book<<endl;
this_thread::sleep_for(chrono::milliseconds(5000)); // 模拟购买图书的时间
}
}
};
int main(){
Bookstore store;
thread c1(&Bookstore::purchaseBook, &store);
thread c2(&Bookstore::purchaseBook, &store);
thread c3(&Bookstore::purchaseBook, &store);
this_thread::sleep_for(chrono::milliseconds(2000)); // 等待消费者线程启动
store.stockBooks(4); // 进货10本书
this_thread::sleep_for(chrono::milliseconds(10000)); // 等待消费者线程购买图书
store.stockBooks(6); // 进货5本书
c1.join();
c2.join();
c3.join();
return 0;
}
在示例代码中
等待条件变量:
while(m_books.empty()){
m_cv.wait(lock);
}
在purchaseBook
函数中,消费者线程会首先检查m_books
队列是否为空。如果是空,线程会等待条件变量,即m_cv.wait(lock)
。这实际上做了两件事:
- 首先,它会释放互斥锁
m_tex
,这样其他线程(例如:进货线程)可以获得锁并进货。 - 然后,当前线程会被阻塞,直到其他线程调用
m_cv
的notify_*
方法。
这样,消费者线程可以有效地在没有书的时候休眠,并且在有新书进货时被唤醒。
通知所有线程:
m_cv.notify_all();
在stockBooks
函数中,每当新的书进货,notify_all
方法就会被调用,它会唤醒所有因m_cv.wait(lock);
被阻塞的消费者线程。这样,它们可以检查m_books
队列,看是否可以购买新书。
注意: 在实际应用中,如果你知道只需要唤醒一个消费者线程,你可以使用notify_one
代替notify_all
,这样只有一个等待的线程会被唤醒。
为什么使用while
循环来检查条件? 在上述代码中,为了检查队列是否为空,我们使用了while
循环,而不是if
语句。这是因为,有时候线程可能会在条件仍然不成立的情况下被唤醒(这称为"虚假唤醒")。使用while
循环可以确保只有当条件真正成立时线程才会继续执行。
也可以使用lambda函数作为谓词传递给
wait
。m_cv.wait(lock, [this](){ return !m_books.empty(); });
这种方法实际上内部也是使用循环来处理可能的虚假唤醒
原子类型 atomic
原子类型,它可能够在不需要额外的锁的情况下,确保在多线程环境下对共享变量的操作是原子的,即不会被其他线程中断,保证数据的完整性。这一特性通常用于构建高性能多线程应用程序,尤其在并发访问计数器、标志位、队列等场景非常有用。
C++11中atomic
模板类(结构体),支持多种原子类型,包括整数类型(如int
,long
,char
,bool
,指针类型),它不支持浮点型或自定义数据类型。
原子操作有底层CPU指令提供支持,这使得它的性能比传统的互斥锁更高,因为它不需要程序员手动管理锁的加锁和解锁。一些 常见的原子操作包括修改、读取、交换、比较并交换等。
头文件:<atomic>
常用函数:
void store(const T val) noexcept
: 用来将val
的值存入原子变量。T load() noexcept
: 用来读取原子变量的值。T fetch_add(const T val) noexcept
: 将原子变量的值与val
相加,然后返回原值。T fetch_sub(const T val) noexcept
: 将原子变量的值减去val
,然后返回原值。T exchange(const T val) noexcept
: 将val
的值存入原子变量,并返回原值。T compare_exchange_strong(T &expect, const T val) noexcept
: 比较原子变量的值和预期值expect
,如果两者相等,将val
存储到原子变量中,函数返回true
。如果两者不相等,使用原子变量的值更新预期值,函数返回false
。这是 CAS(Compare-And-Swap)指令的基础操作,确保原子更新。bool is_lock_free()
: 用来查询某一原子类型的操作是直接使用底层 CPU 指令(返回true
),还是使用编译器内部的锁机制(返回false
)。
#include <iostream>
#include <thread>
#include <atomic>
int main() {
std::atomic<int> counter(0);
// 存储操作:将值存入原子变量
counter.store(42);
// 读取操作:读取原子变量的值
int value = counter.load();
std::cout << "Value after load: " << value << std::endl;
// 原子加法:将原子变量的值与给定值相加,返回原值
int increment = 10;
int original = counter.fetch_add(increment);
std::cout << "Original value: " << original << std::endl;
// 原子减法:将原子变量的值减去给定值,返回原值
int decrement = 5;
original = counter.fetch_sub(decrement);
std::cout << "Original value after subtraction: " << original << std::endl;
// 原子交换:将原子变量的值替换为给定值,返回原值
int newValue = 100;
original = counter.exchange(newValue);
std::cout << "Original value after exchange: " << original << std::endl;
// CAS(Compare-And-Swap)操作
int expected = 42; // 预期值
int newVal = 200; // 新值
bool success = counter.compare_exchange_strong(expected, newVal);
if (success) {
std::cout << "CAS operation successful. New value: " << counter.load() << std::endl;
} else {
std::cout << "CAS operation failed. Current value: " << counter.load() << std::endl;
}
// 检查操作是否是无锁的
bool lockFree = counter.is_lock_free();
if (lockFree) {
std::cout << "The operation is lock-free." << std::endl;
} else {
std::cout << "The operation uses locks." << std::endl;
}
return 0;
}
原子类型的别名
- 原子类型重载了整数操作的各种运算符,因此你可以像操作普通整数一样操作原子变量。
atomic<T>
模板类支持指针类型,但它不表示指向的对象是原子类型,只是指针本身是原子操作。- 原子整型常用作计数器,用于多线程环境下统计事件发生的次数。原子布尔常用作开关,控制某个标志的状态。
- CAS 指令是无锁算法的基础,用于确保多线程环境下的原子更新。