本栏主要介绍《C++并发实战编程》这本书,链接。
将按照书目录分为 九章节介绍C++并发编程,尽可能简化本书内晦涩难懂知识点。
本章节主要讲解线程使用基础。详细介绍线程使用基础(线程发起、等待、参数、异常处理)、线程管控(归属权、并发数量等)。
一、线程基础
1.线程启动
C++11标准统一了线程操作,可以在定义线程变量后,直接启动线程执行回调逻辑。
void thead_run(string str)
{
cout << "str:" << str << std::endl;
}
int main()
{
string str = "thread is running";
thread t1(thead_run, str);
}
2.线程等待
2.1 通过join等待子线程运行结束
看上面示例,在主线程调用t1创建线程,但是有可能主线程运行很快,t1线程来不及运行进程已经运行结束,为了防止主线程退出或者局部作用域结束导致子线程被析构的情况,我们可以通过join,让主线程等待子线程启动运行,子线程运行结束后主线程再运行。
thread t1(thead_run, str);
t1.join();
2.2 通过detch分离子线程
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
using namespace std;
void background_task(int *num) {
for (;;)
{
cout << "str: " << *num << endl;
this_thread::sleep_for(chrono::seconds(1));
}
}
void test()
{
int *p = new int(10);
thread t1(background_task, p);
t1.detach();
delete p;
}
int main() {
test();
this_thread::sleep_for(chrono::seconds(5));
return 0;
}
通过detch分离的线程,子线程采用分离的方式在后台独自运行,这样的线程叫做守护线程.
从2.2示例看出,t1线程在test函数内部创建。此时堆区分配的指针p在detach后会被释放,t1作为守护线程单独运行,由于线程t1传入的参数是指针,指针已经被释放,会导致bg函数内部未定义状态。
思考,像这样因为变量被提前释放而导致线程未定义行为应该如何避免?
- 使用智能指针,通过其引用计数避免资源释放
- 尽可能使得局部变量具有拷贝复制的能力,但是拷贝增加额外系统开销。
- 使用join 避免变量提前释放,场景不灵活。
3.线程传参
一般情况下线程默认传参是拷贝的,当然也有例外比如在2.2章节,演示了线程传参指针导致子线程未定义的错误示例。
以下是列举线程传参的各种方式:
3.1 传入字符串
警惕隐式转换的后果:
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
using namespace std;
void background_task(string const& str) {
for (;;)
{
cout << "str: " << str << endl;
this_thread::sleep_for(chrono::seconds(1));
}
}
void test()
{
char str[] = "hello";
thread t1(background_task, str);
t1.detach();
}
int main() {
test();
this_thread::sleep_for(chrono::seconds(5));
return 0;
}
输出结果如下:
最终输出其实期望str:hello,程序却并没有进行输出。
问题早晨的原因与2.2类似,这里我想借助隐式构造,将const char *str转换为string供给t1线程使用,但是由于detach过早的发生,造成无效的隐式转换(当然还有可能字符串len超过string最大长度截断等问题)。
解决办法是将str转换成一个右值来避免悬垂指针。
thread t1(background_task, str);
thread t1(background_task, string(str));
3.2 传入引用
传入引用,对于创建线程传参来讲,线程的构造函数会进行对参数的拷贝,即使你传入的是引用,也不会改变数值(拷贝了一份引用) 当线程结束时,内部拷贝数据将会在数据更新阶段被销毁,且传入参数的data是不会被更改的。
如果期望线程内部影响传输参数,则输入ref.表示将参数转换成引用的形式 。 如果不期望的话,则将函数参数类型改为普通类型。
当线程创建时,传递引用(必须x_ref)、传递指针、移动语义(参数类型必须为&&)都不会涉及值拷贝。
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
void test(int &num)
{
cout<<"num:"<<num<<endl;
num = 1;
cout<<"modify num:"<<num<<endl;
}
int main()
{
int dig = 2;
thread t1(test, ref(dig));
this_thread::sleep_for(chrono::seconds(1)); //保证子线程先运行完毕
cout<<"dig:"<<dig<<endl;
t1.join();
}
3.3 传入匿名对象
由于C++编译器在某些情况下会将表达式解释为函数声明而不是类型对象的定义,所以线程传参时通过加{}等操作告诉编译器传入为匿名对象。
#include <iostream>
#include <thread>
using namespace std;
class background_task
{
public:
background_task()
{
cout<<"background_task"<<endl;
}
void operator()() const
{
cout<<"hello"<<endl;
}
};
int main()
{
//t1-t5属于匿名对象
thread t1{background_task()}; //加{}
thread t2((background_task()));//使用多括号
thread t3 = thread(background_task()); // 使用赋值语法
// thread t4(background_task());//err C++编译器在某些情况下会将表达式解释为函数声明而不是类型对象的定义。
thread t5([](){background_task();}); //使用lambda,因为lambda表达式允许使用捕获一个局部变量的局部函数
t1.join();
t2.join();
t3.join();
// t4.join();//err
t5.join();
}
void operator()() const 是一个仿函数,线程将会执行这个类中的 operator() 函数,operator() 表示可以将类定义对象直接当参数执行 第二() 表示是否传递参数。const告诉编译器在调用 operator() 时不会改变类的内部状态,这可以帮助确保线程的安全性。
3.4 传入类对象和成员函数
传入类对象和成员函数需要加&。
#include <iostream>
#include <thread>
using namespace std;
class basic2_3
{
private:
int num_;
public:
basic2_3() : num_(0) {}
void function(int _num)
{
cout<<"num_:"<<num_<<endl;
cout<<"_num:"<<_num<<endl;
}
};
int main()
{
basic2_3 basic;
int num = 1;
thread t1(&basic2_3::function, &basic, num); //第三个参数表示函数参数参数
t1.join();
}
3.5 传入move对象
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
using namespace std;
void test(vector<int> arg_data)
{
for(auto& temp: arg_data)
{
cout<<temp<<endl;
}
}
int main()
{
vector<int> data = {1,2,3};
thread t(test, move(data));
t.join();
}
3.6 传入lambda表达式
#include <iostream>
#include <thread>
using namespace std;
int main()
{
auto Circle = [](){
for(int i = 0; i < 10; i++)
{
cout<<"i:"<<i<<endl;
}
};
thread t(Circle);
t.join();
}
3.7 传入函数
#include <iostream>
#include <thread>
using namespace std;
void MyThreadFunction()
{
cout<<"MyThreadFunction"<<endl;
}
int main()
{
std::thread myThread(MyThreadFunction);
myThread.join();
}
3.8 传入仿函数
参考示例3.3 operator()()
4.异常处理
启动一个线程后,如果主线程产生崩溃,会导致子线程也会异常退出,就是调用terminate,如果子线程在进行一些重要的操作比如将充值信息入库等,丢失这些信息是很危险的。所以常用的做法是捕获异常,并且在异常情况下保证子线程稳定运行结束后,主线程抛出异常结束运行。
4.1资源获取即初始化RAII
当线程运行出现异常退出的时候,则会导致无法被join。因此,当倾向于无异常情况下join时候,需要在异常处理过程中调用join,一种方式是使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization) 即提供一个类,在析构函数中使用join()。
像是我们最常见的智能指针,就是RAII的一种体现。
#include <iostream>
#include <thread>
using namespace std;
class thread_guard
{
public:
explicit thread_guard(thread &t_):t(t_){}
~thread_guard()
{
cout<<"~thread_guard()"<<endl;
if(t.joinable())
{
cout<<"in ~thread_guard(), this thread will to be join"<<endl;
t.join();
}
}
thread_guard(const thread_guard&) = delete;
thread_guard& operator&=(const thread_guard&) = delete;
private:
thread &t;
};
void funciton(int num){ cout<<"num:"<<num<<endl;}
void doSomething(){throw runtime_error("test throw error");}
void test()
{
int num = 1;
thread t1(funciton, num);
thread_guard tg1(t1);
try{
doSomething();
}
catch(const exception& e)
{
cerr << "Caught exception in test: "<<e.what() << '\n';
throw;
}
}
int main()
{
try {
test(); //如果在主函数内不写捕获,则进程直接退出,无法调用到join。
} catch (const std::exception& e) {
std::cerr << "Caught exception in main: " << e.what() << std::endl;
}
}
1. test函数抛出异常后并不会立马退出,函数运行完毕后tg1对象会被析构,析构的时候在类thread_guard析构函数中,进行join回收线程。
2. 为何thread_guard对象禁止拷贝和构造
C++不允许std::thread
执行拷贝构造和拷贝赋值,是因为线程对象在内部持有一个线程句柄,该句柄指向线程的执行状态和资源。如果允许拷贝构造和拷贝赋值,那么就会导致多个线程对象持有同一个线程句柄,这样就无法保证线程的执行状态和资源的正确性。
3. 关于拷贝构造和移动构造优先调用问题
当使用一个右值(临时对象或表达式)来初始化对象时,系统会优先调用移动构造函数,这是因为右值是临时对象,其生命周期很短,可以被移动到目标对象中,避免了不必要的拷贝操作,提高了效率。而当返回局部变量时,如果该局部变量被拷贝到函数外部,那么系统会寻找该类的拷贝构造函数来完成拷贝操作,如果没有拷贝构造函数,则会使用移动构造函数。这是因为返回局部变量时,该局部变量的生命周期并不短暂,需要在函数外部继续存在,因此需要进行拷贝或移动操作。
4.曾经有一份C++17标准的备选提案,可惜C++17标准没有引入这个类,后来它改名为std::jthread,依然进入了C++20标准的议程(现已被正式纳入C++20标准)。
二、线程管理
1.线程所有权
就是不要将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate函数引发崩溃。
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
void test()
{
thread::id threadId = this_thread::get_id();
cout<<"my thread id:"<<threadId<<endl;
}
void test1()
{
cout<<"other test1"<<endl;
}
int main()
{
//t1 绑定test
thread t1(test);
cout<<"1"<<endl;
//2 转移t1管理的线程给t2,转移后t1无效
thread t2 = std::move(t1);
cout<<"2"<<endl;
//3 t1 可继续绑定其他线程,执行some_other_function
t1 = std::thread(test1);
cout<<"3"<<endl;
//4 创建一个线程变量t3
thread t3;
//5 转移t2管理的线程给t3
t3 = std::move(t2);
cout<<"4"<<endl;
//6 转移t3管理的线程给t1
t1 = std::move(t3);
cout<<"5"<<endl;
this_thread::sleep_for(std::chrono::seconds(2000));
}
2.使用容器存储线程注意
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i)
{
threads.emplace_back(test, i);
}
for (auto& testFun: threads)
{
testFun.join();
}
容器存储线程时,比如vector
,如果用push_back
操作势必会调用std::thread
,这样会引发编译错误,因为在其内部thread(thread &x)=delete, 采用的时emplace方式,可以直接根据线程构造函数需要的参数构造.
push_back
适用于已经构造的对象或需要从一个地方复制(或移动)到容器中的对象
emplace_back
适用于直接在容器内部构造新对象,并避免额外的拷贝(或移动)操作。
3.线程返回局部变量
在第一章节第四小节第3小标题中曾提到过拷贝构造和移动构造优先调用问题,我们可以在函数内部返回一个局部的std::thread
变量,利用RVO机制(在C++中,当函数返回一个对象时,通常会创建一个临时对象,然后将其拷贝到函数调用的位置。但是通过RVO,编译器可以直接在函数内部构造返回值对象,避免了额外的拷贝操作,提高了性能。注意RVO是一种编译器优化技术,不是C++语言的规范要求,但目前大多数C++编译器都可满足)
thread createThread()
{
thread t1(function);
return t1;
}
int main()
{
thread t = createThread();
processThread(t);
}
4.并行运算
直接看注释
#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <mutex>
#include <numeric>
#include <execution>
#include <chrono>
#include <functional>
using namespace std;
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
/*基于result结果开始 从first开始累加到last*/
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 1
return init;
/*一个线程计算25个元素*/
unsigned long const min_per_thread=25;
unsigned long const max_threads = (length+min_per_thread-1)/min_per_thread; // 2
/*支持最大线程并行数量*/
unsigned long const hardware_threads=thread::hardware_concurrency();
/*计算量的最大值和硬件支持并行数量中的最小值*/
unsigned long const num_threads= // 3
min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
/*每个线程处理元素数量*/
unsigned long const block_size=length/num_threads; // 4
/*存放中间选择线程数量结果*/
std::vector<T> results(num_threads);
/*线程容器,必须-1(包括主线程)*/
std::vector<std::thread> threads(num_threads-1); // 5
cout<<"num_threads:"<<num_threads<<endl;
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
/*在不知道容器的类型请情况下,以blocK_size大小作为移动的步数,从block_end开始移动*/
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i]));
block_start=block_end; // 每创建一个线程将block_start置位
}
/*处理最终块*/
accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]); // 9
for_each(threads.begin(),threads.end(),mem_fn(&std::thread::join)); // 10
/*将所有结果累加*/
return std::accumulate(results.begin(),results.end(),init); // 11
}
int main()
{
std::vector<int> data(1000000);
// 使用随机数种子初始化随机数生成器
std::srand(static_cast<unsigned int>(std::time(nullptr)));
for (int i = 0; i < 1000000; ++i) {
data[i] = std::rand() % 100; // 生成0到99之间的随机整数
}
int init = 0; // 初始值
//多线程
auto start_time = std::chrono::high_resolution_clock::now();
int result = parallel_accumulate(data.begin(), data.end(), init);
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
cout<<"result:"<<result<<endl;
cout<<"mult threads to run times:"<<duration.count()<<endl;
//单线程
auto start_time1 = std::chrono::high_resolution_clock::now();
int result1 = accumulate(data.begin(), data.end(), init);
auto end_time1 = std::chrono::high_resolution_clock::now();
auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>(end_time1 - start_time1);
cout<<"result1:"<<result1<<endl;
cout<<"one thread to run times:"<<duration1.count()<<endl;
}
5.识别线程
比如我们启动了一个线程,我们可以通过线程变量的get_id()
获取线程id
std::thread t([]()
{
std::cout << "in thread id " << std::this_thread::get_id() << std::endl;
});