C++多线程编程(包含c++20内容)
文章目录
- C++多线程编程(包含c++20内容)
- 线程
- 通过函数指针创建线程
- 通过函数对象创建线程
- 通过lambda创建线程
- 通过成员函数创建线程
- 线程本地存储
- 取消线程
- 自动join线程
- 从线程获得结果
- 原子操作库
- 原子操作
- 原子智能指针
- 原子引用
- 使用原子类型
- 等待原子变量
- 互斥
- 互斥体类
- 自旋锁
- 非定时的互斥体类
- 锁
- 1.lock_guard
- 2.unique_lock
- 3.shared_lock
- 4.一次性获得多个锁
- 5.scoped_lock
- std::call_once
- 互斥体对象使用示例
- 1.以线程安全的方式写入流
- 同步流
- 使用锁
- 2.使用定时锁
- 3.双重检查锁
- 条件变量
- 虚假唤醒
- latch
- barrier
- semaphore
- future
- std::promise 和 std::future
- std::packaged_task
- std::async
- 异常处理
- std::shared_future
- 示例:多线程的Logger类
- 线程池
- 线程设计和最佳实践总结
线程
借助在<thread>
中定义的c++线程库,启动新的线程将变得非常容易。可以通过多种方式指定新线程中需要执行的内容。可以让新线程执行全局函数、函数对象的operator()、lambda
表达式甚至某个类实力的成员函数。
通过函数指针创建线程
像windows上的CreateThread()、beginthread()
等函数、及pthreads
库中的pthread_create()
函数,都要求线程函数只有一个参数。另一方面,标准c++的是std::thread
类使用的函数可以有任意数量的参数。
假设counter()
函数接收两个整数,第一个表示id,第二个表示这个函数要循环的次数,函数是一个循环,这个循环执行给定次数的迭代。在每次迭代中,向标准输出打印一条消息:
void counter(int id, int num)
{
for (int i{0}; i < num; ++i)
{
cout << "counter " << id << " has value " << i << endl;
}
}
可以通过std::thread启动执行此函数的多个线程。可创建t1,使用参数1和6执行counter()
;
thread t1{counter, 1, 6};
thread
类的构造函数时一个可变参数模板,也就是说,可以接收任意数目的参数。第一个参数是新线程需要执行的函数,当线程执行时,将随后的可变数目的参数传递给这个函数。
上面程序的执行结果(记得编译选项要加上-pthread
):
terminate called without an active exception
Aborted
如果一个线程对象表示系统当前或者过去的某个活动线程,则认为他是可以结合的(joinable
)。即使这个线程执行完毕,该线程对象也依然处于可结合状态,默认构造的线程对象是不可结合的。在销毁一个可结合的线程对象前,必须调用其join()
或者detach()
方法。对join()
的调用是阻塞调用,会一直等待线程完成工作为止。调用detach()
时,线程对象会与底层OS线程分离(注意用户自己创建的线程被称为用户级线程,操作系统真正执行体为内核级线程(你买cpu时会看到几核几线程中的几线程就是这个),用户级线程与内核级线程又可以是一对一、多对一、多对多的对应关系)。此外,OS线程将会继续独立运行。调用这两个方法都会导致线程变得不可结合。如果仍可结合的线程对象被销毁,析构函数会调用std::terminate(),这会突然间终止所有线程及应用程序本身。
thread t1{counter, 1, 6};
thread t2{counter, 2, 5};
t1.join();
t2.join();
输出结果:
counter 1 has value 0
counter 1 has value 1
counter 1 has value 2
counter 1 has value 3
counter 1 has value 4
counter 1 has value 5
counter 2 has value 0
counter 2 has value 1
counter 2 has value 2
counter 2 has value 3
counter 2 has value 4
不同系统上的输出结果可能不同,很可能每次运行的结果都不同。这是因为两个线同时执行counter()
函数,所以输出取决于系统中央处理核心的数量及操作系统的线程调度。
默认情况下,从不同线程访问cout
时线程安全的,没有任何数据争用,除非在第一个输出或者输入操作之前调用cout.sync_with_stdio(false)
。然而,即使没有数据争用,来自不同线程的输出仍然可以交错。这意味着,前面的输出可能会混合在一起。这个问题如果在你的电脑运行中产生,那个后续的博文涉及到的线程同步会解决该现象。
ps:线程函数的参数总是被复制到线程的某个内部存储中。通过<functional>
中的std::ref()或者cref()按引用传递参数。
通过函数对象创建线程
不使用函数指针,也可以使用函数对象在线程中执行,前面介绍的方法,给线程传递信息的唯一方式是给函数传递参数。而使用函数对象,可向函数对象类添加成员变量,并可以采用任何方式初始化和使用这些变量。为让Counter
类成为函数对象,需要实现operator()
。operator()
的实现和counter()
函数一样。
class Counter
{
public:
Counter(int id, int num) : m_id{id}, m_num{num} {}
void operator()() const
{
for (int i{0}; i < m_num; ++i)
{
cout << "Counter " << m_id << " has value " << i << endl;
}
}
private:
int m_id;
int m_num;
};
int main()
{
thread t1{Counter{1, 2}};
Counter c{2, 5};
thread t2(ref(c)); // 防止复制,通过引用传递
t1.join();
t2.join();
return 0;
}
通过lambda创建线程
lambda
表达式能很好的用于标准C++线程库。示例:
int main()
{
int id{1};
int num{5};
thread t1{ [id, num] {
for (int i{0}; i < num; ++i) {
cout << "Counter " << id << " has value " << i << endl;
}
}
};
t1.join();
return 0;
}
通过成员函数创建线程
还可以在线程中指定要执行的类的成员函数。
class Request
{
public:
Request(int id) : m_id{id} {}
void process()
{
cout << "process request " << m_id << endl;
}
private:
int m_id;
};
int main()
{
Request req{10};
// 线程t执行Request实例req的process()成员函数
thread t{&Request::process, &req};
t.join();
}
线程本地存储
c++标准支持线程本地存储的概念。通过关键字thread_local
,可以将任何变量标记为线程本地数据,即每个线程都有这个变量的独立副本,而且这个变量能在线程的整个生命周期中持续存在。对于每个线程,该变量正好初始化一次。 例如,在下面代码中定义了两个全局变量;每个线程都共享唯一的k副本,且每个线程都有自己的n副本:
#include <iostream>
#include <thread>
#include <format>
using namespace std;
int k;
thread_local int n;
// c++20引入format,可以搜索本人博客相关内容学习
void threadFunc(int id)
{
cout << format("Thread {} : k = {}, n = {}", id, k, n) << endl;
++n;
++k;
}
int main()
{
thread t1{threadFunc, 1};
t1.join();
thread t2{threadFunc, 2};
t2.join();
}
输出结果:
Thread 1 : k = 0, n = 0
Thread 2 : k = 1, n = 0
从输出结果来看,所有线程只共享一个k实例,而每一个线程都有自己的n的拷贝。
注意:如果thread_local
变量在函数作用域内声明,那么这个变量的行为和声明为静态变量是一致的。只不过每个线程都有自己独立的副本,而且不论这个函数在线程中调用多少次。每个线程仅初始化这个变量一次。
取消线程
c++标准没有包含在一个线程中取消另外一个已运行线程的任何机制。一种解决方案是使用c++20
的jthread
类,如果不能选择这种,那么实现这一目标最好方法是提供两个线程都支持的某种通信机制。最简单的机制是提供一个共享变量,目标线程定期检查这个变量,判断是否应该终止。其他线程可以设置这个共享变量,其中至少有一个线程向共享变量写入内容。
自动join线程
如前面所述,如果销毁了仍然可以joinable
的线程实例,c++运行时会调用std::terminate()
来终止应用程序。c++20引入了std::jthread
,同样在<thread>
中定义。jthread
实际上等同于thread
,除了:
- 在析构函数中自动
join
- 支持协作式取消
他被称为协作式取消,因为支持取消的线程需要定期检查他是否需要取消自己。需要引入两个关键类,他们都定义在<stop_token>
中。
- std::stop_token:支持主动检查取消的请求。一个可取消的线程需要定期在
stop_token
上调用stop_requested()
,以确定是否需要停止他的工作。stop_token
可以和condition_variable_any
一起使用,这样线程在需要停止时就可以被唤醒。 - std::stop_source:用于请求线程取消执行。通过调用
stop_souce
上的request_stop()
方法来完成。如果stop_souce
被用于请求取消,那么该停止请求对所有相关的stop_souce
和stop_token
都可见,stop_requested()
方法可以用来检查是否已经请求了停止。
下面的代码创建了一个jthread
来执行给定的lambda
表达式。传递给jthread
的可调用对象可以用stop_token
作为第一个参数。可调用对象的主题可以使用stop_token
来确定它是否需要取消自己。
jthread job{[](stop_token token) {
while (!token.stop_requested()) {
// ...
}
}};
在另一个线程中,可以请求这个线程取消自己
job.request_stop();
要从jthread
中直接访问stop_token
和stop_source
。
从线程获得结果
如果想要获得线程执行后的结果,一种方法是向线程传入指向结果变量的指针或者引用,线程将结果保存在其中。另外一种方法是将结果存储在函数对象的类成员变量中,线程执行结束后可获得结果值。使用std::ref()
,将函数对象按引用传递给thread
构造函数时,这才能生效。
还有更简单的方式可以获得线程的结果,future
。通过future
也更能方便的处理线程中发生的错误。
原子操作库
原子类型允许原子访问,这意味着不需要额外的同步机制就可以执行并发的读写操作。没有原子操作,递增变量就不是线程安全的,因为编译器首先将值从内存加载到寄存器中,递增后再把结果保存回内存。另一个线程就可能在这个递增的操作执行过程中访问内存,导致数据争用。
为使这个线程安全且不显示的使用任何线程同步机制,可以使用std::atomic
类型。例如:
atomic<int>counter {0};
++counter;
这些原子类型都定义在<atomic>
中。c++标准为所有基本类型定义了命名的整型原子类型:
可以使用原子类型,而不是显示的使用任何同步机制。但在底层,某些类型的原子操作可能使用同步机制如互斥对象。如果目标硬件缺少以原子方式执行操作的指令,则可能会发生这种情况。可在原子类型上使用is_lock_free()
方法来查询它是否支持无锁操作,所谓无锁操作,是指在运行时,底层没有显示的同步机制。
可将std::atomic
类模板与所有类型一起使用,并非仅限于整数类型。例如,可创建atomic<double>
或者atomic<MyType>
,但这要求MyType
具有is_trivially_copy
特点。底层可能需要显示的同步机制,具体取决于指定类型的大小。在下例中,Foo
和Bar
具有is_trivially_copy
特点。即std::is_trivially_copyable_v
都等于true
。但是atomic<Foo>
并非无锁操作,而atomic<Bar>
是无锁操作。
class Foo
{
private:
int mArray[123];
};
class Bar
{
private:
int mInt;
};
int main()
{
atomic<Foo> f;
// f.is_lock_free()会报错
cout << is_trivially_copyable_v<Foo> << " " << f.is_lock_free() << endl;
atomic<Bar> b;
cout << is_trivially_copyable_v<Bar> << " " << b.is_lock_free() << endl;
}
在多线程中访问一段数据的时候,原子也可以解决内存排序、编译优化等问题。基本上不使用原子操作或者显示的的同步机制,就不可能安全的在多线程中读写同一段数据。
原子操作
原子操作示例:
bool atomic<T>::compare_exchange_strong(T& expected, T desired);
这个操作以原子方式实现了一下的逻辑,伪代码如下:
if (*this == expected)
{
*this = desired;
return true;
} else {
expected = *this;
return false;
}
这个逻辑乍一看有点陌生,但是编写无锁并发数据结构的关键组件。无锁并发数据结构允许不使用任何同步机制来操作数据。
另一个例子是atomic<T>::fetch_add()
。这个操作获取该原子类型的当前值,将给定的递增值添加到这个原子值,然后返回未递增的原始值。
atomic<int> value{10};
cout << "Value = " << value << endl;
int fetched{value.fetch_add(2)};
cout << "Fetched = " << fetched << endl;
cout << "Value =" << value << endl;
输出结果:
Value = 10
Fetched = 10
Value =12
注意: 在c++20之前,对浮点数类型使用atomic
,例如atomic<float>
和atomic<double>
提供了原子的读写操作,但是并没有提供原子的算数操作。C++20为浮点原子类型添加了fetch_add()
和fetch_sub()
的支持。
大部分原子操作可接收一个额外参数,用于指定想要的内存顺序。例如:
T atomic<T>::fetch_add(T value, memory_order = memory_order_seq_cst);
可改变默认的memory_order
。c++标准提供了memory_order_relaxed、 memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel
和memory_order_seq_cst
。这些都定义在std名称空间中。然而,很少有必要使用默认之外的顺序。因为稍有使用不当,就可能会再次引入争用条件, 或这其他线程相关难以跟踪的问题。
原子智能指针
C++20通过<memory>
引入对atomic<std::shared_ptr<T>>
的支持。在早期的c++
标准中不允许这样。因为shared_ptr
不可拷贝。shared_ptr
中存储引用计数的控制块一直是线程安全的。这保证所指向的对象只被删除一次。然而。shared_ptr
中其他内容都不是线程安全的。如果在shared_ptr
实例上调用非const
方法,那么在多线程中同时使用一个shared_ptr
实例,会导致数据竞争。另一方面,当在多个线程中使用同一个atomic<shared_ptr<T>>
示例时,即使调用非const
的shared_ptr
方法也是线程安全的。请注意,在shared_ptr
所指向的对象调用非const
方法仍然是线程不安全的。
原子引用
C++20也引入了std::atomic_ref
。即使使用相同的接口,他基本上与std::atomic
相同,但是它使用的是引用。而atomic
总是拷贝提供给它的值。atomic_ref
实例本身的生命周期应该比它引用的对象短。atomic_ref
是可拷贝的。可以创建任意多个atomic_ref
实例来引用同一个对象。如果atomic_ref
实例引用某个对象,则不允许在没有经过其中一个atomic_ref
实例的情况下接触该对象。atomic_ref<T>
类模板可以与任何简单的可复制类型T一起使用,就像std::atomic
。此外,标准库还提供了一下内容
- 指针类型的偏特化,支持
fetch_add()和fetch_sub()
- 整数类型的全特化,支持
fetch_add()、fetch_sub()、fetch_and()和fetch_xor()
- 浮点类型的全特化,支持
fetch_add()和fetch_sub()
使用原子类型
假设有下面一个名为increment()
函数,它在一个循环中递增一个通过引用参数传入的整数值。这段代码使用std::this_thread::sleep_for()
在每个循环中引入一小段延迟。sleep_for()
的参数是std::chrono::duration
#include <atomic>
#include <format>
#include <iostream>
#include <thread>
#include <vector>
using namespace std;
void increment(int &counter)
{
for (int i{0}; i < 100; ++i)
{
++counter;
this_thread::sleep_for(1ms);
}
}
int main()
{
int counter{0};
vector<thread> threads;
for (int i{0}; i < 10; ++i)
{
threads.push_back(thread{increment, ref(counter)});
}
for (auto &t : threads)
{
t.join();
}
cout << "result = " << counter << endl;
}
输出结果:
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 1000
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 1000
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 999
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 998
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 1000
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 1000
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
result = 1000
wy@DESKTOP-PBM4A2J:~/code_c++$ ./a.out
上述结果中,10个线程,每个线程加100次,本应该输出1000,但实际上多次执行,有一些结果小于1000,但是值不固定。正是因为多个线程竞争的结果。下面使用原子类型解决该问题。
void increment(atomic<int> &counter) // 修改处
{
for (int i{0}; i < 100; ++i)
{
++counter;
this_thread::sleep_for(1ms);
}
}
int main()
{
atomic<int> counter{0}; // 修改处
vector<thread> threads;
for (int i{0}; i < 10; ++i)
{
threads.push_back(thread{increment, ref(counter)});
}
for (auto &t : threads)
{
t.join();
}
cout << "result = " << counter << endl;
}
接下来无论执行多少次都是1000了,如果本例在你的电脑上即使不用原子操作,无论执行多少次都是正确值,那么请你把你的线程每次加100次改成1000、10000次!
不用在代码中显式的添加任何同步机制,就得到线程安全且没有争用条件程序。因为对原子类型支持++counter
操作会在原子事务中加载值、递增值、并保存,这个过程是不会被打断的。
通过C++20的atomic_ref
,可以像下面这样解决数据竞争问题:
void increment(int &counter)
{
atomic_ref<int> atomicCounter{counter}; // 修改处
for (int i{0}; i < 100; ++i)
{
++atomicCounter; // 修改处
this_thread::sleep_for(1ms);
}
}
int main()
{
int counter{0};
vector<thread> threads;
for (int i{0}; i < 10; ++i)
{
threads.push_back(thread{increment, ref(counter)});
}
for (auto &t : threads)
{
t.join();
}
cout << "result = " << counter << endl;
}
虽然上述代码可以解决问题,但是会降低性能。应该最小化同步次数,包括原子操作和显式同步。可改成下面的方式:
void increment(atomic<int> &counter)
{
int result{0};
for (int i{0}; i < 100; ++i)
{
++result;
this_thread::sleep_for(1ms);
}
counter += result;
}
等待原子变量
c++20在std::atomic
和atomic_ref
中添加了下表所示方法,用来有效的等待原子变量被修改
方法 | 描述 |
---|---|
wait(Value) | 阻塞线程,直到另一个线程调用notify_one()或者notify_all()并且原子变量的值已经被修改,即不等于oldValue; |
notify_one() | 唤醒一个阻塞在wait()调用上的线程 |
notify_all() | 唤醒所有阻塞在wait()调用上的线程 |
示例:
int main()
{
atomic<int> value{0};
thread job{[&value] {
cout << "Thread starts waiting, value = " << value << endl;
value.wait(0);
cout << "Thread wakes up, value = " << value << endl;
}};
this_thread::sleep_for(2s);
cout << "Main thread is going to change value to 1" << endl;
value = 1;
value.notify_all();
job.join();
}
输出结果:
Thread starts waiting, value = 0
Main thread is going to change value to 1
Thread wakes up, value = 1
互斥
如果编写的多线程应用程序,那么必须分外留意操作顺序。如果线程读写共享数据,就可能发生问题。可采用许多方法来避免这个问题,例如绝不在线程之间共享数据。然而,如果不能避免数据共享,那么必须提供机制,使一次只有一个线程能更改数据。
布尔值和整数等标量经常使用上述原子操作来实现同步,但当数据更复杂且必须在多个线程中使用这些数据时,就必须显示提供同步机制。
标准库支持互斥的形式包括互斥体类和锁类。这些类都可以用来实现线程之间的同步。
互斥体类
互斥体(mutex、代表mutual exclusion)的基本使用机制如下:
- 希望与其他线程共享内存读写的一个线程试图锁定互斥体对象。如果另一个线程正在持有这些个锁,希望获得访问的线程将被阻塞,直到锁被释放,或直到超时。
- 一旦线程获得锁,这个线程就可与随意使用共享的内存,因为这要假定希望使用共享数据的所有线程 都正确获得了互斥体对象上的锁。
- 线程读写完共享的内存后,线程将锁释放,使其他线程有机会获得访问内存共享内存的锁。如果两个或者两个以上的线程正在等待锁,没有机制能保证那个线程优先获得锁,并且继续访问数据。
c++标准提供了非定时的互斥体类和定时的互斥体类。有递归和非递归两种风格。在介绍互斥体之前,需要了解一下自旋锁。
自旋锁
自旋锁是互斥锁的一种形式,其中线程使用忙碌循环(自旋)方式来尝试获得锁,执行工作,请释放锁。在旋转时,线程保持活跃,但不做任何有用的工作。即便如此,自旋锁在某些情况下还是很有用,因为它们完全可以在自己的代码中实现,不需要对操作系统进行任何昂贵的调用。也不会造成线程切换的任何开销。如下面的代码所示,自旋锁可以使用单个原子类型实现:atomic_flag。
#include <atomic>
#include <format>
#include <iostream>
#include <thread>
#include <vector>
using namespace std;
atomic_flag spinlock = ATOMIC_FLAG_INIT;
static const size_t NumberOfThreads{50};
static const size_t LoopsPerThread{100};
void dowork(size_t threadNumber, vector<size_t> &data)
{
for (size_t i{0}; i < LoopsPerThread; ++i)
{
while (spinlock.test_and_set()) // spins until lock is acquired
{
}
data.push_back(threadNumber);
spinlock.clear(); // releases the acquired lock
}
}
int main()
{
vector<size_t> data;
vector<thread> threads;
for (size_t i{0}; i < NumberOfThreads; ++i)
{
threads.push_back(thread{dowork, i, ref(data)});
}
for (auto &t : threads)
{
t.join();
}
cout << format("data contains {} elements, expected {}.\n", data.size(), NumberOfThreads * LoopsPerThread);
}
在这段代码中,每个线程都试图反复调用atomic_flag
上的test_and_set()
来获取一个锁,直到成功。
警告:由于自旋锁使用忙碌等待循环,因此只有在确定线程只会在短时间内锁定自旋锁时,才应该考虑使用这种方式。
非定时的互斥体类
标准库有三个非定时的互斥体类:std::mutex
、recursive_mutex
和shared_mutex
。前两个类在<mutex>
中定义,最后一个类在<shared_mutex>
中定义。每个类都支持下列方法。
- lock():调用线程将尝试获取锁,并阻塞直到获取锁。这个方法会无限期阻塞。如果希望设置线程阻塞的最长时间,因该使用定时的互斥体类。
- try_lock():调用线程将尝试获取锁。如果当前锁被其他线程持有,这个调用会立即返回。如果成功获取锁,
try_lock()
返回true
,否则返回false
。 - unlock():释放由调用线程持有的锁,使另外一个线程获取这个锁。
std::mutex
是一个标准的具有独占所有权语义的互斥类。只能有一个线程拥有互斥体。如果另外一个线程想要获得互斥体的使用权,则可以使用lock()
阻塞,也可使用try_lock()
尝试阻塞。已经拥有std::mutex
所有权的线程不能在这个互斥体上再次调用lock()和try_lock()
,否则可能导致死锁。
std::recursive_mutex
的行为几乎和std::mutex
一致,区别在于已经获得递归互斥体的所有权的线程允许在同一个互斥体上再次调用lock()
和try_lock()
。调用线程调用unlock()
方法的次数应该等于获得这个递归互斥体上的锁的次数。
shared_mutex
支持"共享锁拥有权"的概念,这也称为readerswriters
锁。线程可获取锁的独占所有权或共享所有权。独占所有权也称为写锁,仅当没有其他线程拥有独占或共享所有权时才能获得。共享所有权也称为读锁,如果其他线程都没有独占所有权,则可获得,但允许其他线程获取共享所有权。shared_mutex
类支持lock()、try_lock()和unlock()
。这些方法获取和释放独占锁。另外,他们具有以下与共享所有权相关的方法:lock_shared()、try_lock_shared()和unlock_shared()
。这些方法与其他方法集合的工作方式相似,尝试获取获释放共享所有权。
不允许已经在shared_mutex
上拥有锁的线程在互斥体上获得第二个锁,否则会产生死锁!
锁
锁类是RALL类,可用于更方便地正确获得和释放互斥体上的锁;锁类的析构函数会自动释放所关联的互斥体。 c++标准定义了4种类型的锁:std::lock_guard、unique_lock、shared_lock和scoped_lock。
1.lock_guard
lock_guard
在<mutex>
中定义,有两个构造函数。
-
explicit lock_guard(mutex_type& m);
接收一个互斥体引用的构造函数。这个构造函数尝试获得互斥体上的锁,并阻塞直到获得锁。
-
lock_guard(mutex_type& m, adopt_lock_t);
接收一个互斥体引用和一个
std::adop_lock_t
实例的构造函数,c++提供了一个预定义的adopt_lock_t
实例,名为std::adopt_lock
。该锁假定调用线程已经获得引用的互斥体上的锁,管理该锁,在销毁锁时自动释放互斥体。
2.unique_lock
std::unique_lock
定义在<mutex>
中,是一类更复杂的锁,允许将获得锁的时间延迟到计算需要时,远在声明之后。使用owns_lock()
方法可以确定是否获得了锁。unique_lock
也有bool
转换运算符,可用于检查是否获得了锁。使用这个转换运算符的例子在后面给出。unique_lock
有如下几个构造函数。
-
explicit unique_lock(mutex_type& m);
接收一个互斥体引用构造函数,这个构造函数尝试获得互斥体上的锁,并且阻塞直到获得锁。
-
unique_lock(mutex_type& m, defer_lock_t)noexcept;
unique_lock
存储互斥体的引用,但不立即尝试获得锁,锁可以稍后获得。 -
unique_lock(mutex_type& m, try_to_lock_t);
这个锁尝试获得引用互斥体上的锁,但是即使没有获得也不阻塞,此时,会在稍后获得锁。
-
unique_lock(mutex_type& m, adopt_lock_t);
这个锁假定调用线程已经获得引用的互斥体上的锁,锁管理互斥体,并在销毁的时候自动释放互斥体。
-
unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>&abs_time);
这个构造函数试图获取一个锁,直到系统时间超过给定的绝对时间。
-
unique_lock(mutex_type& m, const chrono::duration<Rep,Period>&rel_time);
这个构造函数试图获取一个锁,直到到达给定的相对超时时间。
unique_lock
类也有以下方法:lock()、try_lock()、try_lock_for()、try_lock_until()和unlock()。
这些方法的行为和前面介绍的定时互斥体类中的方法一致。
3.shared_lock
shared_lock
类在<shared_mutex>
中定义,它的构造函数和方法与unique_lock
相同。区别是,shared_lock
类在底层的共享互斥体上调用与共享拥有权相关的方法,因此shared_lock
的方法称为lock()、try_lock()
等,但是在底层的共享互斥体上,它们称为lock_shared()、try_lock_shared()
等。因此,shared_lock
与unique_lock
有相同的接口,可以用作unique_lock
的替代品,但是获得的是共享锁,而不是独占锁。
4.一次性获得多个锁
c++有两个泛型锁函数,可用于同时获得多个互斥体对象上的锁,而不会出现死锁。这两个泛型锁函数都在std
名称空间中定义,都是可变参数模板函数。
第一个函数lock()
不按指定顺序锁定所有给定的互斥体对象,没有出现死锁的风险。如果其中一个互斥锁调用抛出异常,则在已经获得的所有锁上调用unlock()。
原型如下:
template<class L1, class L2, class... L3>void lock(L1&, L2&, L3&...);
第二个try_lock()
函数具有类似的原型,但它通过顺序调用每个给定互斥体对象的try_lock()
,试图获得所有互斥体对象上的锁。如果try_lock()调用成功,那么这个函数返回-1。如果任何try_lock()调用失败,那么对所有已经获得的锁调用unlock()
,返回值是在其上调用try_lock()
失败的互斥体的参数位置索引(从0开始)
示例:process
函数首先创建两个锁,每个互斥体一个锁,然后将一个std::defer_lock_t
实例作为第二个参数传入,告诉unique_lock
不要在构造期间获得锁。然后调用std::lock()
以获得这两个锁,而不出现死锁:
mutex mut1;
mutex mut2;
void process()
{
unique_lock lock1{mut1, defer_lock};
unique_lock lock2{mut2, defer_lock};
lock(lock1, lock2);
}
5.scoped_lock
std::scoped_lock
在<mutex>
中定义,与lock_guard
类似,只是接收数量可变的互斥体。这样,就可极方便地获取多个锁。例如,可以使用scoped_lock
,编写刚才的process
函数:
mutex mut1;
mutex mut2;
void process()
{
scoped_lock(mut1, mut2);
}
scoped_lock
不仅简化了获取多个锁的过程,因为不需要担心需要以正确的顺序获取他们,而且它的性能也比手动获取锁要好。
std::call_once
结合使用call_once()
和once_flag
可以确保某个函数或方法正好只调用一次,无论有多少个线程试图调用call_once()
(在同一个once_flag
上)都同样如此。只有一个call_once()
调用能真正调用给定的函数或方法。如果给定的函数不抛出任何异常,则这个调用称为有效的call_once()
调用。如果给定的函数抛出异常,异常将传回调用者,选择另一个调用者来执行此函数。某个特定的once_flag
实例的有效调用在对同一个once_flag
实例的其他所有call_once()
调用之前完成。在同一个once_flag
实例上调用call_once()
的其他线程将会被阻塞,直到有效调用结束。
下面演示call_once()
的使用,该例子运行使用某个共享资源的processFunction()
,启动了3个线程,这些线程应调用initializeSharedResourcess()
一次,为此,每个线程用全局的once_flag
调用call_once()
,结果是只有一个线程执行initializeSharedResourcess()
,且执行一次。在调用call_once()
的过程中,其他线程被阻塞,直到initializeSharedResourcess()
返回:
once_flag g_onceFlag;
void initializeSharedResourcess()
{
cout << "Shared resources initialized." << endl;
}
void processingFunction()
{
call_once(g_onceFlag, initializeSharedResourcess);
cout << "preocessing" << endl;
}
int main()
{
vector<thread> threads{3};
for (auto &t : threads)
{
t = thread(processingFunction);
}
for (auto &t : threads)
{
t.join();
}
}
输出如下:
Shared resources initialized.
preocessing
preocessing
preocessing
互斥体对象使用示例
接下来几个例子将展示如何使用互斥体对象同步多个线程。
1.以线程安全的方式写入流
c++中的流是不会出现争用条件的,但是来自不同线程的输出仍会交错。这里有两个解决方案
- 使用c++20的同步流
- 使用互斥对象以确保每次只有一个线程对流对象进行读写
同步流
c++20引入了std::basic_osyncstream
,并分别为char
流和wchar_t
流预定义了类型别名osyncstream
和wosyncstream
,他们都定义在<syncstream>
。这些类名中的‘o’表示输出。这些类保证所有通过他们完成的输出都将在同步流被销毁的那一刻出现在最终的输出流中。保证输出不会和其他线程的其他输出交错。
class Counter
{
public:
Counter(int id, int numIterations) : m_id{id}, m_numIterations{numIterations} {};
void operator()() const
{
for (int i{0}; i < m_numIterations; ++i)
{
osyncstream{cout} << "Counter " << m_id << " has value " << i << endl;
}
}
private:
int m_id;
int m_numIterations;
};
使用锁
如果不能使用同步流,可以使用以下代码段所示的互斥锁来同步Counter
类中对cout
的所有访问,为此添加了一个静态互斥对象,它应该是静态的,因为类的所有实例都因该使用同一个互斥对象实例。lock_guard
用于在写入cout
之前获取互斥锁。
class Counter
{
public:
Counter(int id, int numIterations) : m_id{id}, m_numIterations{numIterations} {};
void operator()() const
{
for (int i{0}; i < m_numIterations; ++i)
{
lock_guard lock{ms_mutex};
cout << "Counter " << m_id << " has value " << i << endl;
}
}
private:
int m_id;
int m_numIterations;
inline static mutex ms_mutex;
};
这段代码在for
循环的每次迭代中创建了一个lock_guard
实例,建议尽可能限制拥有锁的时间,否则阻塞其他线程的时间就会过长。例如,如果lock_guard
实例在for
循环之前创建一次,就基本上丢失了这段代码中的所有多线程特征,因为一个线程在其for
循环的整个执行期间都拥有锁,所有其他线程都等待这个锁被释放。
2.使用定时锁
下面的示例演示如何使用定时的互斥体。这与此前是同一个Counter
类,但是这一次结合unique_lock
使用了timed_mutex
。将200毫秒的相对时间传给unique_lock
构造函数,试图在200毫秒内获得一个锁。如果不能在这个时间间隔内获得这个锁,构造函数返回。之后,可检查这个锁是否已经获得,对这个lock
变量应用if
语句就可执行这种检查,因为unique_lock
类定义了bool
转换运算符。使用chrono
库指定超时时间。
class Counter
{
public:
Counter(int id, int numIterations) : m_id{id}, m_numIterations{numIterations} {};
void operator()() const
{
for (int i{0}; i < m_numIterations; ++i)
{
unique_lock lock{ms_timedMutex, 200ms};
if (lock)
cout << "Counter " << m_id << " has value " << i << endl;
}
}
private:
int m_id;
int m_numIterations;
inline static timed_mutex ms_timedMutex;
};
3.双重检查锁
双重检查锁实际上是一种反模式,应该避免使用!但是这里还是应该讲一下, 因为可能会在现有代码库中遇见。双重检查锁定模式旨在尝试避免使用互斥体对象。这是编写比使用互斥体对象更有效代码的一种半途而废大的尝试。如果在后续示例中想要提高速度,真的可能出错。
例如双重检查锁可用于确保资源正好初始化一次。下面演示了如何实现这个功能。之所以称为双重检查算法,是因为它检查g_initialized
变量的值两次,一次在获得锁之前,另一次在获得锁之后。第一次检查g_initialized
变量是为了防止获得不需要的锁,第二次是为了检查用于确保没有其他线程在第一次g_initialized
检查和获得锁之间执行初始化。
void initializeSharedResources()
{
cout << "initializeSharedResources" << endl;
}
atomic<bool> g_initialized{false};
mutex g_mutex;
void processingFunction()
{
if (!g_initialized)
{
unique_lock lock{g_mutex};
if (!g_initialized)
{
initializeSharedResources();
g_initialized = true;
}
}
cout << "OK" << endl;
}
int main()
{
vector<thread> threads;
for (int i{0}; i < 5; ++i)
{
threads.push_back(thread{processingFunction});
}
for (auto &t : threads)
{
t.join();
}
return 0;
}
输出结果:
initializeSharedResources
OK
OK
OK
OK
OK
可以清楚的看到只有一个线程初始化了共享资源,对于这个例子,还是推荐使用call_once()
条件变量
条件变量允许一个线程阻塞,直到另一个线程设置某个条件变量或系统时间到达某个指定的时间。条件变量允许显式的线程间通信。
有两类条件变量,他们都定义在<condition_variable>
中。
-
std::condition_variable:
只能等待unique_lock<mutex>
上的条件变量。根据c++标准的描述,这个变量可以在特定的平台上达到最高的性能 -
std::condition_variable_any:
可等待任何对象的条件变量,包括自定义的锁类型。它支持如下方法:
-
notify_one():
唤醒等待这个条件变量的线程之一 -
notify_all():
唤醒等待这个条件变量的所有线程 -
wait(unique_lock<mutex>&lk):
调用wait()
的线程应该已经获得lk
上的锁,调用wait()
的效果是以原子方式调用了lk.unlock()
并阻塞线程,等待通知。当线程被另外一个线程中的notify_one()
或notify_all()
,调用解除阻塞时,这个函数会再次调用lk.lock()
,可能会被这个锁阻塞,然后返回。 -
wait_for(unique_lock<mutex>& lk, const chrono::duration<Rep, Period>& rel_time):
类似于此前的wait()
方法,区别在于这个线程会被notify_one()
或notify_all()
调用解除阻塞,也可能在给定超时时间到达后解除阻塞。 -
wait_until(unique_lock<mutex>& lk, const chrono::time_point<Clock, Duration>& abs_time):
类似于此前的wait()
方法,区别在于这个线程会被notify_one()或notify_all()
调用解除阻塞,也可能在系统时间超过给定的绝对时间时解除阻塞。 也有一些其他版本的
wait()、wait_for()和 wait_until()
接收一个额外的谓词参数。例如,接收一个额外谓词的wait()
等同于:while (!predicate()) wait(1k);
condition_variable_any
类支持的方法和condition_variable
类相同,区别在于condition_variable_any
可接收任何类型的锁类,而不只是unique_lock<mutex>
。锁类应提供lock()
和unlock()
方法。
-
虚假唤醒
等待条件变量的线程可在另一个线程调用notify_one()或notify_all()
时醒过来,或在系统时间超过给定时间时醒过来,也可能不合时宜地醒过来。这意味着,即使没有其他线程调用任何通知方法,线程也会醒过来。因此,当线程等待一个条件变量并醒过来时,就需要检查它是否因为获得通知而醒过来。一种检查方法是使用接收谓词参数的wait()
版本。
条件变量可用于处理队列项的后台线程。可定义队列,在队列中插入要处理的项。后台线程等待队列中出现项。把一项插入队列中时,线程就醒过来,处理项,然后继续休眠,等待下一项。假设有以下队列:
queue<string> m_queue;
需要确保在任何时候只有一个线程修改这个队列。可通过互斥体实现这一点:
mutex m_mutex;
为了能在添加一项时通知后台线程,需要一个条件变量:
condition_riable m_condVar;
需要向队列中添加项的线程首先要获得这个互斥体上的锁,然后向队列中添加项,最后通知后台线程。无论当前是否拥有锁,都可以调用notify_one()或notify_all()
,它们都会正常工作:
// Lock mutex and add entry to the queue.
unique_lock lock {m_mutex};
m_queue.push(entry);
// Notify condition variable to wake up thread.
m_condVar.notify_all();
后台线程在一个无限循环中等待通知。注意这里使用接收谓词参数的wait()
方法正确处理线程不合时宜地醒过来的情形。谓词检查队列中是否有队列项。对wait()
的调用返回时,就可以肯定队列中有队列项了。
unique_lock lock { m_mutex };
while (true) {
// Wait for a notification.
m_condVar.wait(lock, [this]( return !m_queue.empty();});
// Condition variable is notified, so something is in the queue.
// Process queue item...
}
C++标准还定义了辅助函数std::notify_all_at_thread_exit(cond,lk),
其中cond
是一个条件变量,lk
是一个unique_lock<mutex>
实例。调用这个函数的线程应该已经获得了锁lk
。当线程退出时,会自动执行以下代码:
lk.unlock();
cond.notify_all();
注意: 将锁lk
保持锁定,直到该线程退出为止。因此,一定要确保这不会在代码中造成任何死锁,例如由于错误的锁顺序而产生的死锁。
latch
latch
是一次性使用的线程协调点。一旦给定数量的线程达到latch
点时,所有线程都会解除阻塞并继续执行。基本上它是个计数器,在每个线程到达latch
点时倒数。一旦计数器达到零,latch
将无限期保持在一个有信号的状态,所有阻塞线程都将解除阻塞,随后到达latch
点的任何线程会立刻被允许继续执行。
latch
由 std::latch
实现,在<latch>
中定义。构造函数接收需要到达latch点的所需线程数。到达latch
点的线程可以调用arrive_and_wait()
,它递减latch
计数器并阻塞,直到latch
有信号为止。线程也可以通过调用wait()
在不减少计数器的情况下阻塞在latch
点上。try_wait()
方法可用于检查计数器是否达到零。最后,如果需要,还可以通过调用count_down()
来减少计数器,而不会阻塞。
下面演示了一个latch
点的用例,其中一些数据需要加载到内存(I/O bound)
中,然后在多个线程中并行处理这些数据。进一步假设线程在启动时和开始处理数据之前需要执行一些CPU
绑定的初始化。通过先启动线程并让它们进行CPU
绑定的初始化,并且并行加载数据(I/O bound)
,性能得到了提高。代码用计数器1初始化一个latch
对象,并启动10个线程,这些线程都进行一些初始化,然后阻塞latch
,直到latch
计数器达到零。在启动10个线程之后,代码加载一些数据,例如从磁盘中。一旦加载了所有数据,latch
计数器将减为0,这10个等待线程都将解除阻塞。
latch startLatch{1};
vector<jthread> threads;
for (int i{0}; i < 10; ++i)
{
threads.push_back(jthread{[&startLatch] {
// Do some initialization...(CPU bound)
// Wait until the latch counter reaches zero. startLatch.wait();
startLatch.wait();
// Process data...
}});
}
// Load data... (I/O bound)
// Once all data has been loaded, decrement the latch counter
// which then reaches zero and unblocks all waiting threads
startLatch.count_down();
barrier
barrier
是由一系列阶段组成的可重用线程协调机制。许多线程在barrier
点阻塞。当给定数量的线程到达barrier
时,将执行完成阶段的回调,解除所有阻塞线程的阻塞,重置线程计数器,并开始下一个阶段。在每个阶段中,可以调整下一阶段的预期线程数。barrier
对于在循环之间执行同步非常有用。例如,假设有很多线程并发执行,并在一个循环中执行一些计算。进一步假设一旦这些计算完成,需要在线程开始其循环的新迭代之前对结果进行一些处理。对于这种情况,设置barrier
是完美的,所有的线程都会阻塞在barrier
处。当它们全部到达时,完成阶段的回调将处理线程的结果,然后解除所有线程的阻塞,以开始它们的下一次迭代。
barrier
由 std::barrier
实现,在<barrier>中定义。barrier
最重要的方式是arrive_and_wait()
,它减少计数器,然后阻塞线程,直到当前阶段完成。
下面的代码片段演示了barrier
的使用。它启动4个线程,在循环中连续执行某些操作。在每次迭代中,所有线程都是用barrier
进行同步。
void completionFunction() noexcept
{
// do something
}
int main()
{
const size_t numbeOfThreads{4};
barrier barrierPoint{numbeOfThreads, completionFunction};
vector<jthread> threads;
for (int i{0}; i < numbeOfThreads; ++i)
{
threads.push_back(jthread{[&barrierPoint](stop_token token) {
while (!token.stop_requested())
{
//... Do some calculations ...
// Synchronize with other threads,
barrierPoint.arrive_and_wait();
}
}});
}
}
semaphore
semaphore
(信号量)是轻量级同步原语,可用作其他同步机制(如mutex、latch、barrier)的构建块。基本上一个semaphore
由一个表示很多插槽的计数器组成。计数器在构造函数中初始化。如果获得了一个插槽,计数器将减少,而释放插槽将增加计数器。在<semaphore>
中定义了两个 semaphore
类:std::counting_semaphore
和binary_semaphore
。前一种模型是非负资源计数。后者只有一个插槽,该槽要么是空的,要么不是空的,完全适合作为互斥的构建块。
方法 | 描述 |
---|---|
acquire() | 递减计数器。当计数器为零时阻塞,直到计数器再次递增 |
try_acquire() | 尝试递减计数器,但如果计数器已经为零不会阻塞。如果计数器可以递减,则返回true,否则 返回false |
try_acquire_for() | 与try_acquire相同,但会在给定的时间段内尝试 |
try_acquire_until() | 与try_acquire相同,但是会一直尝试直到系统到达给定时间 |
release() | 计数器增加一个给定的数,并解除在acquire调用中线程的阻塞 |
计数semaphore
允许精确地控制希望允许并发运行的线程数量。例如,下面的代码片段允许最多4个线程并行运行:
counting_semaphore semaphore{4};
vector<jthread> threads;
for (int i{0}; i < 10; ++i)
{
threads.push_back(jthread{[&semaphore] {
semaphore.acquire();
// ... Slot acquired ... (at most 4 threads concurrently)
semaphore.release();
}});
}
semaphore
的另一个用例是为线程而不是为条件变量实现通知机制。例如,可以在其构造函数中将semaphore
的计数器初始化为0,任何调用 acquire()
的线程都会阻塞,直到其他线程对 semaphore
调用release()
future
根据前面的学习知道,可通过std::thread
启动一个线程,计算并得到一个结果,当线程结束执行时不容易取回计算的结果。与std::thread
相关的另一个问题是处理像异常这样的错误。如果一个线程抛出一个异常,而这个异常没有被线程本身处理,C++运行时将调用 std::terminate()
,这通常会终止整个应用程序。
可使用future
更方便地获得线程的结果,并将异常转移到另一个线程中,然后另一个线程可以任意处置这个异常。当然,应该总是尝试在线程本身处理异常,不要让异常离开线程。
future
在promise
中存储结果。可通过future
获取promise
中存储的结果。也就是说,promise
是结果的输入端;future
是输出端。一旦在同一线程或另一线程中运行的函数计算出希望返回的值,就把这个值放在promise
中。然后可以通过future
获取这个值。可将future/promise
对想象为线程间传递结果的通信信道。
C++提供标准的future
名为std::future
。可从 std::future
检索结果。T是计算结果的类型
future<T> myFuture {...}
T result { myFuture.get() };
调用get()
以取出结果,并保存在变量result
中。如果另一个线程尚未计算完结果,对 get()
的调用将阻塞,直到该结果值可用。只能在future
上调用一次get()
。按照标准,第二次调用的行为是不确定的。
可首先通过向future
询问结果是否可用的方式来避免阻塞:
if (myFuture.wait_for(0)) { // value is available
T result {myFuture.get()};
} else { ... } // value is not yet available
std::promise 和 std::future
C++提供了std::promise
类,作为实现promise
概念的一种方式。可在promise
上调用 set_value()
来存储结果,也可调用set_exception()
,在promise
中存储异常。注意,只能在特定的promise
上调用set_value()
或set_exception()
一次。如果多次调用它,将抛出std::future_error
异常。
如果线程A启动另一个线程B以执行计算,则线程A可创建一个std::promise
,将其传给已启动的线程。注意,无法复制promise
,但可将其移到线程中!线程B使用promise
存储结果。将promise
移入线程B之前,线程A在创建的promise
上调用get_future()
,这样,线程B完成后就能访问结果。示例:
void doWork(promise<int> thePromise)
{
thePromise.set_value(11);
}
int main()
{
// Create a promise to pass to the thread.
promise<int> myPromise;
// get the future of the promise
auto theFuture{myPromise.get_future()};
// Create a thread and move the promise into it.
thread theThread{doWork, move(myPromise)};
// Get the result.
int result{theFuture.get()};
cout << "Result: " << result << endl;
theThread.join();
}
这段代码只用于演示。这段代码在一个新的线程中启动计算,然后在future
上调用。这个线程会阻塞,直到结果计算完为止。这听起来像代价很高的函数调用。在实际应用程序中使用future
模型时,可定期检查future
中是否有可用的结果(通过此前描述的wait_for()
),或者使用条件变量等同步机制。当结果还不可用时,可做其他事情,而不是阻塞。
std::packaged_task
有了std::packaged_task
,将可以更方便地使用 promise
,而不是像前面那样显式地使用std::promise
。下面的代码演示了这一点。它创建一个packaged_task
来执行 calculateSum()
。通过调用get_future()
,从packaged_task
检索 future
。启动一个线程,并将 packaged_task
移入其中。无法复制packaged_task!
启动线程后,在检索到的future
上调用get()
来获得结果。在结果可用前,将一直阻塞。
calculateSum()
不需要在任何类型的 promise
中显式存储任何数据。packaged_task
自动创建promise
,自动在promise
中存储被调用函数(这里是calculateSum())的结果,并自动在promise
中存储函数抛出的任何异常。
int calculateSum(int a, int b)
{
return a + b;
}
int main()
{
packaged_task<int(int, int)> task{calculateSum};
auto theFuture{task.get_future()};
thread theThread{move(task), 19, 3};
int result{theFuture.get()};
cout << "result = " << result << endl;
theThread.join();
}
std::async
如果想让C++运行时更多地控制是否创建一个线程以进行某种计算,可使用std::async()
。它接收一个将要执行的函数,并返回可用于检索结果的future
。async()
可通过两种方法运行函数:
- 创建一个新的线程,异步运行提供的函数。
- 在返回的
future
上调用get()
方法时,在主调线程上同步地运行函数。
如果没有通过额外参数来调用async()
,C++运行时会根据一些因素(例如系统中处理器的数目)从两种方法中自动选择一种方法。也可指定策略参数,从而调整C++运行时的行为。
- launch::async:强制C++运行时在一个不同的线程上异步地执行函数。
- launch::deferred:强制C++运行时在调用
get()
时,在主调线程上同步地执行函数。 - launch::async |launch::deferred:允许C++运行时进行选择(=默认行为)。
下例演示了async()
的用法:
int calculate() { return 123; }
int main()
{
auto myFuture{async(calculate)};
// auto myFuture { async(launch::async, calculate) };
// auto myFuture ( async(launch::deferred, calculate));
int result{myFuture.get()};
cout << "result = " << result << endl;
}
从这个例子可看出,std::async()
是以异步方式(在不同线程中)或同步方式(在同一线程中)执行一些计算并在随后获取结果的最简单方法之一。
警告:调用async()
锁返回的future
会在其析构函数中阻塞,直到结果可用为止。这意味着如果调用async()
时未捕获返回的future
,async()
调用将真正成为阻塞调用!例如,以下代码行同步调用calculate():
async(calculate);
在这条语句中,async()
创建和返回future
。未捕获这个future
,因此是临时future
。由于是临时的,因此将在该语句完成前调用其析构函数,在结果可用前,该析构函数将一直阻塞。
异常处理
使用 future
的一大优点是它们会自动在线程之间传递异常。在future
上调用get()
时,要么返回计算结果,要么重新抛出与future
关联的promise
中存储的任何异常。使用packaged_task
或async()
时,从已启动的函数抛出的任何异常将自动存储在promise
中。如果将std::promise
用作promise
,可调用set_exception()
以在其中存储异常。下面是一个使用async()
的示例:
int calculate()
{
throw runtime_error{"Exception throw from calculate()."};
}
int main()
{
auto myFuture{async(launch::async, calculate)};
try
{
int result{myFuture.get()};
cout << result << endl;
}
catch (const exception &ex)
{
cout << " Caught exception: " << ex.what() << endl;
}
}
std::shared_future
std::future<T>
只要求T可移动构建。在future<T>
上调用get()
时,结果将移出 future
,并返回给你。这意味着只能在future<T>
上调用get()
一次。
如果要多次调用 get()
,甚至从多个线程多次调用,则需要使用 std::shared_future<T>
,此时,T需要可复制构建。可使用 std::future::share()
,或给 shared_future
构造函数传递 future
,以创建shared_future
。注意,future
不可复制,因此需要将其移入shared_future
构造函数。
shared_future
可用于同时唤醒多个线程。例如,下面的代码片段定义了两个lambda
表达式,它们在不同的线程上异步地执行。每个lambda
表达式首先将值设置为各自的promise
,以指示已经启动。接着在signalFuture
调用 get()
,这一直阻塞,直到可通过future
获得参数为止;此后将继续执行。每个lambda
表达式按引用捕获各自的promise
,按值捕获signalFuture
,因此这两个lambda
表达式都有signalFuture
的副本。主线程使用async()
,在不同线程上执行这两个lambda
表达式,一直等到线程启动,然后设置signalPromise
中的参数以唤醒这两个线程。
int main()
{
promise<void> thread1Started, thread2Started;
promise<int> signalPromise;
auto signalFuture{signalPromise.get_future().share()};
auto function1{[&thread1Started, signalFuture] {
thread1Started.set_value();
int parameter{signalFuture.get()};
cout << "function1 parameter = " << parameter << endl;
}};
auto function2{[&thread2Started, signalFuture] {
thread2Started.set_value();
int parameter{signalFuture.get()};
cout << "function2 parameter = " << parameter << endl;
}};
auto result1{async(launch::async, function1)};
auto result2{async(launch::async, function2)};
thread1Started.get_future().wait();
thread2Started.get_future().wait();
signalPromise.set_value(12);
}
示例:多线程的Logger类
本节演示如何使用线程、互斥体对象、锁和条件变量编写一个多线程的Logger
类。这个类允许不同的线程向一个队列中添加日志消息。Logger
类本身会在另一个后台线程中处理这个队列,将日志信息串行地写入一个文件。这个类的设计经历了两次迭代,以说明编写多线程代码时可能遇到的问题。
C++标准没有线程安全的队列。很明显,必须通过一些同步机制保护对队列的访问,避免多个线程同时读写队列。这个示例使用互斥体对象和条件变量来提供同步。在此基础上,可以这样定义Logger
类:
class Logger
{
public:
Logger();
Logger(const Logger &src) = delete;
Logger &operator=(const Logger &rhs) = delete;
void log(string entry);
private:
void processEntries();
void processEntriesHelper(queue<string> &queue, ofstream &ofs) const;
mutex m_mutex;
condition_variable m_condVar;
queue<string> m_queue;
thread m_thread;
};
实现如下。注意这个最初的设计存在几个问题,尝试运行这个程序时,它可能会行为异常甚至崩溃,在Logger
类的下一次迭代中会讨论并解决这些问题。processEntries()
方法中的while
循环值得关注。它处理当前队列中的所有消息。当拥有锁时,它将当前队列的内容与栈上的一个局部空队列交换。在此之后,它释放锁,这样其他线程就不会再被阻塞,从而向现在为空的当前队列添加新条目。一旦释放锁,就会处理局部队列的所有条目。这里不再需要锁,因为其他线程不会碰到这个局部队列。
Logger::Logger()
{
m_thread = thread{&Logger::processEntries, this};
}
void Logger::log(string entry)
{
unique_lock lock{m_mutex};
m_queue.push(move(entry));
m_condVar.notify_all();
}
void Logger::processEntries()
{
ofstream logFire{"log.txt"};
if (logFire.fail())
{
cerr << "Failed to open logfire." << endl;
return;
}
unique_lock lock{m_mutex, defer_lock};
while (true)
{
lock.lock();
m_condVar.wait(lock);
queue<string> localQueue;
localQueue.swap(m_queue);
lock.unlock();
processEntriesHelper(localQueue, logFire);
}
}
void Logger::processEntriesHelper(queue<string> &queue, ofstream &ofs) const
{
while (!queue.empty())
{
ofs << queue.front() << endl;
queue.pop();
}
}
从这个相当简单的任务中可看到,正确编写多线程代码是十分困难的。令人遗憾的是,C++标准目前不提供任何并发数据结构。
Logger
类是一个演示基本构建块的示例。对于生产环境中的代码而言,建议使用恰当的第三方并发数据结构,不要自行编写。例如,开源的BoostC++库(boost.org)
实现了一个无锁队列,允许并发使用,不需要任何显式的同步。
可通过下面的测试代码测试这个Logger类,这段代码启动一些线程,所有线程都向同一个Logger
实例记录一些信息:
void logSomeMessages(int id, Logger &logger)
{
for (int i{0}; i < 10; ++i)
{
logger.log(format("Log entry {} from thread {}", i, id));
}
}
int main()
{
Logger logger;
vector<thread> threads;
for (int i{0}; i < 2; ++i)
{
threads.emplace_back(logSomeMessages, i, ref(logger));
}
for (auto &t : threads)
{
t.join();
}
}
如果构建并运行这个原始的最初版本,你会发现应用程序突然终止。原因在于应用程序从未调用后台线程的join()
或detach()
。回顾前面的内容可知,thread
对象的析构函数仍是可结合的,即尚未调用join()
或 detach()
,而调用std::terminate()
来停止运行线程和应用程序本身。这意味着,仍在队列中的消息未写入磁盘文件。当应用程序像这样终止时,甚至一些运行时库会报错或生成崩溃转储。需要添加一种机制来正常关闭后台线程,并在应用程序本身终止之前,等待后台线程完全关闭。这可通过向类中添加一个析构函数和一个布尔数据成员来解决。新的Logger
类定义如下所示:
class Logger
{
public:
// Gracefully shut down background thread.
virtual ~Logger();
// Other public members omitted for brevity.
private:
// Boolean telling the background thread to terminate.
bool m_exit { false};
};
析构函数将m_exit
设置为true
,唤醒后台线程,并等待直到后台线程被关闭。把m_exit
设置为true
之前,析构函数在m_mutex
上获得一个锁。这是在使用processEntries()
防止争用条件和死锁。processEntries()
可以放在其while
循环的开头,即检查m_exit
之后、调用wait()
之前。如果主线程此时调用Logger
类的析构函数,而析构函数没有在m_mutex
上获得一个锁,则析构函数在processEntries()
检查m_exit
之后、等待条件变量之前,把m_exit
设置为true
,并调用notify_all()
,因此processEntries()
看不到新值,也收不到通知。此时,应用程序处于死锁状态,因为析构函数在等待join()
调用,而后台线程在等待条件变量。注意析构函数可以在仍然持有锁或释放锁之后调用notify_all()
,但必须在join()
调用之前释放锁,这解释了使用花括号的额外代码块。
Logger::~Logger()
{
{
unique_lock lock{m_mutex};
m_exit = true;
}
m_condVar.notify_all();
m_thread.join();
}
processEntries()
方法需要检查此布尔变量,当这个布尔变量为true时终止处理循环:
void Logger::processEntries()
{
ofstream logFire{"log.txt"};
if (logFire.fail())
{
cerr << "Failed to open logfire." << endl;
return;
}
unique_lock lock{m_mutex, defer_lock};
while (true)
{
lock.lock();
if (!m_exit)
m_condVar.wait(lock);
else
{
processEntriesHelper(m_queue, logFire);
break;
}
queue<string> localQueue;
localQueue.swap(m_queue);
lock.unlock();
processEntriesHelper(localQueue, logFire);
}
}
注意不能只在外层while
循环的条件中检查m_exit
,因为即使m_exit
是true
,队列中也可能有需要写入的日志项。
可在多线程代码的特殊位置添加人为的延迟,以触发某个行为。注意添加这种延迟应仅用于测试,并且应从最终代码中删除。例如,要测试是否解决了析构函数带来的争用条件,可在主程序中删除对log()
的所有调用,使其几乎立即调用Logger
类的析构函数,并添加如下延迟:
void Logger::processEntries()
{
ofstream logFire{"log.txt"};
if (logFire.fail())
{
cerr << "Failed to open logfire." << endl;
return;
}
unique_lock lock{m_mutex, defer_lock};
while (true)
{
lock.lock();
if (!m_exit)
{
this_thread::sleep_for(100ms); // 延迟
m_condVar.wait(lock);
}
else
{
processEntriesHelper(m_queue, logFire);
break;
}
queue<string> localQueue;
localQueue.swap(m_queue);
lock.unlock();
processEntriesHelper(localQueue, logFire);
}
}
线程池
如果不在程序的整个生命周期中动态地创建和删除线程,还可以创建可根据需要使用的线程池。这种技术通常用于需要在线程中处理某类事件的程序。在大多数环境中 ,线程的理想数目应该和处理器核心的数目相等 。如果线程的数目多于处理器核心的数目,那么线程只有被挂起,从而允许其他线程运行,这样最终会增加开销。注意,尽管理想的线程数目和核心数目相等,但这种情况只适用于计算密集型线程,这种情况下线程不能由于其他原因阻塞,例如I/O
,当线程可以阻塞时,往往运行数目比核心数目更多的线程更合适。在此类情况下,确定最佳线程数难度较大,可能涉及测量系统正常负载条件下的吞吐量(一般为2倍核心数较好)。
由于不是所有的处理都是等同的,因此线程池中的线程经常接收一个表示要执行的计算的函数对象或lambda表达式作为输入的一部分。
由于线程池中的所有线程都是预先存在的,因此操作系统调度这些线程并运行的效率大大高于操作系统创建线程并响应输入的效率。此外,线程池的使用允许管理创建的线程数,因此根据平台的不同,可以少至1个线程,也可以多达数千个线程。
有几个库实现了线程池,例如Intel Threading Building Blocks(TBB)
、Microsoft Parallel Pattems Library(PPL)
等。建议给线程池使用这样的库,而不是编写自己的实现。如果的确希望自己实现线程池,可以使用与对象池类似的方式实现。
线程设计和最佳实践总结
- 使用并行标准库算法: 标准库中包含大量算法。从 C++17开始,有60多个算法支持并行执行。尽量使用这些并行算法,而非编写自己的多线程代码。
- 终止应用程序前,确保所有thread对象都不是可结合的: 确保对所有
thread
对象都调用了join()
或detach()
。仍可结合的thread
析构函数将调用std::terminate()
,从而突然间终止所有线程和应用程序。C++20引入了jthread
,它在析构函数中自动join()。 - 最好的同步就是没有同步: 如果采用合理的方式设计不同的线程,让所有的线程在使用共享数据时只从共享数据读取,而不写入共享数据,或者只写入其他线程不会读取的部分,那么多线程编程就会变得简单很多。这种情况下不需要任何同步,也不会有争用条件或死锁的问题。
- 尝试使用单线程的所有权模式: 这意味着同一时间拥有1个数据块的线程数不多于1。拥有数据意味着不允许其他任何线程读/写这些数据。当线程处理完数据时,数据可传递到另一个线程,那个线程目前拥有这些数据的唯一且完整的责任/拥有权。这种情况下,没必要进行同步。
- 在可能时使用原子类型和操作: 通过原子类型和原子操作更容易编写没有争用条件和死锁的代码,因为它们能自动处理同步。如果在多线程设计中不可能使用原子类型和操作,而且需要共享数据,那么需要使用同步机制(如互斥)来确保同步的正确性。
- 使用锁保护可变的共享数据: 如果需要多个线程可写入的可变共享数据,而且不能使用原子类型和操作,那么必须使用锁机制,以确保不同线程之间的读写是同步的。
- 尽快释放锁: 当需要通过锁保护共享数据时,务必尽快释放锁。当一个线程持有一个锁时,会使得其他线程阻塞等待这个锁,这可能会降低性能。
- 不要手动获取多个锁,应当改用std::lock()或 std::try_lock(): 如果多个线程需要获取多个锁,那么所有线程都要以同样的顺序获得这些锁,以防止死锁。可通过泛型函数
std::lock()
或std::try_lock()
获取多个锁。 - 使用支持多线程的分析器: 通过支持多线程的分析器找到多线程应用程序中的性能瓶颈,分析多个线程是否确实利用了系统中所有可用的处理能力。支持多线程的分析器的一个例子是某些
Visual Studio
版本中的profiler
。 - 使用RAII 锁对象:使用
lock_guard
、unique_lock
、shared_lock
或scoped_lock
RAII
类,在正确的时间自动释放锁。 - 了解调试器的多线程支持特性: 大部分调试器都提供对多线程应用程序调试的最基本支持。应该能得到应用程序中所有正在运行的线程列表,而且应该能切换到任意线程,查看线程的调用栈。例如,可通过这些特性检查死锁,因为可准确地看到每个线程正在做什么。
- 使用线程池,而不是动态创建和销毁大量线程: 动态地创建和销毁大量的线程会导致性能下降。这种情况下,最好使用线程池来重用现有的线程。
- 使用高级多线程库: 目前,C++标准仅提供用于编写多线程代码的基本构件。正确使用这些构件并非易事。尽可能使用高级多线程库,例如
Intel Threading Building Blocks(TBB)
、Microsoft Parallel Patterns Library(PPL)
等,而不是自己实现。多线程编程很难掌握,而且容易出错。另外,自己的实现不一定像预期那样正确工作。
原创不易,转载请注明出处,欢迎点赞,收藏!