一、原子类型与原子操作
1.1 原子类型与操作介绍
在前一篇博文中,多线程交互示例代码中,给出了一个原子类型定义:
// 原子数据类型
atomic_llong total {0};
那么什么事原子数据类型呢,和c++的基础数据类型有什么不同呢:
我们在c++11之前编程时,很多情况下需要在多个线程间共享一个简单的类型变量(int,bool,pointer等),对这种简单临界资源的访问,如有两个线程,对一个变量进行操作,一个线程读这个变量的值,一个线程往这个变量中写值。即使是一个简单变量的读取和写入操作,如果不加锁,也有可能会导致读写值混乱。因此,就使用std::mutex来解决上述对临界资源访问的问题,使用std::mutex程序执行不会导致混乱,但是每一次循环都要加锁解锁是的程序开销很大。 为了提高性能,C++11提出了原子操作及原子类型的概念,所谓原子操作,就是多线程程序中"最小的且不可并行化的"的操作,其在std::atomic<T>
定义,它提供了多线程间的原子操作,可以把原子操作理解成一种:不需要用到互斥量加锁(无锁)技术的多线程并发编程方式。
通常对一个共享资源的操作是原子操作的话,意味着多个线程访问该资源时,有且仅有唯一一个线程在对这个资源进行操作。那么从线程(处理器)的角度看来,其他线程就不能够在本线程对资源访问期间对该资源进行操作,因此原子操作对于多个线程而言,就不会发生有别于单线程程序的意外状况。通常情况下,原子操作都是通过"互斥"(mutual exclusive)的访问来保证的。类似于在C++11标准之前,需要在C/C++代码中嵌入互斥锁(mutex)来实现。
从效率上来说,原子操作要比互斥量的方式效率要高。互斥量的加锁一般是针对一个代码段,而原子操作针对的一般都是一个变量。 原子操作,一般都是指“不可分割的操作”;是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。 由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能。原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了 CAS 循环,当大量的冲突发生时,该等待还是得等待!但是总归比锁要好。
1.2 c++11的原子类型
原子类型定义在<atomic>
头文件中,原子类型是封装了一个值的类型,它的访问保证不会导致数据的竞争,并且可以用于在不同的线程之间同步内存访问。在c++20和c++23又基于c++11追加组合定义。
//头文件 <memory>
template< class T > struct atomic; (C++11 起)
template< class U > struct atomic<U*>; (C++11 起)
//头文件 <stdatomic.h>
template<class U> struct atomic<std::shared_ptr<U> >; (C++20 起)
template<class U> struct atomic<std::weak_ptr<U>>; (C++20 起)
#define _Atomic(T) (C++23 起)
每个 std::atomic 模板的实例化和全特化定义一个原子类型。若一个线程写入原子对象,同时另一线程从它读取,则行为良好定义。另外,对原子对象的访问可以建立线程间同步,并按 std::memory_order 所对非原子内存访问定序。std::atomic 既不可复制亦不可移动。
c++23后,定义了兼容性宏 _Atomic ,其于头文件 <stdatomic.h> 中提供,使得两者均良构时 _Atomic(T) 等同于 std::atomic<T> 。未指定包含 <stdatomic.h> 时命名空间 std 中的任何声明可用。
std::atomic是一个类模板,和一般的类模板定义对象使用是一致的,例如c++11前定义一个long long对象,需要采用一个mutex来辅助,
static long long total = 0;
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
//线程调用
for(;;)
{
pthread_mutex_lock(&mutex_);
total += 1;
pthread_mutex_unlock(&mutex_);
}
而直接采用std::atomic模板及可以简化:
std::atomic<long long> total {0};//普通定义
//或
std::atomic_llong total {0}; //特化定义
//线程调用
for(;;)
{
total += 1;
}
标准库为下列类型提供 std::atomic
模板的特化,它们拥有普通模板定义所不拥有的额外属性:
- 对所有指针类型的部分特化 std::atomic<U*> 。这些特化拥有标准布局、默认构造函数 和析构函数。除了为所有原子类型提供的操作,这些特化额外支持适合指针类型的原子算术运算,例如 fetch_add 、 fetch_sub。
- (C++20 后),为 std::shared_ptr 和 std::weak_ptr 提供部分特化 std::atomic<std::shared_ptr<U> > 和 std::atomic<std::weak_ptr<U> > 。
- 整数类型特化, std::atomic 提供适合于整数类型的额外原子操作,例如 fetch_add 、 fetch_sub 、fetch_and 、fetch_or 、fetch_xor。
- 浮点类型特化,如float 、double 和 long double 等类型, std::atomic 提供适合于浮点类型的额外原子操作,例如 fetch_add 和 fetch_sub。
c++标准库还为这些特化提供了类型别名,像前面的std::atomic_llong就是别名:
//std::atomic<Integral> 别名,C++11
atomic_bool std::atomic<bool> (typedef)
atomic_char std::atomic<char>(typedef)
atomic_schar std::atomic<signed char>(typedef)
atomic_uchar std::atomic<unsigned char>(typedef)
atomic_short std::atomic<short>(typedef)
atomic_ushort std::atomic<unsigned short>(typedef)
atomic_int std::atomic<int>(typedef)
atomic_uint std::atomic<unsigned int>(typedef)
atomic_long std::atomic<long>(typedef)
atomic_ulong std::atomic<unsigned long>(typedef)
atomic_llong std::atomic<long long>(typedef)
atomic_ullong std::atomic<unsigned long long>(typedef)
atomic_char8_t (C++20)std::atomic<char8_t>(typedef)
atomic_char16_t std::atomic<char16_t>(typedef)
atomic_char32_t std::atomic<char32_t>(typedef)
atomic_wchar_t std::atomic<wchar_t>(typedef)
atomic_int8_t std::atomic<std::int8_t>(typedef)
atomic_uint8_t std::atomic<std::uint8_t>(typedef)
atomic_int16_t std::atomic<std::int16_t>(typedef)
atomic_uint16_t std::atomic<std::uint16_t>(typedef)
atomic_int32_t std::atomic<std::int32_t>(typedef)
atomic_uint32_t std::atomic<std::uint32_t>(typedef)
atomic_int64_t std::atomic<std::int64_t>(typedef)
atomic_uint64_t std::atomic<std::uint64_t>(typedef)
atomic_int_least8_t std::atomic<std::int_least8_t>(typedef)
atomic_uint_least8_t std::atomic<std::uint_least8_t>(typedef)
atomic_int_least16_t std::atomic<std::int_least16_t>(typedef)
atomic_uint_least16_t std::atomic<std::uint_least16_t>(typedef)
atomic_int_least32_t std::atomic<std::int_least32_t>(typedef)
atomic_uint_least32_t std::atomic<std::uint_least32_t>(typedef)
atomic_int_least64_t std::atomic<std::int_least64_t>(typedef)
atomic_uint_least64_t std::atomic<std::uint_least64_t>(typedef)
atomic_int_fast8_t std::atomic<std::int_fast8_t>(typedef)
atomic_uint_fast8_t std::atomic<std::uint_fast8_t>(typedef)
atomic_int_fast16_t std::atomic<std::int_fast16_t>(typedef)
atomic_uint_fast16_t std::atomic<std::uint_fast16_t>(typedef)
atomic_int_fast32_t std::atomic<std::int_fast32_t>(typedef)
atomic_uint_fast32_t std::atomic<std::uint_fast32_t>(typedef)
atomic_int_fast64_t std::atomic<std::int_fast64_t>(typedef)
atomic_uint_fast64_t std::atomic<std::uint_fast64_t>(typedef)
atomic_intptr_t std::atomic<std::intptr_t>(typedef)
atomic_uintptr_t std::atomic<std::uintptr_t>(typedef)
atomic_size_t std::atomic<std::size_t>(typedef)
atomic_ptrdiff_t std::atomic<std::ptrdiff_t>(typedef)
atomic_intmax_t std::atomic<std::intmax_t>(typedef)
atomic_uintmax_t std::atomic<std::uintmax_t>(typedef)
//特殊用途类型别名,C++20
atomic_signed_lock_free 免锁且对于等待/提醒最高效的有符号整数原子类型(typedef)
atomic_unsigned_lock_free 免锁且对于等待/提醒最高效的无符号整数原子类型(typedef)
采用std::atomic<T>定义和采用特化别名定义是等价的,对于线程而言,原子类型通常属于"资源型"的数据,这意味着多个线程通常只能访问单个原子类型的拷贝。因此在C++11中,原子类型只能从其模板参数类型中进行构造,标准不允许原子类型进行拷贝构造、移动构造,以及使用operator=等,以防止发生意外。比如:
//test0.h
#ifndef _TEST_1_H_
#define _TEST_1_H_
void func0(void);
#endif //_TEST_1_H_
//test0.cpp
#include "test0.h"
#include <atomic>
#include <iostream>
using namespace std;
void func0(void)
{
atomic<int> a_i(100);
//atomic<int> a_i_cpy(a_i);//无法通过编译
atomic_int a_i_alias{1000};
//atomic_int a_i_cpy_alias{a_i_alias};//无法通过编译
};
//main.cpp
#include "test0.h"
int main(int argc, char* argv[])
{
func0();
return 0;
}
其中,a_i_cpy(a_i)的构造方式在C++11中是不允许的(事实上,atomic模板类的拷贝构造函数、移动构造函数、operator=等总是默认被删除的。我们会在第7章中介绍如何删除一些默认的函数)。
不过从atomic<T>类型的变量来构造其模板参数类型T的变量则是可以的。比如:
//test0.h
#ifndef _TEST_1_H_
#define _TEST_1_H_
void func0(void);
#endif //_TEST_1_H_
//test0.cpp
#include "test0.h"
#include <atomic>
#include <iostream>
using namespace std;
void func0(void)
{
atomic<int> a_i(100);
//atomic<int> a_i_cpy(a_i);//无法通过编译
atomic_int a_i_alias{1000};
//atomic_int a_i_cpy_alias{a_i_alias};//无法通过编译
int i_test(a_i);
cout << "i_test = " << i_test << "\n";
i_test = a_i_alias;
cout << "i_test = " << i_test << "\n";
};
//main.cpp
#include "test0.h"
int main(int argc, char* argv[])
{
func0();
return 0;
}
编译后输出,可以看到模板参数T可以通过atomic对象进行拷贝构造或赋值的
这是由于atomic类模板总是定义了从atomic<T>到T的类型转换函数的缘故。在需要时,编译器会隐式地完成原子类型到其对应的类型的转换。
1.3 原子操作功能与应用
那么,使得原子类型能够在线程间保持原子性的缘由主要还是因为编译器能够保证针对原子类型的操作都是原子操作。在C++11标准中,将原子操作定义为atomic模板类的成员函数,这囊括了绝大多数典型的操作,如读、写、交换等。当然,对于内置类型而言,主要是通过重载一些全局操作符来完成的。
【1】成员类型
成员类型 定义
value_type T (无论是否特化)
difference_type value_type (仅对 atomic<Integral> 和 atomic<Floating> (C++20 起) 特化);std::ptrdiff_t (仅对 atomic<U*> 特化) difference_type 不在初等 atomic 模板中,或不在对 std::shared_ptr 和 std::weak_ptr 的部分特化中定义。
成员函数
(构造函数) 构造原子对象(公开成员函数)
operator= 存储值于原子对象(公开成员函数)
is_lock_free 检查原子对象是否免锁(公开成员函数)
store 原子地以非原子对象替换原子对象的值(公开成员函数)
load 原子地获得原子对象的值(公开成员函数)
operator T 从原子对象加载值(公开成员函数)
exchange 原子地替换原子对象的值并获得它先前持有的值(公开成员函数)
compare_exchange_weak 原子地比较原子对象与非原子参数的值,若相等则进行交换,若不相等则进行加载(公开成员函数)
compare_exchange_strong 原子地比较原子对象与非原子参数的值,若相等则进行交换,若不相等则进行加载(公开成员函数)
wait (C++20) 阻塞线程直至被提醒且原子值更改(公开成员函数)
notify_one (C++20) 提醒至少一个在原子对象上的等待中阻塞的线程(公开成员函数)
notify_all (C++20) 提醒所有在原子对象上的等待中阻塞的线程(公开成员函数)
常量
is_always_lock_free [静态](C++17)指示该类型是否始终免锁(公开静态成员常量)
特化成员函数
fetch_add 原子地将参数加到存储于原子对象的值,并返回先前保有的值(公开成员函数)
fetch_sub 原子地从存储于原子对象的值减去参数,并获得先前保有的值(公开成员函数)
fetch_and 原子地进行参数和原子对象的值的逐位与,并获得先前保有的值(公开成员函数)
fetch_or 原子地进行参数和原子对象的值的逐位或,并获得先前保有的值(公开成员函数)
fetch_xor 原子地进行参数和原子对象的值的逐位异或,并获得先前保有的值(公开成员函数)
operator++(int) 令原子值增加或减少系列(公开成员函数)
operator++(int)
operator--(int)
operator--(int)
operator+= 加、减,或与原子值进行逐位与、或、异或(公开成员函数)
operator-=
operator&=
operator|=
operator^=
对于大多数的原子类型而言,都可以执行读(load)、写(store)、交换(exchange)、比较并交换(compare_exchange_weak/compare_exchange_stronge)等操作。通常情况下,这些原子操作已经足够使用了。比如在下列语句中:
atomic<int> a;
int b = a;//等同于b=a.load()
赋值语句b=a实际就等同于b=a.load()。而由于a.load是原子操作,因此可以避免线程间关于a的竞争,而下列语句:
atomic<int> a;
a = 1;//实际等同于a.store(1)
其赋值语句a=1则等同于调用a.store(1)。同样的,由于a.store是原子操作,也可以避免线程间关于a的竞争。而 exchange和 compare_exchange_weak、compare_exchange_stronge 则更复杂一些。由于每个平台上对线程间实现交换、比较并交换等操作往往有着不同的方式,无法用一致的高级语言表达,因此这些接口封装了平台上最高性能的实现,使得程序员能够在不同平台上都能获得最佳的性能。
类模板 std::atomic在标准库中的声明:
namespace std {
template<class T> struct atomic {
using value_type = T;
static constexpr bool is_always_lock_free = /* 实现定义 */;
bool is_lock_free() const volatile noexcept;
bool is_lock_free() const noexcept;
// 原子类型上的操作
constexpr atomic() noexcept(is_nothrow_default_constructible_v<T>);
constexpr atomic(T) noexcept;
atomic(const atomic&) = delete;
atomic& operator=(const atomic&) = delete;
atomic& operator=(const atomic&) volatile = delete;
T load(memory_order = memory_order::seq_cst) const volatile noexcept;
T load(memory_order = memory_order::seq_cst) const noexcept;
operator T() const volatile noexcept;
operator T() const noexcept;
void store(T, memory_order = memory_order::seq_cst) volatile noexcept;
void store(T, memory_order = memory_order::seq_cst) noexcept;
T operator=(T) volatile noexcept;
T operator=(T) noexcept;
T exchange(T, memory_order = memory_order::seq_cst) volatile noexcept;
T exchange(T, memory_order = memory_order::seq_cst) noexcept;
bool compare_exchange_weak(T&, T, memory_order, memory_order) volatile noexcept;
bool compare_exchange_weak(T&, T, memory_order, memory_order) noexcept;
bool compare_exchange_strong(T&, T, memory_order, memory_order) volatile noexcept;
bool compare_exchange_strong(T&, T, memory_order, memory_order) noexcept;
bool compare_exchange_weak(T&, T,
memory_order = memory_order::seq_cst) volatile noexcept;
bool compare_exchange_weak(T&, T, memory_order = memory_order::seq_cst) noexcept;
bool compare_exchange_strong(T&, T,
memory_order = memory_order::seq_cst) volatile noexcept;
bool compare_exchange_strong(T&, T, memory_order = memory_order::seq_cst) noexcept;
void wait(T, memory_order = memory_order::seq_cst) const volatile noexcept;
void wait(T, memory_order = memory_order::seq_cst) const noexcept;
void notify_one() volatile noexcept;
void notify_one() noexcept;
void notify_all() volatile noexcept;
void notify_all() noexcept;
};
}
1.4 原子布尔类型
此外,标准库还定义了一个比较特殊的布尔型的 atomic类型: atomic_flag(注意,atomic_flag跟 atomic_bool是不同的),std::atomic_flag 是原子布尔类型。
//定义于头文件 <atomic>
class atomic_flag; // (C++11 起)
相比于其他的atomic类型,atomic flag是无锁的(lock-free),它保证是免锁的,即线程对其访问不需要加锁。因此对atomic_flag而言,也就不需要使用load、store等成员函数进行读写(或者重载操作符)。另外,不同于 std::atomic<bool> , std::atomic_flag 不提供加载或存储操作。
atomic flag类型成员函数:
成员函数
(构造函数) 构造 atomic_flag(公开成员函数)
operator= 赋值运算符(公开成员函数)
clear 原子地设置标志为 false(公开成员函数)
test_and_set 原子地设置标志为 true 并获得其先前值(公开成员函数)
test (C++20)原子地返回标志的值(公开成员函数)
wait (C++20)阻塞线程直至被提醒且原子值更改(公开成员函数)
notify_one (C++20)提醒至少一个在原子对象上的等待中阻塞的线程(公开成员函数)
notify_all (C++20)提醒所有在原子对象上的等待中阻塞的线程(公开成员函数)
调用atomic_flag 案例,通过atomic_flag的成员test_and_set 以及clear,我们可以实现一个自旋锁(spin lock)。
//test1.h
#ifndef _TEST_1_H_
#define _TEST_1_H_
void func1(void);
#endif //_TEST_1_H_
//test1.cpp
#include "test1.h"
#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
std::atomic_flag lock = ATOMIC_FLAG_INIT;
void f(int n)
{
for (int cnt = 0; cnt < 10; ++cnt) {
while (lock.test_and_set(std::memory_order_acquire)) // 获得锁
; // 自旋
std::cout << "Output from thread " << n << '\n';
lock.clear(std::memory_order_release); // 释放锁
}
}
void func1(void)
{
std::vector<std::thread> v;
for (int n = 0; n < 10; ++n) {
v.emplace_back(f, n);
}
for (auto& t : v) {
t.join();
}
}
//main.cpp
//#include "test0.h"
#include "test1.h"
int main(int argc, char* argv[])
{
//func0();
func1();
return 0;
}
编译 g++ main.cpp test*.cpp -o test.exe -std=c++1,运行./test.exe输出如下:
$ ./test.exe
Output from thread 0
...
Output from thread 0
Output from thread 1
...
Output from thread 1
Output from thread 3
...
Output from thread 3
Output from thread 4
...
Output from thread 4
Output from thread 6
...
Output from thread 6
Output from thread 7
...
Output from thread 7
Output from thread 9
...
Output from thread 9
Output from thread 5
...
Output from thread 5
Output from thread 8
...
Output from thread 8
Output from thread 2
...
Output from thread 2
py_hp@py-for-home /cygdrive/d/workForMy/workspace/thread_test2
上述代码中,声明了一个全局的atomic_flag变量lock,将lock初始化为值ATOMIC_FLAG_INIT,即false的状态。而在线程0中(执行函数f的代码),我们不停地通过lock的成员test_and_set来设置lock为true。这里的test_and_set()是一种原子操作,用于在一个内存空间原子地写入新值并且返回旧值。因此 test_and_set会返回之前的lock 的值,因此f中的test and set将一直返回true,并不断打印信息,即自旋等待。
而当其他线程加入运行的时候,由于0线程调用了lock的成员clear,将lock的值设为false,因此此时线程0的自旋将终止,从而开始运行后面的代码。这样一来,其他线程开始抢占运行,每个线程都必须等到正执行的线程自旋终止,其他线程才能获得执行权限。
当然,还可以将lock封装为锁操作,比如:
void Lock(atomic_flag *lock){
while(lock.test_and_set());
}
void Unlock(atomic_flag *lock){
lock.clear();
}
这样一来,就可以通过Lock和UnLock操作,c++11前通过像mutex一样互斥地访问临界区了。除此之外,很多时候,了解底层的程序员会考虑使用无锁编程,以最大限度地挖掘并行编程的性能,而C++11的无锁机制为这样的实现提供了高级语言的支持。
类 std::atomic_flag在标准库头文件 <atomic>声明如下:
namespace std {
struct atomic_flag {
constexpr atomic_flag() noexcept;
atomic_flag(const atomic_flag&) = delete;
atomic_flag& operator=(const atomic_flag&) = delete;
atomic_flag& operator=(const atomic_flag&) volatile = delete;
bool test(memory_order = memory_order::seq_cst) const volatile noexcept;
bool test(memory_order = memory_order::seq_cst) const noexcept;
bool test_and_set(memory_order = memory_order::seq_cst) volatile noexcept;
bool test_and_set(memory_order = memory_order::seq_cst) noexcept;
void clear(memory_order = memory_order::seq_cst) volatile noexcept;
void clear(memory_order = memory_order::seq_cst) noexcept;
void wait(bool, memory_order = memory_order::seq_cst) const volatile noexcept;
void wait(bool, memory_order = memory_order::seq_cst) const noexcept;
void notify_one() volatile noexcept;
void notify_one() noexcept;
void notify_all() volatile noexcept;
void notify_all() noexcept;
};
}
1.5 原子操作-内存同步顺序
在上面的例子中,我们的原子操作都是比较直观的。事实上,在C++11中,原子操作还可以包含一个参数:std::memory_order。通常情况下,使用该参数将有利于编译器进一步释放并行的潜在的性能。
std::memory_order,指定内存访问,包括常规的非原子内存访问,如何围绕原子操作排序。在没有任何制约的多处理器系统上,多个线程同时读或写数个变量时,一个线程能观测到变量值更改的顺序不同于另一个线程写它们的顺序。其实,更改的顺序甚至能在多个读取线程间相异。一些类似的效果还能在单处理器系统上出现,因为内存模型允许编译器变换。
//定义于头文件 <atomic>
typedef enum memory_order
{
/*宽松操作:没有同步或顺序制约,仅对此操作要求原子性*/
memory_order_relaxed,
/*
*释放消费顺序,有此内存顺序的加载操作,在其影响的内存位置进行消费操作:
*当前线程中依赖于当前加载的该值的读或写不能被重排到此加载前。
*其他释放同一原子变量的线程的对数据依赖变量的写入,为当前线程所可见。
*在大多数平台上,这只影响到编译器优化
*/
memory_order_consume,
/*
*释放获得顺序,有此内存顺序的加载操作,在其影响的内存位置进行获得操作:
*当前线程中读或写不能被重排到此加载前。
*其他释放同一原子变量的线程的所有写入,能为当前线程所见
*/
memory_order_acquire,
/*
*有此内存顺序的存储操作进行释放操作:
*当前线程中的读或写不能被重排到此存储后。
*当前线程的所有写入,可见于获得该同一原子变量的其他线程(释放获得顺序),
*并且对该原子变量的带依赖写入变得对于其他消费同一原子对象的线程可见(释放消费顺序)
*/
memory_order_release,
/*
*带此内存顺序的读修改写操作既是获得操作又是释放操作。
*当前线程的读或写内存不能被重排到此存储前或后。
*所有释放同一原子变量的线程的写入可见于修改之前,而且修改可见于其他获得同一原子变量的线程
*/
memory_order_acq_rel,
/*
*序列一致顺序,有此内存顺序的加载操作进行获得操作,存储操作进行释放操作,
*而读修改写操作进行获得操作和释放操作,再加上存在一个单独全序,
*其中所有线程以同一顺序观测到所有修改
*/
memory_order_seq_cst
} memory_order;
库中所有原子操作的默认行为提供序列一致顺序。该默认行为可能有损性能,不过可以给予库的原子操作额外的 std::memory_order 参数,以指定附加制约,在原子性外,编译器和处理器还必须强制该操作。
【1】宽松顺序
带标签 memory_order_relaxed 的原子操作无同步操作;它们不会在共时的内存访问间强加顺序。它们只保证原子性和修改顺序一致性。
//test2.h
#ifndef _TEST_2_H_
#define _TEST_2_H_
void order_relaxed(void);
#endif //_TEST_2_H_
//test2.cpp
#include "test2.h"
#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> cnt = {0};
void f1(int id)
{
for (int n = 0; n < 1000; ++n) {
cnt.fetch_add(1, std::memory_order_relaxed);
}
std::cout << "this thread " << id <<" end and counter value is " << cnt << '\n';
};
void order_relaxed(void)
{
std::vector<std::thread> v;
for (int n = 0; n < 10; ++n) {
v.emplace_back(f1,n);
}
for (auto& t : v) {
t.join();
}
std::cout << "Final counter value is " << cnt << '\n';
};
//main.cpp
// #include "test0.h"
// #include "test1.h"
#include "test2.h"
int main(int argc, char* argv[])
{
// func0();
// func1();
order_relaxed();
return 0;
}
编译测试,在这个计数器自增例子中,只要求原子性,确保自增计算正确,但不要求顺序或同步。
【2】释放获得顺序
若线程 A 中的一个原子存储带标签 memory_order_release ,而线程 B 中来自同一变量的原子加载带标签 memory_order_acquire ,则从线程 A 的视角先发生于原子存储的所有内存写入(非原子及宽松原子的),在线程 B 中成为可见副效应,即一旦原子加载完成,则保证线程 B 能观察到线程 A 写入内存的所有内容。同步仅建立在释放和获得同一原子对象的线程之间。其他线程可能看到与被同步线程的一者或两者相异的内存访问顺序。
//test3.h
#ifndef _TEST_3_H_
#define _TEST_3_H_
void order_release(void);
#endif //_TEST_3_H_
//test3.cpp
#include "test3.h"
#include <thread>
#include <atomic>
#include <cassert>
#include <string>
#include <iostream>
std::atomic<std::string*> ptr;
int data;
void producer()
{
std::string* p = new std::string("Hello");
data = 100;
ptr.store(p, std::memory_order_release); //释放操作
};
void consumer()
{
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_acquire)))//获得操作
;
std::cout << "get order and start do something!\n ";
assert(*p2 == "Hello"); // 绝无问题
assert(data == 100); // 绝无问题
};
void order_release(void)
{
std::thread t1(consumer);
std::thread t2(producer);
t1.join();
t2.join();
};
//main.cpp
// #include "test0.h"
// #include "test1.h"
// #include "test2.h"
#include "test3.h"
int main(int argc, char* argv[])
{
// func0();
// func1();
// order_relaxed();
order_release();
return 0;
}
互斥锁释放获得同步的例子:线程 t2 释放锁而线程 t1 获得它时,发生于线程 t2 环境的临界区(释放之前)中的所有事件,必须对于执行同一临界区的线程 t1 (获得之后)可见。
【3】释放消费顺序
若线程 A 中的原子存储带标签 memory_order_release 而线程 B 中来自同一对象的读取存储值的原子加载带标签 memory_order_consume ,则线程 A 视角中先发生于原子存储的所有内存写入(非原子和宽松原子的),会在线程 B 中该加载操作所携带依赖进入的操作中变成可见副效应,即一旦完成原子加载,则保证线程B中,使用从该加载获得的值的运算符和函数,能见到线程 A 写入内存的内容。
见上述例子简单更改一下,将memory_order_acquire替换成memory_order_consume:
//test4.h
#ifndef _TEST_4_H_
#define _TEST_4_H_
void order_release_consume(void);
#endif //_TEST_4_H_
//test4.cpp
#include "test4.h"
#include <thread>
#include <atomic>
#include <cassert>
#include <string>
#include <iostream>
std::atomic<std::string*> ptr;
int data;
void producer()
{
std::string* p = new std::string("Hello");
data = 100;
ptr.store(p, std::memory_order_release);
};
void consumer()
{
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume)))//更换成memory_order_consume
;
std::cout << "get order and start do something!\n ";
assert(*p2 == "Hello"); // 绝无出错: *p2 从 ptr 携带依赖
assert(data == 100); // 可能也可能不会出错: data 不从 ptr 携带依赖
};
void order_release_consume(void)
{
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
};
//main.cpp
#include "test4.h"
int main(int argc, char* argv[])
{
order_release_consume();
return 0;
}
上述代码中,用于指针中介的发布的依赖定序同步:int data不由数据依赖关系关联到指向字符串的指针,从而其值在消费者中有可能未定义或同步。
【3】序列顺序一致
memory_order_seq_cst表示该原子操作必须是顺序一致的,这是C++11中所有atomic 原子操作的默认值,不带memory_order参数的原子操作就是使用该值。 带标签 memory_order_seq_cst 的原子操作不仅以与释放/获得顺序相同的方式排序内存(在一个线程中先发生于存储的任何结果都变成进行加载的线程中的可见副效应),还对所有带此标签的内存操作建立单独全序。
//test5.h
#ifndef _TEST_5_H_
#define _TEST_5_H_
void order_seq_cst(void);
#endif //_TEST_5_H_
//test5.cpp
#include "test5.h"
#include <thread>
#include <atomic>
#include <cassert>
#include <iostream>
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
void write_x()
{
x.store(true, std::memory_order_seq_cst);
};
void write_y()
{
y.store(true, std::memory_order_seq_cst);
};
void read_x_then_y()
{
while (!x.load(std::memory_order_seq_cst))
;
if (y.load(std::memory_order_seq_cst)) {
++z;
}
std::cout << "read_x_then_y->"<<"x:"<<x<<","<<"y:"<<y<<","<< "z:" << z << "\n";
};
void read_y_then_x()
{
while (!y.load(std::memory_order_seq_cst))
;
if (x.load(std::memory_order_seq_cst)) {
++z;
}
std::cout << "read_y_then_x->"<<"x:"<<x<<","<<"y:"<<y<<","<< "z:" << z << "\n";
};
void order_seq_cst(void)
{
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join(); b.join(); c.join(); d.join();
assert(z.load() != 0); // 决不发生
std::cout << "order_seq_cst->"<<"x:"<<x<<","<<"y:"<<y<<","<< "z:" << z << "\n";
};
//main.cpp
#include "test5.h"
int main(int argc, char* argv[])
{
order_seq_cst();
return 0;
}
此示例演示序列一直顺序为必要的场合。任何其他顺序都可能触发assert,因为可能令线程c和d观测到原子对象x和y以相反顺序更改。由于c/d线程切入时机不同,会造成获得a/b不同态势,因此其输出信息就显得每次可能不大相同,但是数据增加或者说是次序是得到保证的。(大家可以将a/b/c/d四个对象声明定义更换一下次序测试看看更多展示效果)
标准库的设计者的考量内存模型远远多过于这一点,他们对各种平台、处理器、编程方式都进行了考量,总结出了不同的"内存模型"。例如,让一些代码遵守先于发生的关系,而另外一部分的代码不遵守,在C++11中,这是完全可能呢。事实上,顺序一致只是属于C++11中多种内存模型中的一种。而在C++11中,并不是只支持顺序一致单个内存模型的原子变量,因为顺序一致往往意味着最低效的同步方式。
程序的内存中的数据改变最终是落实到机器指令执行的,如果内存数据的改变次序与机器执行指令保持一致,就是强顺序的,反之就是弱顺序的。c/c++编译器处于优化的考虑,会将一些指令按指令的关联关系、约束关系等进行指令移动,使其最佳指令排列和产生最佳运行时性能,但是对于某些平台体系结构是强顺序的,某些是弱顺序的,在C++11中,原子类型的成员函数(原子操作)总是保证了顺序一致性。这对于x86这样的强顺序平台来说,禁止了编译器对原子类型变量间的重排序优化;而对于PowerPC这样的弱顺序平台来说,则不仅禁止了编译器的优化,还插入了大量的内存栅栏。这对于意图是提高性能的多线程程序而言,无疑是一种性能伤害。在C++11中,设计者给出的解决方式是让程序员为原子操作指定所谓的内存顺序:memory_order。
顺序一致、松散、release-acquire和release-consume通常是最为典型的4种内存顺序,其他的如 memory_order_acq_rel,则是常用于实现一种叫做CAS(compare and swap)的基本同步元语,对应到atomic的原子操作compare_exchange_strong成员函数上。我们也称之为acquire-release 内存顺序。
由于并行编程在C++11中是非常新的一个话题,因此C++11中关于原子操作的设计还涉及大量的细节和众多特性,另外还不断在更高级的标准版本中进行调整。通常大家直接简单地使用C++11原子操作的顺序一致性就可以进行并行程序的编写了。而如果想让自己的程序在多线程情况下获得更好的性能的话,尤其当使用的是一些弱内存顺序的平台,比如 PowerPC的话,建立原子操作间内存顺序则很有必要,因为这可会带来极大的性能提升(事实上,这也是弱一致性内存模型平台的优势)。
但对于并行编程来说,可能最根本的还是思考如何将大量计算的问题,按需分解成多个独立的、能够同时运行的部分,并找出真正需要在线程间共享的数据,实现为C++11的原子类型。虽然有了原子类型的良好设计,实现这些都可以非常的便捷,但并不是所有的问题或者计算都适合用并行计算来解决,对于不适用的问题,强行用并行计算来解决会收效甚微,甚至起到相反效果。因此在决定使用并行计算解决问题之前,必须要有清晰的设计规划。而在实现了代码并行后,更要进一步使用一些性能调试工具来提高并行程序的性能。
二、 原子操作库
除了std::atomic类模板及其相关原子操作外,标准库定义了原子操作库集合,原子库为细粒度的原子操作提供组件,允许无锁并发编程。涉及同一对象的每个原子操作,相对于任何其他原子操作是不可分的。原子对象不具有数据竞争。
原子类型
atomic (C++11)atomic类模板及其针对布尔、整型和指针类型的特化(类模板)
atomic_ref (C++20)提供非原子对象上的原子操作(类模板)
原子类型上的操作
atomic_is_lock_free (C++11)检查对该原子类型的操作是否是无锁的(函数模板)
atomic_store (C++11)原子地以非原子实参替换原子对象的值(函数模板)
atomic_store_explicit
atomic_load (C++11)原子地获得存储于原子对象的值(函数模板)
atomic_load_explicit
atomic_exchange (C++11)原子地以非原子实参的值替换原子对象的值,并返回该原子对象的旧值(函数模板)
atomic_exchange_explicit
atomic_compare_exchange_weak (C++11)原子地比较原子对象和非原子实参的值,若相等则进行 atomic_exchange,若不相等则进行 atomic_load(函数模板)
atomic_compare_exchange_weak_explicit
atomic_compare_exchange_strong
atomic_compare_exchange_strong_explicit
atomic_fetch_add (C++11)将非原子值加到原子对象,并获得原子对象的先前值(函数模板)
atomic_fetch_add_explicit
atomic_fetch_sub (C++11)从原子对象减去非原子值,并获得原子对象的先前值(函数模板)
atomic_fetch_sub_explicit
atomic_fetch_and (C++11)将原子对象替换为与非原子实参逻辑与的结果,并获得原子对象的先前值(函数模板)
atomic_fetch_and_explicit
atomic_fetch_or (C++11)将原子对象替换为与非原子实参逻辑或的结果,并获得原子对象的先前值(函数模板)
atomic_fetch_or_explicit
atomic_fetch_xor (C++11)将原子对象替换为与非原子实参逻辑异或的结果,并获得原子对象的先前值(函数模板)
atomic_fetch_xor_explicit
atomic_wait (C++20)阻塞线程直至被提醒且原子值更改(函数模板)
atomic_wait_explicit
atomic_notify_one (C++20)提醒一个在 atomic_wait 中阻塞的线程(函数模板)
atomic_notify_all (C++20)提醒所有在 atomic_wait 中阻塞的线程(函数模板)
标志类型及操作
atomic_flag (C++11)免锁的布尔原子类型(类)
atomic_flag_test_and_set (C++11)原子地设置标志为 true 并返回其先前值(函数)
atomic_flag_test_and_set_explicit
atomic_flag_clear (C++11)原子地设置标志值为 false(函数)
atomic_flag_clear_explicit
atomic_flag_test (C++20)原子地返回标志的值(函数)
atomic_flag_test_explicit
atomic_flag_wait (C++20)阻塞线程,直至被提醒且标志更改(函数)
atomic_flag_wait_explicit
atomic_flag_notify_one (C++20)提醒一个在 atomic_flag_wait 中阻塞的线程(函数)
atomic_flag_notify_all (C++20)提醒所有在 atomic_flag_wait 中阻塞的线程(函数)
初始化
atomic_init (C++11)(C++20 中弃用)对默认构造的原子对象进行非原子初始化(函数模板)
ATOMIC_VAR_INIT (C++11)(C++20 中弃用)静态存储期的原子对象的常量初始化(宏函数)
ATOMIC_FLAG_INIT (C++11)(C++20 中弃用)将 std::atomic_flag 初始化为 false(宏常量)
内存同步顺序
memory_order (C++11)为给定的原子操作定义内存顺序约束(枚举)
kill_dependency (C++11)从 std::memory_order_consume 依赖树移除指定对象(函数模板)
atomic_thread_fence(C++11)通用的依赖内存顺序的栅栏同步原语(函数)
atomic_signal_fence(C++11)线程与执行于同一线程的信号处理函数间的栅栏(函数)
原子类型的 C 兼容性,定义于头文件 <stdatomic.h>
_Atomic (C++23) 使得_Atomic(T) 等同于 std::atomic<T> 的兼容性宏(宏函数)
<stdatomic.h> 以外的C++标准库头文件不提供_Atomic宏或任何非宏的全局命名空间声明。(C++23 起)
上述原子操作库集合中,标记类型及操作atomic_flag系列,内存同步顺序memory_order实现前面都简述过,而初始化函数集在c++20弃用。
2.1 原子类型的操作函数模板-数值计算
现在看看原子类型的操作,它们是实现原子类型std::atomic的类型操作成员函数基础,本质上原子类型std::atomic成员操作函数是调用了上述的原子操作库的原子类型操作函数模板来实现的,“见1.3 原子操作功能与应用”,看下面案例,
//test6.h
#ifndef _TEST_6_H_
#define _TEST_6_H_
void atomic_operator(void);
#endif //_TEST_6_H_
//test6.cpp
#include "test6.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <vector>
std::atomic<long long> data;
// long long ll_ = 1;
long long ll_[5] = {7,10,100,1000,10000};
void do_work(int id)
{
switch (id)
{
case 1:
data.store(1, std::memory_order_relaxed); //1
//std::atomic_store(&data,1L);//1,一样OK
//std::atomic_store_explicit(&data,1L,std::memory_order_relaxed);//1,一样OK
break;
case 2:
data.load(std::memory_order_relaxed);
//std::atomic_load_explicit(&data,std::memory_order_relaxed);
data+=2;
data.store(data, std::memory_order_relaxed); //1+2=3
break;
case 3:
data.fetch_add(3, std::memory_order_relaxed); //3+3=6
break;
case 4:
data.fetch_sub(4, std::memory_order_relaxed); //6-4=2
break;
case 5:
data.fetch_and(5, std::memory_order_relaxed); //2&5=0
break;
case 6:
data.fetch_or(6, std::memory_order_relaxed); //0|6=6
break;
case 7:
data.fetch_xor(7, std::memory_order_relaxed); //6^7=1
break;
case 8:
data.exchange(7, std::memory_order_relaxed); //1<->7=7
break;
case 9:
std::atomic_compare_exchange_weak(&data,ll_,10);//ll_[0]==data,data=10
std::cout << "curent data = " << data ;
std::cout << " ll_[0] data = " << ll_[0] << "\n";
std::atomic_compare_exchange_strong(&data,ll_,10);//ll_[0]!=data,ll_[0]=10
std::cout << "curent data = " << data ;
std::cout << " ll_[0] data = " << ll_[0] << "\n";
return;
default:
std::cout << "std::atomic_is_lock_free = " << std::atomic_is_lock_free(&data) << "\n";
return;
}
std::cout << "curent data = " << data << "\n";
};
void atomic_operator(void)
{
std::vector<std::thread> ths;
for (size_t i = 0; i < 10; i++)
{
ths.emplace_back(std::thread(do_work,i));
std::this_thread::sleep_for(std::chrono::milliseconds(5));//等待5毫秒,确保顺序
}
for (size_t i = 0; i < 10; i++)
{
ths[i].join();
}
ths.clear();
std::cout << "Result:" << data << '\n';
};
//main.cpp
#include "test6.h"
int main(int argc, char* argv[])
{
atomic_operator();
return 0;
}
编译运行效果如下:
2.2 原子类型的操作函数模板-阻塞
c++20新增了四个原子类型操作函数模板,这是用来做线程阻塞等待的。
/*阻塞线程直至被提醒且原子值更改(函数模板) */
template< class T >
void atomic_wait( const std::atomic<T>* object,
typename std::atomic<T>::value_type old ) noexcept;
template< class T >
void atomic_wait( const volatile std::atomic<T>* object,
typename std::atomic<T>::value_type old ) noexcept;
template< class T >
void atomic_wait_explicit( const std::atomic<T>* object,
typename std::atomic<T>::value_type old,std::memory_order order ) noexcept;
template< class T >
void atomic_wait_explicit( const volatile std::atomic<T>* object,
typename std::atomic<T>::value_type old,std::memory_order order ) noexcept;
/*
*若有线程阻塞于 *object 上的原子等待操作(即 std::atomic_wait() 、 *std::atomic_wait_explicit() 或 std::atomic::wait() ),
*则除阻至少一个这种线程;否则不做任何事。
*等价于 object->notify_one() 。*/
template< class T > void atomic_notify_one( std::atomic<T>* object );
template< class T > void atomic_notify_one( volatile std::atomic<T>* object );
/*
*除阻所有被 上的原子等待操作(即 std::atomic_wait() 、 std::atomic_wait_explicit() 或 *std::atomic::wait() )阻塞的线程,若有;否则不做任何事。
*等价于 object->notify_all() 。
*/
template< class T > void atomic_notify_all( std::atomic<T>* object );
template< class T > void atomic_notify_all( volatile std::atomic<T>* object );
//(C++20)针对原子布尔类型的
atomic_flag_wait 阻塞线程,直至被提醒且标志更改(函数)
atomic_flag_wait_explicit
atomic_flag_notify_one 提醒一个在 atomic_flag_wait 中阻塞的线程(函数)
atomic_flag_notify_all 提醒所有在 atomic_flag_wait 中阻塞的线程(函数)
这些线程阻塞相关的函数模板,和原子类型std::atomic的成员函数是一致的:
//std::atomic类成员函数
wait (C++20) 阻塞线程直至被提醒且原子值更改(公开成员函数)
notify_one (C++20) 提醒至少一个在原子对象上的等待中阻塞的线程(公开成员函数)
notify_all (C++20) 提醒所有在原子对象上的等待中阻塞的线程(公开成员函数)
线程阻塞成员函数或函数模板应用例子:
//test8.h
#ifndef _TEST_8_H_
#define _TEST_8_H_
void atomic_wait_test(void);
#endif //_TEST_8_H_
//test8.cpp
#include "test8.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
std::atomic<int> a_i;
void func_send(int id)
{
for (size_t i = 0; i < 5; i++)
{
a_i.store(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));//等待100毫秒,确保顺序
std::cout << "func_send a_i = " << a_i << "\n";
}
std::atomic_notify_one(&a_i); //如果注释该项,调用func_wait的线程一直在阻塞
//a_i.notify_one();
};
void func_wait(int id)
{
int val = a_i.load();
//a_i.wait(val);
std::atomic_wait(&a_i,val);//阻塞直至 &a_i 被 notify_one() 或 notify_all() 提醒,或线程被虚假地除阻
std::cout << "func_wait a_i = " << a_i << "\n";
};
void atomic_wait_test(void)
{
std::thread a(func_send,1);
std::thread b(func_wait,2);
a.join();
b.join();
};
//main.cpp
编译 g++ main.cpp test8.cpp -o test.exe -std=c++20,运行程序如下:
2.3 原子类型的操作函数模板-自增自减
关于原子类型的自增、自检等操作,本质上还是调用前面讲述的原子操作库的原子类型操作函数模板来实现的:
//c++11起
T operator++() noexcept; (仅为 atomic<Integral> 模板特化的成员)
T operator++() volatile noexcept;
T* operator++() noexcept; (仅为 atomic<T*> 模板特化的成员)
T* operator++() volatile noexcept;
T operator++( int ) noexcept; (仅为 atomic<Integral> 模板特化的成员)
T operator++( int ) volatile noexcept;
T* operator++( int ) noexcept; (仅为 atomic<T*> 模板特化的成员)
T* operator++( int ) volatile noexcept;
T operator--() noexcept; (仅为 atomic<Integral> 模板特化的成员)
T operator--() volatile noexcept;
T* operator--() noexcept; (仅为 atomic<T*> 模板特化的成员)
T* operator--() volatile noexcept;
T operator--( int ) noexcept; (仅为 atomic<Integral> 模板特化的成员)
T operator--( int ) volatile noexcept;
T* operator--( int ) noexcept; (仅为 atomic<T*> 模板特化的成员)
T* operator--( int ) volatile noexcept;
原子地自增或自减当前值。操作为读-修改-写操作。
- 进行原子前自增。等价于 fetch_add(1)+1 。
- 进行原子后自增。等价于 fetch_add(1) 。
- 进行原子前自减。等价于 fetch_sub(1)-1 。
- 进行原子后自减。等价于 fetch_sub(1) 。
对于有符号整数类型,算术定义为使用补码表示。无未定义结果。对于 T*
类型,结果可能为未定义地址,但此外这些操作不会有未定义行为。
//test.h
#ifndef _TEST_H_
#define _TEST_H_
#include <atomic>
#include <iostream>
using namespace std;
void func(void)
{
int i_vec[10] = {0,1,2,3,4,5,6,7,8,9};
atomic<int> i_val(0);
atomic<int*> pi_val(i_vec);
i_val++;
pi_val++;
cout << "i_val = " << i_val << "\n"; //1
cout << "*pi_val = " << *pi_val << "\n";//1
i_val+=5;
pi_val+=5;
cout << "i_val = " << i_val << "\n"; //6
cout << "*pi_val = " << *pi_val << "\n";//6
i_val--;
pi_val--;
cout << "i_val = " << i_val << "\n"; //5
cout << "*pi_val = " << *pi_val << "\n";//5
i_val-=2;
pi_val-=2;
cout << "i_val = " << i_val << "\n"; //3
cout << "*pi_val = " << *pi_val << "\n";//3
};
#endif //_TEST_H_
//main.cpp
#include "test.h"
int main(int argc, char* argv[])
{
func();
return 0;
}
2.4 c++20新增的原子类型-std::atomic_ref类模板
c++20新增了std::atomic_ref类模板,它和std::atomic很类似,std::atomic_ref 类模板应用原子操作到其所引用的对象。在 atomic_ref 对象的生存期中,认为其所引用的对象是原子对象。若一个线程写入原子对象,同时另一线程从它读取,则行为良好定义(数据竞争上的细节见内存模型)。另外,对原子对象的访问可以建立线程间同步,和按 std::memory_order 所指定排序非原子内存访问。
//定义于头文件 <atomic>
template< class T > struct atomic_ref;
template< class T > struct atomic_ref<T*>;
对象的生存期必须超出所有引用该对象的 atomic_ref 的生存期。任何 atomic_ref 实例引用存在的对象时,必须只通过这些 atomic_ref 实例排他地访问该对象。 atomic_ref 对象所引用对象的任何子对象均不可同时为任何其他 atomic_ref 对象所引用。通过 atomic_ref 应用到对象的原子操作,相对于通过任何其他引用同一对象的 atomic_ref 应用的操作是原子的。std::atomic_ref 为可复制构造 (CopyConstructible) 。类似引用,常性对 atomic_ref 为浅,可通过 const atomic_ref 对象修改被引用的值。
std::atomic 类模板具有的成员函数、常量、特化成员函数,std::atomic_ref 类模板都对应具备,这里就不展开描述了,“见1.3 原子操作功能与应用”。主要就明确std::atomic_ref 类模板允许你将非原子对象当作原子对象的类型。例如,你可以创建一个std::atomic_ref<MyTest>,指向自定义的结构体MyTest,这样就可以把它当成atomic<MyTest>使用了,看下面例子:
//test7.h
#ifndef _TEST_7_H_
#define _TEST_7_H_
void atomic_ref_test(void);
#endif //_TEST_7_H_
//test7.cpp
#include "test7.h"
#include <iostream>
#include <thread>
#include <atomic>
struct MyTest {
MyTest() : a(0),b(0){};
MyTest(int a_,int b_) : a(a_),b(b_){};
int a;
int b;
} ATest; // 用户定义的可平凡复制类型
// std::atomic<MyTest > cnt(ATest);
std::atomic_ref<MyTest> cnt(ATest); // 对用户定义类型的特化
void func_test(int id)
{
for (size_t i = 0; i < 10; i++)
{
switch (id)
{
case 1:
cnt.store(MyTest(i+1,i-1));
break;
case 2:
cnt.store(MyTest(i-1,i+1));
break;
default:
break;
}
std::cout << "Result:" << cnt.load().a << ","<< cnt.load().b << '\n';
}
};
void atomic_ref_test(void)
{
std::thread athread(func_test,1);
std::thread bthread(func_test,2);
athread.join();
bthread.join();
};
//main.cpp"
#include "test7.h"
int main(int argc, char* argv[])
{
atomic_ref_test();
return 0;
}
C++20的这个方案需要定义一个新类型,其接口大部分与atomic重复,上述代码中,采用std::atomic替代std::atomic_ref效果是一样的。上述代码通过 g++ main.cpp test7.cpp -o test.exe -std=c++20命令编译及运行如下:
通过自定义类型传递进原子模板,就可以是的自定义类型像原子操作一样使用。虽然自定义类型可以作为原子类型模板的模板参数使用,但一般不建议这么做,上述例子采用两个std::atomic_int一样能达成效果,而不必定义一个结构体,还需要给其提供标准布局、平凡默认构造函数 (C++20 前)和平凡析构函数。如果要使用到 fetch_add 、 fetch_sub 、 fetch_and 、 fetch_or 、 fetch_xor,++,--等原子操作,还相应地为自定义类型提供适合的原子算术运算定义。
三、线程支持库
C++11起标准库包含线程、互斥、条件变量和future的内建支持。
线程,线程使得程序能在数个处理器核心同时执行。定义于头文件 <thread>
thread (C++11)管理单独的线程(类)
jthread (C++20)有自动合并和取消支持的 std::thread(类)
管理当前线程的函数,定义于命名空间 this_thread
yield (C++11)建议实现重新调度各执行线程(函数)
get_id (C++11)返回当前线程的线程 id(函数)
sleep_for (C++11)使当前线程的执行停止指定的时间段(函数)
sleep_until (C++11)使当前线程的执行停止直到指定的时间点(函数)
线程取消,定义于头文件 <stop_token>
stop_token (C++20)查询是否已经做出 std::jthread 取消请求的接口(类)
stop_source (C++20)表示请求停止一个或多个 std::jthread 的类(类)
stop_callback (C++20)在 std::jthread 取消上注册回调的接口(类模板)
缓存大小访问,定义于头文件 <new>
hardware_destructive_interference_size (C++17)避免假共享的最小偏移(常量)
hardware_constructive_interference_size (C++17)促使真共享的最大偏移(常量)
互斥,互斥算法避免多个线程同时访问共享资源。这会避免数据竞争,并提供线程间的同步支持。
定义于头文件 <mutex>
mutex (C++11)提供基本互斥设施(类)
timed_mutex (C++11)提供互斥设施,实现有时限锁定(类)
recursive_mutex (C++11)提供能被同一线程递归锁定的互斥设施(类)
recursive_timed_mutex (C++11)提供能被同一线程递归锁定的互斥设施,并实现有时限锁定(类)
定义于头文件 <shared_mutex>
shared_mutex (C++17)提供共享互斥设施(类)
shared_timed_mutex (C++14)提供共享互斥设施并实现有时限锁定(类)
通用互斥管理,定义于头文件 <mutex>
lock_guard (C++11)实现严格基于作用域的互斥体所有权包装器(类模板)
scoped_lock (C++17)用于多个互斥体的免死锁 RAII 封装器(类模板)
unique_lock (C++11)实现可移动的互斥体所有权包装器(类模板)
shared_lock (C++14)实现可移动的共享互斥体所有权封装器(类模板)
defer_lock_t (C++11)用于指定锁定策略的标签类型(类)
try_to_lock_t
adopt_lock_t
defer_lock (C++11)用于指定锁定策略的标签常量(常量)
try_to_lock
adopt_lock
通用锁定算法
try_lock (C++11)试图通过重复调用 try_lock 获得互斥体的所有权(函数模板)
lock (C++11)锁定指定的互斥体,若任何一个不可用则阻塞(函数模板)
单次调用
once_flag (C++11)确保 call_once 只调用函数一次的帮助对象(类)
call_once (C++11)仅调用函数一次,即使从多个线程调用(函数模板)
条件变量,允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。
定义于头文件 <condition_variable>
condition_variable (C++11)提供与 std::unique_lock 关联的条件变量(类)
condition_variable_any (C++11)提供与任何锁类型关联的条件变量(类)
notify_all_at_thread_exit (C++11)安排到在此线程完全结束时对 notify_all 的调用(函数)
cv_status (C++11)列出条件变量上定时等待的可能结果(枚举)
信号量 (semaphore) 是一种轻量的同步原件,用于制约对共享资源的并发访问。在可以使用两者时,信号量能比条件变量更有效率。
定义于头文件 <semaphore>
counting_semaphore (C++20)实现非负资源计数的信号量(类模板)
binary_semaphore (C++20)仅拥有二个状态的信号量(typedef)
闩与屏障,闩 (latch) 与屏障 (barrier) 是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达该屏障。闩不能复用,屏障能重复使用。
定义于头文件 <latch>
latch (C++20)单次使用的线程屏障(类)
定义于头文件 <barrier>
barrier (C++20)可复用的线程屏障(类模板)
Future,标准库提供了一些工具来获取异步任务(即在单独的线程中启动的函数)的返回值,并捕捉其所抛出的异常。这些值在共享状态中传递,其中异步任务可以写入其返回值或存储异常,而且可以由持有该引用该共享态的 std::future 或 std::shared_future 实例的线程检验、等待或是操作这个状态。
定义于头文件 <future>
promise (C++11)存储一个值以进行异步获取(类模板)
packaged_task (C++11)打包一个函数,存储其返回值以进行异步获取(类模板)
future (C++11)等待被异步设置的值(类模板)
shared_future (C++11)等待被异步设置的值(可能为其他 future 所引用)(类模板)
async (C++11)异步运行一个函数(有可能在新线程中执行),并返回保有其结果的 std::future(函数模板)
launch (C++11)指定 std::async 所用的运行策略(枚举)
future_status (C++11)指定在 std::future 和 std::shared_future 上的定时等待的结果(枚举)
Future 错误
future_error (C++11)报告与 future 或 promise 有关的错误(类)
future_category (C++11)鉴别 future 错误类别(函数)
future_errc (C++11)鉴别 future 错误码(枚举)
3.1 管理当前线程的函数
线程函数管理主要是获取线程编号、打乱当前线程执行次序、睡眠等待等操作。
/*
*std::this_thread::yield,定义于头文件 <thread>,(C++11 起)
*提供提示给实现,以重调度线程的执行,允许其他线程运行。
*/
void yield() noexcept;
/*
*std::this_thread::get_id,定义于头文件 <thread>,(C++11 起)
*返回当前线程的 id 。
*/
std::thread::id get_id() noexcept;
/*
*std::this_thread::sleep_for,定义于头文件 <thread>,(C++11 起)
*阻塞当前线程执行,至少经过指定的 sleep_duration 。
*此函数可能阻塞长于 sleep_duration ,因为调度或资源争议延迟。
*标准库建议用稳定时钟度量时长。若实现用系统时间代替,则等待时间亦可能对时钟调节敏感。
*/
template< class Rep, class Period >
void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );
/*
*std::this_thread::sleep_until,定义于头文件 <thread>,(C++11 起)
*阻塞当前线程,直至抵达指定的 sleep_time 。
*/
template< class Clock, class Duration >
void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );
示例代码:
//test9.h
#ifndef _TEST_9_H_
#define _TEST_9_H_
void thread_sleep(void);
#endif //_TEST_9_H_
//test9.cpp
#include "test9.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <mutex>
#include <deque>
std::atomic_int a_i;
std::mutex g_display_mutex;
std::deque<std::string> msgs;
void foo()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread::id this_id = std::this_thread::get_id();
char buf[256] ={0};
for (size_t i = 0; i < 5; i++)
{
sprintf(buf,"thread id = %d, and val = %d, now will sleep %d milliseconds!\n"
,this_id,a_i++,10*i);
g_display_mutex.lock();
msgs.push_back(std::string(buf));
g_display_mutex.unlock();
// std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::milliseconds(10*i));
}
std::chrono::time_point<std::chrono::system_clock> until_time =
std::chrono::system_clock::now(); //当前时间
until_time += std::chrono::microseconds(100); //当前时间+100ms
std::this_thread::sleep_until(until_time); //延迟到100ms后
a_i.store(100);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> elapsed = end-start;
std::cout << "threa id = "<< this_id << " run finsh! ";
std::cout << "Waited " << elapsed.count() << " ms\n";
};
void print_msg()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread::id this_id = std::this_thread::get_id();
std::string msg_read="";
while (100!=a_i.load())
{
g_display_mutex.lock();
if(!msgs.empty())
{
msg_read = msgs.front();
msgs.pop_front();
std::cout << msg_read;
}
g_display_mutex.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));//等10ms
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> elapsed = end-start;
std::cout << "threa id = "<< this_id << " run finsh! ";
std::cout << "Waited " << elapsed.count() << " ms\n";
};
void thread_sleep(void)
{
std::thread t1(foo);
std::this_thread::sleep_for(std::chrono::milliseconds(5)); //错开时间
std::thread t2(foo);
std::this_thread::sleep_for(std::chrono::milliseconds(5)); //错开时间
std::thread t3(print_msg);
t1.join();
t2.join();
t3.join();
};
//main.cpp
#include "test9.h"
int main(int argc, char* argv[])
{
thread_sleep();
return 0;
}
通常线程ID设置,如果不指定编号的话,主线程ID一般 是1,后面的子线程是逐步+1来标识。std::this_thread::sleep_until可以等待到指定时刻,而std::this_thread::sleep_for是等待一段时间间隔,编译g++ main.cpp test9.cpp -o test.exe -std=c++11,运行代码
上述代码我们做一下调整:
// std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::milliseconds(10*i));
->
std::this_thread::yield();
// std::this_thread::sleep_for(std::chrono::milliseconds(10*i));
在编译测试,yield()虽然释放了次序,取消了等待,前面两个线程快速执行完毕,设置原子类型数值,是的while (100!=a_i.load())无法进入循环读取队列数据打印,也快速结束了线程:
3.2 互斥线程支持库
互斥算法避免多个线程同时访问共享资源。这会避免数据竞争,并提供线程间的同步支持。
/*
*std::mutex,定义于头文件 <mutex>,(C++11 起)
*mutex 类是能用于保护共享数据免受从多个线程同时访问的同步原语。
*/
class mutex;
/*
*std::timed_mutex,定义于头文件 <mutex>,(C++11 起)
*timed_mutex 类是能用于保护数据免受多个线程同时访问的同步原语。
*/
class timed_mutex;
/*
*std::recursive_mutex,定义于头文件 <mutex>,(C++11 起)
*recursive_mutex 类是同步原语,能用于保护共享数据免受从个多线程同时访问。
*/
class recursive_mutex;
/*
*std::recursive_timed_mutex,定义于头文件 <mutex>,(C++11 起)
*recursive_timed_mutex 是同步原语,能用于保护共享数据免受从多个线程同时访问。
*/
class recursive_timed_mutex;
/*
*std::shared_mutex,定义于头文件 <shared_mutex>,(C++17 起)
*shared_mutex 类是一个同步原语,可用于保护共享数据不被多个线程同时访问。
*/
class shared_mutex;
/*
*std::shared_timed_mutex,定义于头文件 <shared_mutex>,(C++14 起)
*shared_timed_mutex 类是能用于保护数据免受多个线程同时访问的同步原语。
*/
class shared_timed_mutex;
/*
*std::lock_guard,定义于头文件 <mutex>
*类 lock_guard 是互斥体包装器,为在作用域块期间占有互斥提供便利,RAII 风格机制。
*创建 lock_guard 对象时,它试图接收给定互斥的所有权。
*控制离开创建 lock_guard 对象的作用域时,销毁 lock_guard 并释放互斥。
*lock_guard 类不可复制。
*/
template< class Mutex > class lock_guard;
/*
*std::scoped_lock,定义于头文件 <mutex>,(C++17 起)
*类 scoped_lock 是提供便利 RAII 风格机制的互斥包装器,它在作用域块的存在期间占有一或多个互斥。
*创建 scoped_lock 对象时,它试图取得给定互斥的所有权。
*控制离开创建 scoped_lock 对象的作用域时,析构 scoped_lock 并释放互斥。
*若给出数个互斥,则使用免死锁算法,如同以 std::lock 。scoped_lock 类不可复制。
*/
template< class... MutexTypes > class scoped_lock;
/*
*std::unique_lock,定义于头文件 <mutex>(C++11 起)
*类 unique_lock 是通用互斥包装器,
*允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用.
*/
template< class Mutex > class unique_lock;
/*
*std::shared_lock,定义于头文件 <shared_mutex>,(C++14 起)
*类 shared_lock 是通用共享互斥所有权包装器,允许延迟锁定、定时锁定和锁所有权的转移。
*锁定 shared_lock,会以共享模式锁定关联的共享互斥(std::unique_lock可用于以排他性模式锁定)。
*/
template< class Mutex > class shared_lock;
/*std::defer_lock_t, std::try_to_lock_t, std::adopt_lock_t,定义于头文件 <mutex>,(C++11 起)
*用于为 std::lock_guard 、std::scoped_lock 、std::unique_lock 和 std::shared_lock 指定锁定策略的空类标签类型
*defer_lock_t 不获得互斥的所有权
*try_to_lock_t 尝试获得互斥的所有权而不阻塞
*adopt_lock_t 假设调用方线程已拥有互斥的所有权
*/
struct defer_lock_t { explicit defer_lock_t() = default; };
struct try_to_lock_t { explicit try_to_lock_t() = default; };
struct adopt_lock_t { explicit adopt_lock_t() = default; };
/*
*std::defer_lock, std::try_to_lock, std::adopt_lock,定义于头文件 <mutex>
*std::defer_lock 、 std::try_to_lock 和 std::adopt_lock 分别是空结构体标签类型 *std::defer_lock_t 、 std::try_to_lock_t 和 std::adopt_lock_t 的实例。
*它们用于为 std::lock_guard 、 std::unique_lock 及 std::shared_lock 指定锁定策略。
*defer_lock_t 不获得互斥的所有权
*try_to_lock_t 尝试获得互斥的所有权而不阻塞
*adopt_lock_t 假设调用方线程已拥有互斥的所有权
*/
constexpr std::defer_lock_t defer_lock {};// (C++11 起)(C++17 前)
inline constexpr std::defer_lock_t defer_lock {}; //(C++17 起)
constexpr std::try_to_lock_t try_to_lock {}; //(C++11 起)(C++17 前)
inline constexpr std::try_to_lock_t try_to_lock {}; //(C++17 起)
constexpr std::adopt_lock_t adopt_lock {}; //(C++11 起)(C++17 前)
inline constexpr std::adopt_lock_t adopt_lock {}; //(C++17 起)
/*
*std::try_lock,定义于头文件 <mutex>,(C++11 起)
*尝试锁定每个给定的可锁定 (Lockable) 对象 lock1、 lock2、 ...、 lockn ,通过以从头开始的顺序调用 try_lock 。
*/
template< class Lockable1, class Lockable2, class... LockableN >
int try_lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn );
/*
*std::lock,定义于头文件 <mutex>,(C++11 起)
*锁定给定的可锁定 (Lockable) 对象 lock1 、 lock2 、 ... 、 lockn ,用免死锁算法避免死锁。
*/
template< class Lockable1, class Lockable2, class... LockableN >
void lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn );
3.3 线程锁mutex
std::mutex类模板,定义于头文件 <mutex>,用于保护共享数据免受从多个线程同时访问的同步异常。其最主要的成员函数是lock和unlock,本质上就调用通用锁定算法来实现的。
成员类型 定义
native_handle_type (可选) 实现定义
成员函数
(构造函数) 构造互斥(公开成员函数)
(析构函数) 销毁互斥(公开成员函数)
operator= [被删除]不可复制赋值(公开成员函数)
锁定
lock 锁定互斥,若互斥不可用则阻塞(公开成员函数)
try_lock 尝试锁定互斥,若互斥不可用则返回(公开成员函数)
unlock 解锁互斥(公开成员函数)
原生句柄
native_handle 返回底层实现定义的原生句柄(公开成员函数)
通用锁定算法
try_lock (C++11)试图通过重复调用try_lock获得互斥体的所有权(函数模板)
lock (C++11)锁定指定的互斥体,若任何一个不可用则阻塞(函数模板)
mutex 提供排他性非递归所有权语义:
- 调用方线程从它成功调用 lock 或 try_lock 开始,到它调用 unlock 为止占有 mutex 。
- 线程占有 mutex 时,所有其他线程若试图要求 mutex 的所有权,则将阻塞(对于 lock 的调用)或收到 false 返回值(对于 try_lock ).
- 调用方线程在调用 lock 或 try_lock 前必须不占有 mutex 。
若 mutex 在仍为任何线程所占有时即被销毁,或在占有 mutex 时线程终止,则行为未定义。 mutex 类满足互斥体 (Mutex) 和标准布局类型 (StandardLayoutType) 的全部要求。std::mutex 既不可复制亦不可移动。
通常不直接使用 std::mutex : std::unique_lock 、 std::lock_guard 或 std::scoped_lock (C++17 起)以更加异常安全的方式管理锁定。
//test10.h
#ifndef _TEST_10_H_
#define _TEST_10_H_
void lock_test(void);
#endif //_TEST_10_H_
//test10.cpp
#include "test10.h"
#include <iostream>
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>
std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;
void save_page(const std::string &url)
{
// 模拟长页面读取
// std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(100)); //等待
std::string result = "fake content";
std::lock_guard<std::mutex> guard(g_pages_mutex);
g_pages[url] = result;
}
struct Box {
explicit Box(int num) : num_things{num} {}
int num_things;
std::mutex m;
};
void transfer(Box &from, Box &to, int num)
{
// 仍未实际取锁
// std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);//指定锁定策略的标签类型
// std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
// 锁两个 unique_lock 而不死锁
// std::lock(lock1, lock2);
//和这三个语句等价
// 锁定两个互斥而不死锁
std::lock(from.m, to.m);
// 保证二个已锁定互斥在作用域结尾解锁
std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);//指定锁定策略的标签类型
std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);
from.num_things -= num;
to.num_things += num;
// 'from.m' 与 'to.m' 互斥解锁于 'unique_lock' 析构函数
}
std::mutex foo_count_mutex;
int i_count = 0;
void foo_major()
{
for (size_t i = 0; i < 100; i++)
{
foo_count_mutex.lock();
i_count += i;
for (int loop=0;loop<10000;++loop);//等待,可以改变次数测试
// std::this_thread::sleep_for(std::chrono::milliseconds(1)); //等待,更耗时的占用CPU
foo_count_mutex.unlock();
}
};
void foo_minor()
{
for (size_t i = 0; i < 1000; i++)
{
if(foo_count_mutex.try_lock()) //
{
i_count += i;
std::cout << "try_lock index = " << i << "\n";
for (int loop=0;loop<1000;++loop);//等待,可以改变次数测试
foo_count_mutex.unlock();
}
// std::this_thread::sleep_for(std::chrono::milliseconds(1)); //等待
}
};
void lock_test(void)
{
//lock_guard
std::thread t1(save_page, "http://foo");
std::thread t2(save_page, "http://bar");
t1.join(); t2.join();
// 现在访问g_pages是安全的,因为线程t1/t2生命周期已结束
for (const auto &pair : g_pages) {
std::cout << pair.first << " => " << pair.second << '\n';
}
//unique_lock
Box acc1(100);Box acc2(50);
std::thread t3(transfer, std::ref(acc1), std::ref(acc2), 10);
std::thread t4(transfer, std::ref(acc2), std::ref(acc1), 5);
t3.join();t4.join();
std::cout << "acc1.num_things = " << acc1.num_things << '\n';
std::cout << "acc2.num_things = " << acc2.num_things << '\n';
//try_lock
std::thread t5(foo_major);
std::thread t6(foo_minor);
t5.join(); t6.join();
std::cout << "i_count = " << i_count << '\n';
}
//main.cpp
#include "test10.h"
int main(int argc, char* argv[])
{
lock_test();
return 0;
}
编译 g++ main.cpp test10.cpp -o test.exe -std=c++11,运行程序可能输出如下:
3.4 线程锁timed_mutex 类。
timed_mutex 类满足定时互斥体 (TimedMutex) 与标准布局类型 (StandardLayoutType) 的所有要求。
成员类型 定义
native_handle_type (可选) 实现定义
成员函数
(构造函数) 构造互斥(公开成员函数)
(析构函数) 销毁互斥(公开成员函数)
operator= [被删除] 不可复制赋值(公开成员函数)
锁定
lock 锁定互斥,若互斥不可用则阻塞(公开成员函数)
try_lock 尝试锁定互斥,若互斥不可用则返回(公开成员函数)
try_lock_for 尝试锁定互斥,若互斥在指定的时限时期中不可用则返回(公开成员函数)
try_lock_until 尝试锁定互斥,若直至抵达指定时间点互斥不可用则返回(公开成员函数)
unlock 解锁互斥(公开成员函数)
原生句柄
native_handle 返回底层实现定义的原生句柄(公开成员函数)
上述代码的try_lock例子再次调整一下:
//test11.h
#ifndef _TEST_11_H_
#define _TEST_11_H_
void time_lock_test(void);
#endif //_TEST_11_H_
//test11.cpp
#include "test11.h"
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
std::timed_mutex foo_count_mutex; //变为timed_mutex类型
int i_count = 0;
void foo_minor(int id)
{
int fail_count = 0;
char buf[128] = {0};
std::chrono::milliseconds fps(1);
for (size_t i = 0; i < 10; i++)
{
if(foo_count_mutex.try_lock_for(fps)) //变更为try_lock_for
{
i_count += i;
sprintf(buf,"id %d try_lock index = %d\n",id,i);
std::cout << std::string(buf); //完整信息一次cout,防止打印覆盖
std::this_thread::sleep_for(fps); //等待
foo_count_mutex.unlock();
}else{
sprintf(buf,"id %d try_lock fail_count = %d\n",id,++fail_count);
std::cout << std::string(buf);
}
}
};
void time_lock_test(void)
{
//try_lock
std::thread t5(foo_minor,5);
std::thread t6(foo_minor,6);
t5.join(); t6.join();
std::cout << "i_count = " << i_count << '\n';
};
//main.cpp
#include "test11.h"
int main(int argc, char* argv[])
{
time_lock_test();
return 0;
}
编译运行可能输出如下,两个线程对于计算资源的占用是你来我往的:
3.5 线程辅助-单次调用约束
类 std::once_flag 是 std::call_once函数模板的辅助类。std::call_once定义于头文件 <mutex>,其准确执行一次可调用 (Callable) 对象 f ,即使同时从多个线程调用。此函数类似于 POSIX 的pthread_once 。
/*(C++11 起)
*参数
*flag - 对象,对于它只有一个函数得到执行
*f - 要调用的可调用 (Callable) 对象
*args... - 传递给函数的参数
*/
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
call_once调用逻辑细节为:
- 若在调用 call_once 的时刻, flag 指示已经调用了 f ,则 call_once 立即返回(称这种对 call_once 的调用为消极)。
- 否则, call_once 以参数 std::forward<Args>(args)... 调用 std::forward<Callable>(f) (如同用 std::invoke )。不同于 std::thread 构造函数或 std::async ,不移动或复制参数,因为不需要转移它们到另一执行线程(称这种对 call_once 的调用为积极)。
- 若该调用抛异常,则传播异常给 call_once 的调用方,并且不翻转 flag ,以令其他调用将得到尝试(称这种对 call_once 的调用为异常)。
- 若该调用正常返回(称这种对 call_once 的调用为返回),则翻转 flag ,并保证以同一 flag 对 call_once 的其他调用为消极。
同一 flag 上的所有积极调用组成单独全序,它们由零或多个异常调用后随一个返回调用组成。该顺序中,每个积极调用的结尾同步于下个积极调用。从返回调用的返回同步于同一 flag 上的所有消极调用:这表示保证所有对 call_once 的同时调用都观察到积极调用所做的任何副效应,而无需额外同步。
//test12.h
#ifndef _TEST_12_H_
#define _TEST_12_H_
void callone_test(void);
#endif //_TEST_12_H_
//test12.cpp
#include "test12.h"
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag flag1, flag2;
void simple_do_once(int id)
{
std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
void may_throw_function(bool do_throw)
{
if (do_throw) {
std::cout << "throw: call_once will retry\n"; // 这会出现多于一次
// throw std::exception();
return;
}
std::cout << "Didn't throw, call_once will not attempt again\n"; // 保证一次
}
void do_once(bool do_throw)
{
try {
std::call_once(flag2, may_throw_function, do_throw);
}
catch (...) {
}
}
void callone_test(void)
{
std::thread st1(simple_do_once,1);
std::thread st2(simple_do_once,2);
std::thread st3(simple_do_once,3);
std::thread st4(simple_do_once,4);
st1.join();st2.join();st3.join();st4.join();
std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();t2.join();t3.join();t4.join();
};
//main.cpp
#include "test12.h"
int main(int argc, char* argv[])
{
callone_test();
return 0;
}
利用std::call_once对某函数多次调用时,只执行一次,哪怕是传递实参不一致。
3.5 线程辅助-条件变量
它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥对象上。
/*
*std::condition_variable,定义于头文件 <condition_variable>,(C++11 起)
*condition_variable 类是同步原语,能用于阻塞一个线程,或同时阻塞多个线程,
*直至另一线程修改共享变量(条件)并通知 condition_variable 。
*std::condition_variable 只可与 std::unique_lock<std::mutex> 一同使用.
*/
class condition_variable;
/*
*std::condition_variable_any,定义于头文件 <condition_variable>, (C++11 起)
*condition_variable_any 类是 std::condition_variable 的泛化。
*相对于只在 std::unique_lock<std::mutex> 上工作的 std::condition_variable ,
*condition_variable_any 能在任何满足基本可锁定 (BasicLockable) 要求的锁上工作。
*/
class condition_variable_any;
/*
*std::notify_all_at_thread_exit,定义于头文件 <condition_variable>,(C++11 起)
*notify_all_at_thread_exit 提供机制,通知其他线程给定的线程已完全完成,
*包括销毁所有 thread_local 对象。若当前线程未锁定 lock.mutex() ,则调用此函数是未定义行为.
*若lock.mutex()的互斥对象不相同,调用此函数是未定义行为。
*/
void notify_all_at_thread_exit( std::condition_variable& cond,
std::unique_lock<std::mutex> lk );
/*
*std::cv_status,定义于头文件 <condition_variable>,(C++11 起)
*带作用域枚举 std::cv_status 描述定时等待是否因时限返回。
*/
enum class cv_status;
no_timeout //条件变量因 notify_all 、 notify_one 或虚假地被唤醒
timeout //条件变量因时限耗尽被唤醒
condition_variable 容许 wait 、 wait_for 、 wait_until 、 notify_one 及 notify_all 成员函数的同时调用。类 std::condition_variable 是标准布局类型 (StandardLayoutType) 。它非可复制构造 (CopyConstructible) 、可移动构造 (MoveConstructible) 、可复制赋值 (CopyAssignable) 或可移动赋值 (MoveAssignable) 。
//test13.h
#ifndef _TEST_13_H_
#define _TEST_13_H_
void variable_test(void);
#endif //_TEST_13_H_
//test13.cpp
#include "test13.h"
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// 等待直至 main() 发送数据
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});
// 等待后,我们占有锁。
std::cout << "Worker thread is processing data\n";
data += " after processing";
// 发送数据回 main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one )
lk.unlock();
cv.notify_one();
};
void notify_thread()
{
data = "Example data";
// 发送数据到 worker 线程
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "notify_thread signals data ready for processing\n";
}
cv.notify_one();
// 等候 worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in notify_thread, data = " << data << '\n';
}
void variable_test(void)
{
std::thread worker(worker_thread);
std::thread notifyer(notify_thread);
worker.join();
notifyer.join();
};
//main.cpp
#include "test13.h"
int main(int argc, char* argv[])
{
variable_test();
return 0;
}
上述代码,通过condition_variable ,以促进线程notify_thread和worker_thread交流,worker_thread在获得notify_thread的“通知”后,执行自己线程内的作业,完后后,又“通知”notify_thread。
3.6 线程辅助-Future
标准库提供了std::future 来获取异步任务(即在单独的线程中启动的函数的函数返回值)的返回值,并捕捉其所抛出的异常。这些值在共享状态中传递,其中异步任务可以写入其返回值或存储异常,而且可以由持有该引用该共享态的 std::future 或 std::shared_future 实例的线程检验、等待或是操作这个状态。
/*
*std::future,定义于头文件 <future>,(C++11 起)
*访问异步操作结果的机制,通过 std::async 、 std::packaged_task 或 std::promise 创建的异步操作
*能用各种方法查询、等待或从 std::future 提取值
*/
template< class T > class future;
template< class T > class future<T&>;
template<> class future<void>;
/*
*std::shared_future,定义于头文件 <future>,(C++11 起)
*类模板 std::shared_future 提供访问异步操作结果的机制,类似 std::future ,
*除了允许多个线程等候同一共享状态。不同于仅可移动的 std::future
*故只有一个实例能指代任何特定的异步结果),std::shared_future 可复制
*(而且多个 shared_future 对象能指代同一共享状态。
*/
template< class T > class shared_future;
template< class T > class shared_future<T&>;
template<> class shared_future<void>;
/*
*std::future_status,定义于头文件 <future>,(C++11 起)
*指定 std::future 和 std::shared_future 的 wait_for 和 wait_until 函数所返回的 future 状态。
*/
enum class future_status { ready, timeout, deferred };
/*
*函数模板 async 异步地运行函数 f (潜在地在可能是线程池一部分的分离线程中),
*并返回最终将保有该函数调用结果的 std::future 。
*/
template< class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)> >
async( Function&& f, Args&&... args ); //(C++11 起)(C++17 前)
template< class Function, class... Args> std::future<std::invoke_result_t<std::decay_t<Function>,std::decay_t<Args>...> >
async( Function&& f, Args&&... args ); //(C++17 起)(C++20 前)
template< class Function, class... Args>
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...> >
async( Function&& f, Args&&... args ); //(C++20 起)
template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)> >
async( std::launch policy, Function&& f, Args&&... args ); //(C++11 起)(C++17 前)
template< class Function, class... Args >
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...> >
async( std::launch policy, Function&& f, Args&&... args ); //(C++17 起)(C++20 前)
template< class Function, class... Args >
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...> >
async( std::launch policy, Function&& f, Args&&... args );//(C++20 起)
/*
*std::packaged_task,定义于头文件 <future>,(C++11 起)
*类模板 std::packaged_task 包装任何可调用 (Callable) 目标
*(函数、 lambda 表达式、 bind 表达式或其他函数对象),
*使得能异步调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。
*/
template< class > class packaged_task;
template< class R, class ...Args > class packaged_task<R(Args...)>;
/*
*std::promise,定义于头文件 <future>,(C++11 起)
*类模板 std::promise 提供存储值或异常的设施,
*之后通过 std::promise对象所创建的 std::future 对象异步获得结果。
*注意 std::promise 只应当使用一次。
*/
template< class R > class promise;
template< class R > class promise<R&>;
template<> class promise<void>;
/*
*std::launch,定义于头文件 <future>,(C++11 起)
*枚举值,指定 std::async 所指定的任务的的运行策略。
*/
enum class launch : /* unspecified */ {
async = /* unspecified */, //运行新线程,以异步执行任务
deferred = /* unspecified */, //调用方线程上首次请求其结果时执行任务(惰性求值)
/* implementation-defined */
};
/*
*Future 错误
*处理异步执行和共享状态( std::future 、 std::promise 等)的线程库中的函数在失败时抛出。
*/
future_error (C++11) 报告与 future 或 promise 有关的错误(类)
future_category (C++11) 鉴别 future 错误类别(函数)
future_errc (C++11) 鉴别 future 错误码(枚举)
std::future和std::shared_future功能实现:
成员函数
(构造函数) 构造 future 对象(公开成员函数)
(析构函数) 析构 future 对象(公开成员函数)
operator= 移动future对象(公开成员函数)
share 从*this转移共享状态给shared_future并返回它(公开成员函数),std::future专属成员函数
get 返回结果(公开成员函数)
valid 检查 future 是否拥有共享状态(公开成员函数)
wait 等待结果变得可用(公开成员函数)
wait_for 等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回。(公开成员函数)
wait_until 等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回。(公开成员函数)
通过std::future与std::async 、 std::packaged_task 或 std::promise 创建的操作绑定,获得其任务(函数)返回值。不同于仅可移动的 std::future (故只有一个实例能指代任何特定的异步结果),std::shared_future 可复制而且多个 shared_future 对象能指代同一共享状态。
//test14.h
#ifndef _TEST_14_H_
#define _TEST_14_H_
void future_test(void);
void shared_future_test(void);
#endif //_TEST_14_H_
//test14.cpp
#include "test14.h"
#include <iostream>
#include <future>
#include <thread>
void future_test(void)
{
// 来自 packaged_task 的 future
std::packaged_task<int()> task([](){ return 7; }); // 包装函数,返回int类型
std::future<int> f1 = task.get_future(); // 获取 future,返回与承诺的结果关联的 std::future
std::thread(std::move(task)).detach(); // 在线程上运行
// 来自 async() 的 future
std::future<int> f2 = std::async(std::launch::async, [](){ return 8; });
// 来自 promise 的 future
std::promise<int> p;
std::future<int> f3 = p.get_future();
std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach();
std::cout << "Waiting..." << std::flush;
f1.wait();// 等待结果变得可用
f2.wait();
f3.wait();
std::cout << "Done!\nResults are: "
<< f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
};
//shared_future 可用于同时向多个线程发信
void shared_future_test(void)
{
std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
std::shared_future<void> ready_future(ready_promise.get_future());
std::chrono::time_point<std::chrono::high_resolution_clock> start;
auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t1_ready_promise.set_value();
ready_future.wait(); // 等待来自 main() 的信号
return std::chrono::high_resolution_clock::now() - start;
};
auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t2_ready_promise.set_value();
ready_future.wait(); // 等待来自 main() 的信号
return std::chrono::high_resolution_clock::now() - start;
};
auto result1 = std::async(std::launch::async, fun1);
auto result2 = std::async(std::launch::async, fun2);
// 等待线程变为就绪
t1_ready_promise.get_future().wait();
t2_ready_promise.get_future().wait();
// 线程已就绪,开始时钟
start = std::chrono::high_resolution_clock::now();
// 向线程发信使之运行
ready_promise.set_value();
std::cout << "Thread 1 received the signal "
<< result1.get().count() << " ms after start\n"
<< "Thread 2 received the signal "
<< result2.get().count() << " ms after start\n";
}
//main.cpp
#include "test14.h"
int main(int argc, char* argv[])
{
future_test();
shared_future_test();
return 0;
}
3.7 线程存储期-thread_local
存储期,每个对象都有称为存储期的属性,它限制对象的生存期。
C/C++ 中有四种存储期:
- 自动存储期。进入声明对象于其中的块时分配其存储,而在以任何方式( goto 、 return 、抵达结尾)退出该块时解分配存储。一个例外是 VLA ;在执行声明时而非块入口分配其存储,并在声明离开作用域而非退出块时解分配存储。 (C99 起)若递归地进入块,则对每个递归层进行新的分配。所有函数参数和非 static 块作用域对象,还有用于块作用域的复合字面量拥有此存储期。
- 静态存储期。存储期是整个程序的执行过程,只在 main 函数之前初始化一次存储于对象的值。所有声明为 static 对象和所有带内部或外部链接且不声明为 _Thread_local (C11 起)的对象都拥有此存储期。
- 线程存储期。存储期是创建对象的线程的整个执行过程,在启动线程时初始化存储于对象的值。每个线程拥有其自身的相异对象。若执行访问此对象的表达式的线程,不是执行其初始化的线程,则行为是实现定义的。所有声明为 _Thread_local 的对象拥有此存储期。 (C11 起)
- 分配存储期。按照请求,用动态内存分配函数分配和解分配存储。
c/c++的线程存储期一般通过定义于头文件 threads.h 的便利宏 thread_local 使用关键词 _Thread_local实现。它不能用于函数声明。若将它用在对象声明上,则它必须在同一对象的每次声明上都存在。若将它用在块作用域声明上,则必须与 static 或 extern 之一组合以决定链接。
//定义于头文件 <threads.h>,c++11起
#define thread_local _Thread_local
//定义线程局部存储期变量
int thread_local thval;
一旦声明一个变量为thread local,其值将在线程开始时被初始化,而在线程结束时,该值也将不再有效。对于thread local变量地址取值(&),也只可以获得当前线程中的线程局部存储变量(TLS)的地址值。
//test15.h
#ifndef _TEST_15_H_
#define _TEST_15_H_
void thread_local_test(void);
#endif //_TEST_15_H_
//test15.cpp
#include "test15.h"
#include <pthread.h>
#include <thread>
#include <iostream>
using namespace std;
int thread_local errorCode = 0;
// int errorCode = 0; //注释前一句,采用本语句测试有何不同
void* thread_local_func(void * input)
{
if (*(int*)input == 1) errorCode = 1;
else if (*(int*)input == 2) errorCode = -1;
else errorCode = 0;
return nullptr;
};
void thread_local_test(void)
{
int input_a = 1;int input_b = 2;
std::thread th1(thread_local_func,&input_a);
std::thread th2(thread_local_func,&input_b);
th1.join();th2.join();
cout << "errorCode ="<< errorCode << "\n";//int thread_local声明,输出0;int声明,输出-1
};
//main.cpp"
#include "test15.h"
int main(int argc, char* argv[])
{
thread_local_test();
return 0;
}
//out log
errorCode = 0
虽然TLS变量的声明很简单,使用也很直观,不过实际上TLS的实现需要涉及编译器、链接器、加载器甚至是操作系统的相互配合。在TLS中一个常被讨论的问题就是TLS变量的静态/动态分配的问题,即TLS变量的内存究竟是在程序一开始就被分配还是在线程开始运行时被分配。C++11标准允许平台/编译器自行选择采用静态分配或动态分配,或者两者都支持。还有一点值得注意的是,C++11对TLS只是做了语法上的统一,而对其实现并没有做任何性能上的规定。这可能导致thread_local声明的变量在不同平台或者不同的TLS实现上出现不同的性能(通常TLS变量的读写性能不会高于普通的全局/静态变量)。如果项目中想得到最佳的平台上的TLS变量的运行性能的话,最好还是阅读代码运行平台的相关文档并进行原型测试验证。
四、综述
为了适应并行编程,C++11开始,完善了原子类型及原子操作,提供一些系列原子类型模板和原子操作函数模板,相比先前偏于底层的pthread库,现在可以通过定义原子类型的方式,轻松地化解了互斥访问同步变量的难题。虽然原子类型使用很简单,但其成员变量(原子操作)却可以有各种不同的内存顺序。C++11从各种不同的平台上抽象出了一个软件的内存模型,并以内存顺序进行描述,要想进一步挖掘并行系统性能的,开发者深刻理解原子操作库,平台特性及编译器设计才能设计出高性能并行计算的程序代码。此外,C++11还为线程类thread匹配了辅助类模板和函数模板,包含线程、互斥、条件变量和future的内建支持,支持thread在各种场景中自如运用,并将广泛存在的线程局部存储进行了语法上的统一。
另外随着c++标准版本的不断迭代更新,不少线程关联的类模板及函数模板都在增减、调整,如果实际项目中需要采用一些新元素,最好查看它们版本支持,防止项目找跨不同标准库支持时无法匹配。另外新语言元素引入也需谨慎对待,最好自行做大量的验证测试才可应用在实际项目中。