目录
8.1 在线程间切分任务
8.1.1 先在线程间切分数据,再开始处理
8.1.2 以递归方式划分数据
8.1.3 依据工作类别划分任务
借多线程分离关注点需防范两大风险
在线程间按流程划分任务
8.2 影响并发性能的因素
8.2.1 处理器的数量
8.2.2 数据竞争和缓存兵乓
8.2.3 不经意共享
8.2.4 数据的紧凑程度
8.2.5 过度任务切换与线程过饱和
参考:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/chapter8/8.5-chinese.md
8.1 在线程间切分任务
需要决定使用多少个线程,并且这些线程应该去做什么。需要决定是使用“全能”线程去完成所有的任务,还是使用“专业”线程去完成一件事情,或将两种方法混合。使用并发时,需要作出诸多选择来驱动并发,选择会决定代码的性能和可读性。因此选择至关重要,所以设计应用结构时,需要作出适当的决定。本节中,将看到很多划分任务的技术。
8.1.1 先在线程间切分数据,再开始处理
最简单的并行算法就是并行化的std::for_each
,会对数据集中每个元素执行同一个操作。
最简单的分配方式:第一组N个元素分配一个线程,下一组N个元素再分配一个线程,以此类推,如图8.1所示。不管数据怎么分,每个线程都会对分配给它的元素进行操作,但不会和其他线程进行沟通,直到处理完成。
虽然这个技术十分强大,但是并不是哪里都适用。有时不能像之前那样,对任务进行整齐的划分,因为只有对数据进行处理后,才能进行明确的划分。这里的方式特别适用了递归算法,下面就来看看这种特别的方式。
8.1.2 以递归方式划分数据
代码8.1 使用栈的并行快速排序算法——等待数据块排序
template<typename T>
struct sorter // 1
{
struct chunk_to_sort
{
std::list<T> data;
std::promise<std::list<T> > promise;
};
thread_safe_stack<chunk_to_sort> chunks; // 2
std::vector<std::thread> threads; // 3
unsigned const max_thread_count;
std::atomic<bool> end_of_data;
sorter():
max_thread_count(std::thread::hardware_concurrency()-1),
end_of_data(false)
{}
~sorter() // 4
{
end_of_data=true; // 5
for(unsigned i=0;i<threads.size();++i)
{
threads[i].join(); // 6
}
}
void try_sort_chunk()
{
boost::shared_ptr<chunk_to_sort > chunk=chunks.pop(); // 7
if(chunk)
{
sort_chunk(chunk); // 8
}
}
std::list<T> do_sort(std::list<T>& chunk_data) // 9
{
if(chunk_data.empty())
{
return chunk_data;
}
std::list<T> result;
result.splice(result.begin(),chunk_data,chunk_data.begin());
T const& partition_val=*result.begin();
typename std::list<T>::iterator divide_point= // 10
std::partition(chunk_data.begin(),chunk_data.end(),
[&](T const& val){return val<partition_val;});
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(),
chunk_data,chunk_data.begin(),
divide_point);
std::future<std::list<T> > new_lower=
new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk)); // 11
if(threads.size()<max_thread_count) // 12
{
threads.push_back(std::thread(&sorter<T>::sort_thread,this));
}
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(),new_higher);
while(new_lower.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready) // 13
{
try_sort_chunk(); // 14
}
result.splice(result.begin(),new_lower.get());
return result;
}
void sort_chunk(boost::shared_ptr<chunk_to_sort> const& chunk)
{
chunk->promise.set_value(do_sort(chunk->data)); // 15
}
void sort_thread()
{
while(!end_of_data) // 16
{
try_sort_chunk(); // 17
std::this_thread::yield(); // 18
}
}
};
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input) // 19
{
if(input.empty())
{
return input;
}
sorter<T> s;
return s.do_sort(input); // 20
}
这个方案使用到了特殊的线程池——所有线程任务都来源于一个等待链表,然后线程会去完成任务,任务完成后会再来链表提取任务。这个线程池会出问题(包括对工作链表的竞争),问题的解决方案将在第9章提到。关于多处理器的问题,将会在本章后面的章节中做出更为详细的介绍(详见8.2.1)。
任务几种划分方法:处理前划分和递归划分(都需要事先知道数据的长度固定),还有上面的划分方式。事情并非总是这样好解决,当数据是动态生成或是通过外部输入,那这里的办法就不适用了。这种情况下,基于任务类型的划分方式,就要好于基于数据的划分方式。
8.1.3 依据工作类别划分任务
虽然会为每个线程分配不同的数据块,因为这里每个线程对每个数据块的操作是相同的,所以工作的划分(无论是之前就划分好,还是使用递归的方式划分)仍停留在理论阶段。另一种选择是让线程做专门的工作,就是每个线程做不同的工作,就像水管工和电工在建造一所屋子的时候,所做的不同工作那样。线程可能会对同一段数据进行操作,但对数据进行不同的操作。
对分工的排序,也就是分离关注点。每个线程都有不同的任务,这意味着真正意义上的线程独立。其他线程偶尔会向特定线程交付数据,或是通过触发事件的方式来进行处理。不过总体而言,每个线程只需要关注自己所要做的事情即可。其本身就是良好的设计,每一段代码只对自己的部分负责。
借多线程分离关注点需防范两大风险
多线程下有两个危险需要分离。第一个是对错误的担忧(主要表现为线程间共享着很多的数据),第二是不同的线程要相互等待,这两种情况都是因为线程间很密切的交互。这种情况发生时,就需要看一下为什么需要这么多交互。当所有交互都有同样的问题,就应该使用单线程来解决,并将引用同一源的线程提取出来。或者当有两个线程需要频繁的交流,在没有其他线程时,就可以将这两个线程合为一个线程。
当通过任务类型对线程间的任务进行划分时,不应该让线程处于隔离态。当多个输入数据集需要使用同样的操作序列,可以将序列中的操作分成多个阶段让线程执行。
在线程间按流程划分任务
当任务会应用到相同操作序列,去处理独立的数据项时,就可以使用流水线(pipeline)系统进行并发。这好比一个物理管道:数据流从管道一端进入,进行一系列操作后,从管道另一端出去。
使用这种方式划分工作,可以为流水线中的每一阶段操作创建一个独立线程。当一个操作完成,数据元素会放在队列中,供下一阶段的线程使用。这就允许第一个线程在完成对于第一个数据块的操作时,第二个线程可以对第一个数据块执行管线中的第二个操作。
这就是线程间划分数据的一种替代方案(如8.1.1描述),这种方式适合于操作开始前,且对输入数据处长度不清楚的情况。例如:数据来源可能是从网络,或者可能是通过扫描文件系统来确定要处理的文件。
流水线对于队列中的耗时操作处理也很合理,通过对线程间任务的划分,就能对应用的性能有所改善。假设有20个数据项,需要在四核的机器上处理,并且每一个数据项需要四个步骤来完成操作,每一步都需要3秒来完成。如果将数据分给了四个线程,每个线程上就有5个数据项要处理。假设在处理时,没有其他线程对处理过程进行影响,在12秒后4个数据项处理完成,24秒后8个数据项处理完成,以此类推。当20个数据项都完成操作,就需要1分钟的时间。管线中就会完全不同,四步可以交给四个内核,第一个数据项可以被每一个核进行处理,所以其还是会消耗12秒。在12秒后你就能得到一个处理过的数据项,这相较于数据划分并没有好多少。不过,当流水线动起来,事情就会不一样了。第一个核处理第一个数据项后,数据项就会交给下一个内核,所以第一个核在处理完第一个数据项后,其还可以对第二个数据项进行处理。在12秒后,每3秒将会得到一个已处理的数据项,这就要好于每隔12秒完成4个数据项。
8.2 影响并发性能的因素
多处理系统中使用并发来提高代码的效率时,需要了解影响并发的效率的因素。即使使用多线程对关注点进行分离,还需要确定是否会对性能造成负面影响
8.2.1 处理器的数量
处理器数量是影响多线程应用的首要因素。当对目标硬件很熟悉,并且能针对硬件进行软件设计,并在目标系统或副本上进行性能测试。那很幸运,可以在类似的平台上进行开发。不过,当所使用的平台与目标平台的差异很大,比如:可能会在一个双芯或四芯的系统上做开发,而用户系统可能就只有单个处理器(可能有很多芯),或多个单芯处理器,亦或是多核多芯的处理器。并发程序在不同平台上的行为和性能特点可能完全不同,需要在不同平台上进行测试。
为了扩展线程的数量,且与硬件所支持的并发线程数一致,C++标准线程库提供std::thread::hardware_concurrency()
,使用这个函数就能知道在给定硬件上可以扩展的线程数量了。
使用std::thread::hardware_concurrency()
需要谨慎,因为不会考虑其他应用已使用的线程数量(除非已经将系统信息进行共享)。std::async()
可以避免这个问题,标准库会对所有调用进行安排。同样,谨慎的使用线程池也可以避免这个问题。
随着处理器数量的增加,另一个问题就会来影响性能:多个处理器尝试访问同一个数据。
8.2.2 数据竞争和缓存兵乓
当两个线程在不同处理器上时,对同一数据进行读取,通常不会出现问题。因为数据会拷贝到每个线程的缓存中,并让两个处理器同时进行处理。当有线程对数据进行修改,并且需要更新到其他核芯的缓存中去,就要耗费一定的时间。这样的修改可能会让第二个处理器停下来,等待硬件内存更新缓存中的数据。根据CPU指令,这是一个特别特别慢的操作。
思考下面的代码段:
std::atomic<unsigned long> counter(0);
void processing_loop()
{
while(counter.fetch_add(1,std::memory_order_relaxed)<100000000)
{
do_something();
}
}
counter变量是全局的,任何线程都能调用processing_loop()。因此,每次对counter进行增量操作时,处理器必须确保缓存中的counter是最新值,然后进行修改,再告知其他处理器。编译器不会为任何数据做同步操作,fetch_add是一个“读-改-写”操作,因此要对最新的值进行检索。如果另一个线程在另一个处理器上执行同样的代码,counter的数据需要在两个处理器之间进行传递,这两个处理器的缓存中间就存有counter的最新值(当counter的值增加时)。如果do_something()足够短,或有很多处理器来对这段代码进行处理时,处理器会互相等待。一个处理器准备更新这个值,另一个处理器在修改这个值,所以该处理器就需要等待第二个处理器更新完成,并且完成更新传递时才能执行更新,这种情况被称为高竞争(high contention)。如果处理器很少需要互相等待就是低竞争(low contention)。
循环中counter的数据将在每个缓存中传递若干次,这就是乒乓缓存(cache ping-pong),这会对应用的性能有着重大的影响。当处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情,所以对于整个应用来说这是一个坏消息。
如何避免乒乓缓存呢?答案就是:减少两个线程对同一个内存位置的竞争。
虽然,要实现起来并不简单。即使给定内存位置,因为伪共享(false sharing)可能还是会有乒乓缓存。
8.2.3 不经意共享
处理器缓存通常不会用来处理单个存储点,而是会用来处理称为缓存行(cache lines)的内存块。内存块通常大小为32或64字节,实际大小需要由处理器来决定。因为硬件缓存可确定处理缓存行的大小,较小的数据项就在同一内存行的相邻位置上。有时这样的设定还不错:当线程访问的一组数据是在同一数据行中,对于应用的性能来说就要好于对多个缓存行进行传播。不过,同一缓存行存储的是无关数据时,且需要被不同线程访问,这就会造成性能问题。
假设一个int类型的数组,并且有一组线程可以访问数组中的元素,且对数组的访问很频繁。通常int的大小要小于一个缓存行,同一个缓存行中可以存储多个数据项。因此,即使每个线程都能对数据中的成员进行访问,硬件还是会产生乒乓缓存。每当线程访问0号数据项,并对其值进行更新时,缓存行的所有权就需要转移给执行该线程的处理器,这仅是为了更新1号数据项的线程获取1号线程的所有权。缓存行是共享的(即使没有数据存在),因此使用伪共享来描述这种方式。这个问题的解决办法就是对数据进行构造,让同一线程访问的数据项存在临近的地址中(就像是放在同一缓存行中),这样那些能被独立线程访问的数据将分布在相距很远的地方,并且可能是存储在不同的缓存行中。本章接下来的内容中可以看到,这种思路对代码和数据设计的影响。C++17标准在头文件<new>
中定义了std::hardware_destructive_interference_size
它指定了当前编译目标可能共享的连续字节的最大数目。如果确保数据间隔大于等于这个字节数,就不会有错误的共享存在了。
8.2.4 数据的紧凑程度
伪共享(不经意共享)发生的原因:某个线程所要访问的数据过于接近另一线程的数据,另一个是与数据布局相关的陷阱会直接影响单线程的性能。问题在于数据过于接近:单线程访问数据时,数据就已在内存中展开,分布在不同的缓存行上。另一方面,当内存中有紧凑数据时,数据就分布在同一缓存行上。因此,当数据已传播,将会有更多的缓存行从处理器的缓存上加载数据,这会增加访问内存的延迟,以及降低数据的性能(与紧凑的数据存储地址相比较)。
同样的,如果数据已传播,给定缓存行上就包含与当前线程有关和无关的数据。极端情况下,有更多的数据存在于缓存中,就会对数据以更多的关注,而非这些数据去做了什么。这就会浪费宝贵的缓存空间,增加处理器缓存缺失的情况,从而因为其他数据已经占有缓存中的位置,所以需要再从主存中添加对应数据项到缓存中。
现在,对于单线程代码来说任务切换(task switching)就很关键了。如果系统中的线程数量要比核芯多,每个核上都要运行多个线程。这就会增加缓存的压力,为了避免伪共享,努力让不同线程访问不同缓存行。当处理器切换线程时,要对不同内存行上的数据进行加载(当不同线程使用的数据跨越了多个缓存行时),而非对缓存中的数据保持原样(当线程中的数据都在同一缓存行时)。C++17在头文件<new>
中指定了一个常数std::hardware_constructive_interference_size
,这是同一高速缓存行上的连续字节的最大数目(需要对齐)。将所需的数据大小控制在这个字节数内,就能提高缓存命中率。
如果线程数量多于内核或处理器数量,操作系统可能会选择将线程安排给这个核芯一段时间,之后再安排给另一个核芯一段时间。就需要将缓存行从一个内核上,转移到另一个内核上,也意味着要耗费很多时间。虽然,操作系统通常会避免这样的情况发生,不过当其发生的时候,对性能会有很大影响。
当有超级多的线程准备运行时(非等待状态),任务切换就会频繁发生。这个问题我们之前也接触过:超额申请。
8.2.5 过度任务切换与线程过饱和
多线程系统中,通常线程的数量要多于处理器的数量,除非在大规模并行(massively parallel)硬件上运行。不过,线程会花费时间来等待外部I/O完成,或被互斥量阻塞,或等待条件变量等等,所以等待不是问题。使用额外的线程来完成有用的工作,而非让线程在处理器处以闲置状态时持续等待。
不过,这也并非长久之计,如果有很多额外线程,就会有很多线程准备执行。不过,当线程数远远大于可用处理器数量时,操作系统就会忙于切换任务,以确保每个任务都有时间运行,这将增加切换任务的时间开销,和缓存问题造成同一结果。当无限制的产生新线程,超额申请就会加剧,或者在通过任务类型对任务进行划分的时候,线程数量大于处理器数量。这时,对性能影响的因素是CPU的能力,而非I/O。
如果只是简单的通过数据划分生成多个线程,可以限定工作线程的数量,如8.1.2节中那样。如果超额申请是对工作的划分而产生,那么不同的划分方式对性能就没有太多益处了。
其他因素也会影响多线程代码的性能,即使CPU类型和时钟周期相同,乒乓缓存的开销也可以让程序在两个单核处理器和在一个双核处理器上,产生不明显的性能差。