[笔记]C++并发编程实战 《四》同步并发操作

news2025/1/16 14:12:15

文章目录

  • 前言
  • 第4章 同步并发操作
  • 4.1 等待一个事件或其他条件
    • 4.1.1 等待条件达成
    • 4.1.2 使用条件变量构建线程安全队列
  • 4.2 使用期望值等待一次性事件
    • 4.2.1 后台任务的返回值
    • 4.2.2 任务与期望值关联
    • 4.2.3 使用(std::)promises
    • 4.2.4 将异常存与期望值中
    • 4.2.5 多个线程的等待


前言

本章主要内容

  • 等待事件
  • 带有期望的等待一次性事件
  • 在限定时间内等待
  • 使用同步操作简化代码

第4章 同步并发操作

上一章中,我们看到各种在线程间保护共享数据的方法。我们不仅想要保护数据,还想对单独的线程进行同步。 例如,在第一个线程完成前,可能需要等待另一个线程执行完成。通常情况下,线程会等待一个特定事件发生,或者等待某一条件达成。这可能需要定期检查“任务完成”标识,或将类似的东西放到共享数据中,但这与理想情况差很多。像这种情况就需要在线程中进行同步,C++标准库提供了一些工具可用于同步操作,形式上表现为条件变量(condition variables)期望值(futures)

并发技术规范(TS)中,为期望值添加了更多的操作,并与新的同步工具锁存器(latches)(轻量级锁资源)和栅栏机制(barriers)一起使用。

本章将讨论:

  • 如何使用条件变量等待事件
  • 介绍期望,锁存器和栅栏机制,以及如何使用它简化同步操作。

4.1 等待一个事件或其他条件

假设你在旅游,而且正在一辆在夜间运行的火车上。在夜间,如何在正确的站点下车呢?

  • 一种方法是整晚都要醒着,然后注意到了哪一站。这样,你就不会错过你要到达的站点,但是这样会让你感到很疲倦。另外,你可以看一下时间表,估计一下火车到达目的地的时间,然后在一个稍早的时间点上设置闹铃,然后你就可以安心的睡会了。这个方法听起来也很不错,也没有错过你要下车的站点,但是当火车晚点的时候,你就要被过早的叫醒了。当然,闹钟的电池也可能会没电了,并导致你睡过站。理想的方式是,无论是早或晚,只要当火车到站的时候,有人或其他东西能把你唤醒就好了。

这和线程有什么关系呢?

好吧,让我们来联系一下。当一个线程等待另一个线程完成任务时,它会有很多选择。第一,它可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标志进行重设。不过,就是一种浪费:线程消耗宝贵的执行时间持续的检查对应标志,并且当互斥量被等待线程上锁后,其他线程就没有办法获取锁,这样线程就会持续等待。因为以上方式对等待线程限制资源,并且在完成时阻碍对标识的设置。这种情况类似与,保持清醒状态和列车驾驶员聊了一晚上:驾驶员不得不缓慢驾驶,因为你分散了他的注意力,所以火车需要更长的时间,才能到站。同样的,等待的线程会等待
更长的时间,这些线程也在消耗着系统资源。

  • 第二个选择是在等待线程在检查间隙,使用 std::this_thread::sleep_for() 进行周期性的间歇(详见4.3节),如下代码。
    这个循环中,在休眠前②,函数对互斥量进行解锁①,并且在休眠结束后再对互斥量进行上锁,所以另外的线程就有机会获取锁并设置标识。这个实现就进步很多,因为当线程休眠时,线程没有浪费执行时间,但是很难确定正确的休眠时间。太短的休眠和没有休眠一样,都会浪费执行时间;太长的休眠时间,可能会让任务等待线程醒来。休眠时间过长是很少见的情况,因为这会直接影响到程序的行为,当在高节奏游戏中,它意味着丢帧,或在一个实时应用中超越了一个时间片。
bool flag;
std::mutex m;
void wait_for_flag()
{
	std::unique_lock<std::mutex> lk(m);
	while(!flag)
	{
	lk.unlock(); // 1 解锁互斥量
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	// 2 休眠100ms
	lk.lock(); // 3 再锁互斥量
	}
}
  • 第三个选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”。从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行),终止线程将会向等待着的线程广播“条件达成”的信息。

4.1.1 等待条件达成

C++标准库对条件变量有两套实现:

  • std::condition_variable
  • std::condition_variable_any 。

这两个实现都包含在 <condition_variable> 头文件的声明中。两者都需要与一个互斥量一起才能工作(互斥量是为了同步);前者仅限于与 std::mutex 一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any的后缀。因为 std::condition_variable_any 更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以 std::condition_variable 一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑 std::condition_variable_any 。

所以,如何使用 std::condition_variable 去处理之前提到的情况——当有数据需要处理时,如何唤醒休眠中的线程对其进行处理?以下清单展示了一种使用条件变量做唤醒的方式。清单4.1 使用 std::condition_variable 处理数据等待

std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
void data_preparation_thread()
{
    while (more_data_to_prepare())
    {
        data_chunk const data = prepare_data();
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);  // 2
        data_cond.notify_one(); // 3
    }
}
void data_processing_thread()
{
    while (true)
    {
        std::unique_lock<std::mutex> lk(mut); // 4
        data_cond.wait(
            lk, []
            { return !data_queue.empty(); }); // 5
        data_chunk data = data_queue.front();
        data_queue.pop();
        lk.unlock(); // 6
        process(data);
        if (is_last_chunk(data))
            break;
    }
}

首先,有一个用来在两个线程之间传递数据的队列①。当数据准备好时,使用 std::lock_guard 对队列上锁,将准备好的数据压入队列中②,之后线程会对队列中的数据上锁。再调用 std::condition_variablenotify_one()成员函数,对等待的线程(如果有等待线程)进行通知③。

另外一侧,有一个正在处理数据的线程,这个线程首先对互斥量上锁,但这里 std::unique_lock 要比 std::lock_guard ④更加合适。线程之后会调用 std::condition_variable 的成员函数wait(),传递一个锁和一个lambda函数表达式(作为等待的条件⑤)。

Lambda函数是C++11添加的新特性,它可以让一个匿名函数作为其他表达式的一部分,并且非常合适作为标准函数的谓词,例如wait()函数。在这个例子中,简单的Lambda函数 []{return !data_queue.empty();} 会去检查data_queue是否不为空,当data_queue不为空——那就意味着队列中已经准备好数据了。附录A的A.5节有Lambda函数更多的信息。

wait()会去检查这些条件(通过调用所提供的lambda函数),当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false),wait()函数将解锁互斥量,并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠状态中苏醒,重新获取互斥锁,并且再次检查条件是否满足。在条件满足的情况下,从wait()返回并继续持有锁;当条件不满足时,线程将对互斥量解锁,并且重新开始等待。

这就是为什么用 std::unique_lock 而不使用 std::lock_guard ——等待中的线程必须在等待期间解锁互斥量,并在这之后对互斥量再次上锁,而 std::lock_guard 没有这么灵活。如果互斥量在线程休眠期间保持锁住状态,准备数据的线程将无法锁住互斥量,也无法添加数据到队列中;同样的,等待线程也永远不会知道条件何时满足。

清单4.1使用了一个简单的Lambda函数用于等待⑤(这个函数用于检查队列何时不为空),不过任意的函数和可调用对象都可以传入wait()。当写好一个函数去做检查条件(或许比清单中简单检查要复杂很多)时,就可以直接将这个函数传入wait();不一定非要放在一个Lambda表达式中。调用wait()的过程中,一个条件变量可能会去检查给定条件若干次;然而,它总是在互斥量被锁定时这样做,当且仅当提供测试条件的函数返回true时,它就会立即返回。当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,就是所谓的伪唤醒(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。如果这样做了,就必须做好多次产生副作用的心理准备。

本质上说, std::condition_variable::wait是对“忙碌-等待”的一种优化。的确,不理想的方式就是用简单的循环来实现:

template <typename Predicate>
void minimal_wait(std::unique_lock<std::mutex> &lk, Predicate pred)
{
    while (!pred())
    {
        lk.unlock();
        lk.lock();
    }
}

最好为wiat()准备一个最小化实现,同时这个实现只能使用notify_one()notify_all()进行调用。

解锁 std::unique_lock 的灵活性,不仅适用于对wait()的调用;它还可以用于待处理但还未处理的数据⑥。处理数据可能是一个耗时的操作,并且在第3章也提过,持有锁的时间过长是一个多么糟糕的主意。

使用队列在多个线程中转移数据(如清单4.1)是很常见的。做得好的话,同步操作可以限制在队列内部,同步问题和条件竞争出现的概率也会降低。鉴于这些好处,现在从清单4.1中提取出一个通用线程安全的队列。

4.1.2 使用条件变量构建线程安全队列

当设计一个通用队列时,就要花一些时间想想有哪些操作需要添加到队列实现中去,就如之前在3.2.3节看到的线程安全的栈。可以看一下C++标准库提供的实现,找找灵感; std::queue<> 容器的接口展示如下:
清单4.2 std::queue 接口

template <class T, class Container = std::deque<T>>
class queue
{
public:
    explicit queue(const Container &);
    explicit queue(Container && = Container());
    template <class Alloc>
    explicit queue(const Alloc &);
    template <class Alloc>
    queue(const Container &, const Alloc &);
    template <class Alloc>
    queue(Container &&, const Alloc &);
    template <class Alloc>
    queue(queue &&, const Alloc &);
    void swap(queue &q);
    bool empty() const;
    size_type size() const;
    T &front();
    const T &front() const;
    T &back();
    const T &back() const;
    void push(const T &x);
    void push(T &&x);
    void pop();
    template <class... Args>
    void emplace(Args &&...args);
};

忽略构造、赋值以及交换操作时,剩下了三组操作:

  1. 对整个队列的状态进行查询(empty()和size());
  2. 查询在队列中的各个元素(front()和back());
  3. 修改队列的操作(push(), pop()和emplace())。

和3.2.3中的栈一样,因此也会遇到在固有接口上的条件竞争。因此,需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。

与清单4.1中的代码不同的是:当使用队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里提供pop()函数的两个变种:

  • try_pop()。try_pop() ,尝试从队列中弹出数据,总会直接返回(当有失败时),即使没有值可检索;
  • wait_and_pop()。wait_and_pop(),将会等待有值可检索的时候才返回。

当使用之前栈的方式来实现队列,接口就可能会是下面这样:
清单4.3 线程安全队列的接口

#include <memory> // 为了使用std::shared_ptr
template <typename T>
class threadsafe_queue
{
public:
    threadsafe_queue();
    threadsafe_queue(const threadsafe_queue &);
    threadsafe_queue &operator=(
        const threadsafe_queue &) = delete; // 不允许简单的赋值
    void push(T new_value);
    bool try_pop(T &value);       // 1
    std::shared_ptr<T> try_pop(); // 2
    void wait_and_pop(T &value);
    std::shared_ptr<T> wait_and_pop();
    bool empty() const;
};

就像之前对栈做的那样,将很多构造函数剪掉了,并且禁止了简单赋值。和之前一样,也需要提供两个版本的try_pop()和wait_for_pop()。第一个重载的try_pop()①在引用变量中存储着检索值,所以它可以用来返回队列中值的状态;当检索到一个变量时,他将返回true,否则将返回false(详见A.2节)。第二个重载②就不能做这样了,因为它是用来直接返回检索值的。当
没有值可检索时,这个函数可以返回NULL指针。

那么问题来了,如何将以上这些和清单4.1中的代码相关联呢?

好吧,现在就来看看怎么去关联。从之前的代码中提取push()和wait_and_pop(),如以下清单所示。
清单4.4 从清单4.1中提取push()和wait_and_pop()

#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class threadsafe_queue
{
private:
    std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;

public:
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
                       { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
    while (more_data_to_prepare())
    {
        data_chunk const data = prepare_data();
        data_queue.push(data); // 2
    }
}
void data_processing_thread()
{
    while (true)
    {
        data_chunk data;
        data_queue.wait_and_pop(data); // 3
        process(data);
        if (is_last_chunk(data))
            break;
    }
}

线程队列的实例中包含有互斥量和条件变量,所以独立的变量就不需要了①,并且调用push()也不需要外部同步②。

当然,wait_and_pop()还要兼顾条件变量的等待③。

另一个wait_and_pop()函数的重载写起来就很琐碎了,剩下的函数就像从清单3.5实现的栈中一个个的粘过来一样。最终的队列实现如下所示。

清单4.5 使用条件变量的线程安全队列(完整版)

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template <typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut; // 1 互斥量必须是可变的
    std::queue<T> data_queue;
    std::condition_variable data_cond;

public:
    threadsafe_queue()
    {
    }
    threadsafe_queue(threadsafe_queue const &other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
                       { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
                       { return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = data_queue.front();
        data_queue.pop();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

empty()是一个const成员函数,并且传入拷贝构造函数的other形参是一个const引用;因为其他线程可能有这个类型的非const引用对象,并调用变种成员函数,所以这里有必要对互斥量上锁。又因为锁住互斥量是个可变操作,所以互斥量成员必须(修饰)为mutable①才能在empty()和拷贝构造函数中锁住。

条件变量在多个线程等待同一个事件时,也是很有用的。当线程用来分解工作负载,并且只有一个线程可以对通知做出反应,与清单4.1中使用的结构完全相同;运行多个数据实例——处理线程(processing thread)。当新的数据准备完成,调用notify_one()将会触发一个正在执行wait()的线程,去检查条件和wait()函数的返回状态(因为你仅是向data_queue添加一个数据项)。 这里不保证线程一定会被通知到,即使只有一个等待线程收到通知,所有处线程也有可能仍然在处理数据,而忽略了这个通知。

另一种可能是,很多线程等待同一事件。对于通知,他们都需要做出回应。这会发生在共享数据初始化的时候,当处理线程可以使用同一数据时,就要等待数据被初始化(有不错的机制可用来应对;可见第3章,3.3.1节),或等待共享数据的更新,比如,定期重新初始化(periodic reinitialization)。在这些情况下,线程准备数据数据时,就会通过条件变量调用notify_all()成员函数,而非直接调用notify_one()函数。顾名思义,这就是全部线程在都去执行wait()(检查他们等待的条件是否满足)的原因。
等待线程只等待一次,当条件为true时,它就不会再等待条件变量了,所以一个条件变量可能并非同步机制的最好选择。尤其是,条件在等待一组可用的数据块时。在这样的情况下,期望值(future)就是一个不错的选择。

4.2 使用期望值等待一次性事件

假设你乘飞机去国外度假。当你到达机场并办理完各种登机手续后,还需要等待机场广播通知登机时(可能要等很多个小时),你可能会在候机室里面找一些事情来打发时间,比如:读书,上网,或者来一杯价格不菲的机场咖啡。不过,从根本上来说你就在等待一件事情:机场广播能够登机的时间。之前飞机的班次在之后没有可参考性,因为当你在再次度假的时候,可能会选择等待另一班飞机。

C++标准库模型将这种一次性事件称为期望值(future)。当线程需要等待特定的一次性事件时,某种程度上来说就需要知道这个事件在未来的期望结果。之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望值的状态会变为就绪(ready)。一个期望值可能是数据相关的(比如,你的登机口编号),也可能不是。当事件发生时(并且期望状态为就绪),并且这个期望值就不能被重置。

C++标准库中,有两种期望值使用两种类型模板实现,声明在 头文件中:

  • 唯一期望值(unique futures)( std::future<> )
  • 共享期望值(shared futures)( std::shared_future<> )。

仿照了 std::unique_ptr 和 std::shared_ptr 。 std::future 的实例只能与一个指定事件相关联,而 std::shared_future 的实例就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如 std::unique_ptr 和 std::shared_ptr 的模板参数就是相关联的数据类型。与数据无关处,可以使用 std::future 与 std::shared_future 的特化模板。虽然,我希望用于线程间的通讯,但是期望值对象本身并不提供同步访问。当多个线程需要访问一个独立期望值对象时,必须使用互斥量或类似同步机制对访问进行保护,如在第3章所述。不过,在将要阅读到的4.2.5节中,多个线程会对一个 std::shared_future<> 实例的副本进行访问,即使他们是同一个异步结果,也不需要同步期望值。

并行技术规范将这两个模板类在 std::experimental 命名空间中进行了扩展:

  • std::experimental::future<>
  • std::experimental::shared_future<> 。

这个命名空间就是为了将其与 std 命名空间中的模板类进行区分,实验性的命名空间中为这两个模板类添加了更多的功能。尤为重要的是 std::experimental 中的内容与代码质量无关(我希望这里也会有较高质量的实现),需要强调是这个命名空间提供的类和函数都不是标准的,这个命名空间中类和函数的语法和语义,很可能与纳入C++标准(也就是 std 命名空间)后有所不同。如果想要
使用这两个试验性的模板类,需要包含 <experimental/future> 头文件。

最简单的一次性事件,就是一个后台运行出的计算结果。第2章中已经清楚了 std::thread 执行的任务不能有返回值,并且我能保证,这个问题将在使用期望值解决——现在就来看看是怎么解决的。

4.2.1 后台任务的返回值

假设,你有一个需要长时间的运算,你需要其能计算出一个有效的值,但是你现在并不迫切需要这个值。可能你已经找到了生命、宇宙,以及万物的答案,就像道格拉斯·亚当斯[1]一样。你可以启动一个新线程来执行这个计算,这就意味着你需要计算的结果,因为 std::thread 并不提供直接接收返回值的机制。这里就需要 std::async 函数模板(也是在头文 中声明的)。

当不着急要任务结果时,可以使用 std::async 启动一个异步任务。与 std::thread 对象等待的方式不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()成员函数;并且会阻塞线程直到期望值状态为就绪为止;之后,返回计算结果。下面清单中代码就是一个简单的例子。

清单4.6 使用 std::future 从异步任务中获取返回值

#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int>
the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

与 std::thread 做的方式一样, std::async 允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在 std::ref 中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。就如 std::thread ,当参数为右值时,拷贝操作将使用移动的方式转移原始数据。

这就允许使用“只移动”类型作为函数对象和参数。来看一下下面的程序清单:
清单4.7 使用 std::async 向函数传递参数

#include <string>
#include <future>
struct X
{
    void foo(int, std::string const &);
    std::string bar(std::string const &);
};
X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42,"hello"),p是指向x的指针
auto f2 = std::async(&X::bar, x, "goodbye");    // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
struct Y
{
    double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141);         // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
X baz(X &);

std::async(baz, std::ref(x)); // 调用baz(x)
class move_only
{
public:
    move_only();
    move_only(move_only &&)
        move_only(move_only const &) = delete;
    move_only &operator=(move_only &&);
    move_only &operator=(move_only const &) = delete;
    void operator()();
};
auto f5 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only()) 构造得到

默认情况下,期望值是否等待取决于 std::async 是否启动一个线程,或是否有任务正在进行同步。大多数情况下(估计这就是你想要的结果),也可以在函数调用之前向 std::async 传递一个额外参数,这个参数的类型是 std::launch ,还可以是 std::launch::defered ,表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async 表明函数必须在其所在的
独立线程上执行, std::launch::deferred | std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的,当函数调用被延迟,它可能不会在运行了。如下所示:

auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在wait()或get()调用时执行
auto f8=std::async(std::launch::deferred | std::launch::async,baz,std::ref(x)); // 实现选择执行方式
auto f9=std::async(baz,std::ref(x));
f7.wait(); // 调用延迟函数

本章的后面和第8章中,将会再次看到这段程序,使用 std::async 会更容易让算法分割到各个任务中,这样程序就能并发的执行了。不过,这不是让 std::future 与任务实例相关联的唯一方式;你也可以将任务包装入 std::packaged_task<> 实例中,或通过编写代码的方式,使用 std::promise<> 类型模板显示设置值。与 std::promise<> 对比, std::packaged_task<> 具有更高层的抽象,所以我们从“高抽象”的模板说起。

4.2.2 任务与期望值关联

std::packaged_task<> 对一个函数或可调用对象,绑定一个期望值。当调用 std::packaged_task<> 对象时,它就会调用相关函数或可调用对象,将期望状态置为就绪,返回值也会被存储。这可以用在构建线程池的结构单元(可见第9章),或用于其他任务的
管理,比如:在任务所在线程上运行其他任务,或将它们顺序的运行在一个特殊的后台线程上。当一个粒度较大的操作被分解为独立的子任务时,其中每个子任务都可以包含在一个 std::packaged_task<> 实例中,之后这个实例将传递到任务调度器或线程池中。对任务细节进行抽象,调度器仅处理 std::packaged_task<> 实例,而非处理单独的函数。std::packaged_task<> 的模板参数是一个函数签名,比如void()就是一个没有参数也没有返回值的函数,或int(std::string&, double*)就是有一个非const引用的 std::string 和一个指向double类型的指针,并且返回类型是int。当构造出一个 std::packaged_task<> 实例时,就必须传入一个函数或可调用对象;这个函数或可调用的对象,需要能接收指定的参数和返回可转换为指定返回类型的值。类型可以不完全匹配,可以用一个int类型的参数和返回一个float类型的函数,来构建 std::packaged_task<double(double)> 的实例,因为这里类型可以隐式转换。

函数签名的返回类型可以用来标识从get_future()返回的 std::future<> 的类型,而函数签名的参数列表,可用来指定packaged_task的函数调用操作符。

例如,模板偏特化 std::packaged_task<std::string(std::vector*,int)> 将在下面的代码清单中使用。
清单4.8 std::packaged_task<> 的偏特化

template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
	template<typename Callable>
	explicit packaged_task(Callable&& f);
	std::future<std::string> get_future();
	void operator()(std::vector<char>*,int);
};

因为 std::packaged_task 对象是一个可调用对象,所以可以封装在 std::function 对象中,从而作为线程函数传递到 std::thread 对象中,或作为可调用对象传递另一个函数中,或可以直接进行调用。当 std::packaged_task 作为一个函数被调用时,实参将由函数调用操作符传递到底层函数,并且返回值作为异步结果存储在 std::future ,可通过get_future()获取。因此你可以把用 std::packaged_task 打包任务,并在它被传到别处之前的适当时机取回期望值。

当需要异步任务的返回值时,你可以等待期望的状态变为“就绪”。下面的代码就是这么个情况。

线程间传递任务
很多图形架构需要特定的线程去更新界面,所以当一个线程对界面的更新时,它要发出一条信息给正确的线程,让特定的线程来做界面更新。 std::packaged_task 提供了实现这种功能的方法,且不需要发送一条自定义信息给图形界面相关线程。下面来看看代码。

清单4.9 使用 std::packaged_task 执行一个图形界面线程

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() // 1
{
    while (!gui_shutdown_message_received()) // 2
    {
        get_and_process_gui_message(); // 3
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if (tasks.empty()) // 4
                continue;
            task = std::move(tasks.front()); // 5
            tasks.pop_front();
        }
        task(); // 6
    }
}
std::thread gui_bg_thread(gui_thread);
template <typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
    std::packaged_task<void()> task(f);        // 7
    std::future<void> res = task.get_future(); // 8
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task)); // 9
    return res;                       // 10
}

这段代码十分简单:图形界面线程①循环直到收到一条关闭图形界面的信息后关闭②,进行轮询界面消息处理③,例如:用户点击和执行在队列中的任务。当队列中没有任务④,它将再次循环;除非,它能在队列中提取出一个任务⑤,然后释放队列上的锁,并且执行任务⑥。这里,期望值与任务相关,当任务执行完成时,其状态会被置为“就绪”状态。

将一个任务传入队列:提供的函数⑦可以提供一个打包好的任务,可以通过这个任务⑧调用get_future()成员函数获取期望值对象,并且在任务被推入列表⑨之前,期望值将返回调用函数⑩。当需要知道线程执行完任务时,向图形界面线程发布消息的代码,会等待期望值改变状态;否则,会丢弃这个期望值。

例子中使用 std::packaged_task<void()> 创建任务,其包含了一个无参数无返回值的函数或可调用对象(如果当这个调用有返回值时,返回值会被丢弃)。这可能是最简单的任务,如你之前所见, std::packaged_task 也可以用于一些复杂的情况——通过指定一个不同的函数签名作为模板参数,不仅可以改变其返回类型(因此该类型的数据会存在期望相关的状态中),也可以改变函数操作符的参数类型。这个例子可以简单的扩展成允许任务运行在图形界面线程上,且接受传参,还有通过 std::future 获取返回值,而不仅仅是完成一个指标。

这些任务能作为一个简单的函数调用来表达吗?还有,任务的结果能从很多地方得到吗?

这些问题可以使用第三种方法创建期望值来解决:使用 std::promise 对值进行显示设置。

4.2.3 使用(std::)promises

当一个应用需要处理很多网络连接时,它会使用不同线程尝试连接每个接口,因为这能使网络尽早联通,尽早执行程序。当连接较少的时候,工作没有问题(也就是线程数量比较少)。

不幸的是,随着连接数量的增长,这种方式变的越来越不合适;因为大量的线程会消耗大量的系统资源,还有可能造成线程上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能有影响。最极端的例子:系统资源被创建的线程消耗殆尽,系统连接网络的能力会变的极差。因此通过少数线程(可能只有一个)处理网络连接,每个线程同时处理多个连接事件,对需要处理大量的网络连接的应用而言是普遍的做法。

考虑一个线程处理多个连接事件,来自不同的端口连接的数据包基本上是以乱序方式进行处理的;同样的,数据包也将以乱序的方式进入队列。很多情况下,另一些应用不是等待数据成功的发送,就是等待一批(新的)来自指定网络接口的数据接收成功。

std::promise 提供设定值的方式(类型为T),这个类型会和后面看到的 std::future 对象相关联。一对 std::promise/std::future 会为这种方式提供一个可行的机制;期望值可以阻塞等待线程,同时,提供数据的线程可以使用组合中的承诺值来对相关值进行设置,并将期望值的状态置为“就绪”。

可以通过一个给定的 std::promise 的get_future()成员函数来获取与之相关的 std::future 对象,跟 std::packaged_task 的用法类似。当承诺值已经设置完毕(使用set_value()成员函数),对应期望值的状态变为“就绪”,并且可用于检索已存储的值。当在设置值之前销毁 std::promise ,将会存储一个异常。在4.2.4节中,会详细描述异常是如何传送到线程的。

清单4.10中,单线程处理多接口的实现,如同之前所说,在这个例子中,可以使用一对 std::promise/std::future 找出一块传出成功的数据块;与期望值相关的只是一个简单的“成功/失败”标识。对于传入包,与期望值相关的数据就是数据包的有效负载。

清单4.10 使用承诺值解决单线程多连接问题

#include <future>
void process_connections(connection_set &connections)
{
    while (!done(connections)) // 1
    {
        for (connection_iterator // 2
                 connection = connections.begin(),
                 end = connections.end();
             connection != end;
             ++connection)
        {
            if (connection->has_incoming_data()) // 3
            {
                data_packet data = connection->incoming();
                std::promise<payload_type> &p =
                    connection->get_promise(data.id); // 4
                p.set_value(data.payload);
            }
            if (connection->has_outgoing_data()) // 5
            {
                outgoing_packet data =
                    connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true); // 6
            }
        }
    }
}

函数process_connections()中,直到done()返回true①为止。每一次循环,都会依次的检查每一个连接②,检索是否有数据③或正在发送已入队的传出数据⑤。假设输入数据包是具有ID和有效负载的(有实际的数在其中)。一个ID映射到一个 std::promise (可能是在相关容器中进行的依次查找)④,并且值是设置在包的有效负载中的。对于传出包,包是从传出队列中进行检索的,实际上从接口直接发送出去。当发送完成,与传出数据相关的承诺值将置为true,来表明传输成功⑥。是否能映射到实际网络协议上,取决于网络所用协议;这里的“承诺值/期望值”组合方式可能在特殊的情况下无法工作,但它与一些操作系统支持的异步输入/输出结构类似。

上面的代码完全不理会异常,它可能在想象的世界中,一切工作都会很好的执行,但是这有
悖常理。有时候磁盘满载,有时候会找不到东西,有时候网络会断,还有时候数据库会崩
溃。当需要某个操作的结果时,就需要在对应的线程上执行这个操作,因为代码可以通过一
个异常来报告错误;不过,这会对使用 std::packaged_task 或 std::promise 带来一些不必要
的限制(在所有工作都正常的情况下)。因此,C++标准库提供了一种在以上情况下清理异常的
方法,并且允许他们将异常存储为相关结果的一部分。

4.2.4 将异常存与期望值中

看完下面短小的代码段,思考一下,当你传递-1到square_root()中时,它将抛出一个异常,并且让这个异常被调用者看到:

double square_root(double x)
{
	if(x<0)
	{
		throw std::out_of_range(“x<0);
	}
	
	return sqrt(x);
}

假设调用square_root()函数不是当前线程,

double y=square_root(-1);

将调用改为异步调用:

std::future<double> f=std::async(square_root,-1);
double y=f.get();

如果行为是完全相同时,其结果是理想的;任何情况下,y获得函数调用的结果,当线程调用f.get()时,就能再看到异常了,即使在一个单线程例子中。

好吧,事实的确如此:函数作为 std::async 的一部分时,当调用抛出一个异常时,这个异常就会存储到期望值中,之后期望值的状态被置为“就绪”,之后调用get()会抛出这个已存储异常(注意:标准级别没有指定重新抛出的这个异常是原始的异常对象,还是一个拷贝;不同的编译器和库将会在这方面做出不同的选择)。将函数打包入 std::packaged_task 任务包中后,到任务被调用时,同样的事情也会发生;打包函数抛出一个异常,这个异常将被存储在期望值中,准备在get()调用时再次抛出。

当然,通过函数的显式调用, std::promise 也能提供同样的功能。当存入的是一个异常而非一个数值时,就需要调用set_exception()成员函数,而非set_value()。这通常是用在一个catch块中,并作为算法的一部分,为了捕获异常,使用异常填充承诺值:

extern std::promise<double> some_promise;
try
{
	some_promise.set_value(calculate_value());
}
catch(...)
{
	some_promise.set_exception(std::current_exception());
}

这里使用了 std::current_exception() 来检索抛出的异常,可用 std::copy_exception() 作为一个替代方案, std::copy_exception() 会直接存储一个新的异常而不抛出:

some_promise.set_exception(std::copy_exception(std::logic_error("foo ")));

这比使用try/catch块更加清晰,当异常类型已知,就应该优先被使用;不是因为代码实现简单,而是它给编译器提供了极大的代码优化空间。

另一种向期望值中存储异常的方式是,在没有调用承诺值上的任何设置函数前,或正在调用包装好的任务时,销毁与 std::promise 或 std::packaged_task 相关的期望值对象。任何情况下,当期望值的状态还不是“就绪”时,调用 std::promise 或 std::packaged_task 的析构函数,将会存储一个与 std::future_errc::broken_promise 错误状态相关的 std::future_error 异常;

通过创建一个期望值,可以构造一个承诺值为其提供值或异常;可以通过销毁值和异常源,去违背承诺值。这种情况下,编译器没有在期望值中存储任何东西,等待线程可能会永远的等下去。

直到现在,例子都在用 std::future 。不过, std::future 也有局限性。很多线程在等待的时候,只有一个线程能获取等待结果。当多个线程需要等待相同的事件的结果,就需要使用 std::shared_future 来替代 std::future 了。

4.2.5 多个线程的等待

虽然 std::future 可以处理所有在线程间数据转移的同步,但是调用某一特殊 std::future 对象的成员函数,就会让这个线程的数据和其他线程的数据不同步。多线程在没有额外同步的情况下,访问一个独立的 std::future 对象时,就会有数据竞争和未定义的行为。这是因为 std::future 模型独享同步结果的所有权,并且通过调用get()函数,一次性的获取数据,这就让并发访问变的毫无意义——只有一个线程可以获取结果值,因为在第一次调用get()后,就没有值可以再获取了。

如果并行代码没有办法让多个线程等待同一个事件,先别太失落, std::shared_future 可以来帮你解决。因为 std::future 是只移动的,所以其所有权可以在不同的实例中互相传递,但是只有一个实例可以获得特定的同步结果,而 std::shared_future 实例是可拷贝的,所以多个对象可以引用同一关联期望值的结果。

每一个 std::shared_future 的独立对象上,成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时避免数据竞争,必须使用锁来对访问进行保护。优先使用的办法:为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。

这样,当每个线程都通过自己拥有的 std::shared_future 对象获取结果,那么多个线程访问共享同步结果就是安全的。可见图4.1。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/620039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Mysql】InnoDB 中的聚簇索引、二级索引、联合索引

一、聚簇索引 其实之前内容中介绍的 B 树就是聚簇索引。 这种索引不需要我们显示地使用 INDEX 语句去创建&#xff0c;InnoDB 引擎会自动创建。另外&#xff0c;在 InnoDB 引擎中&#xff0c;聚簇索引就是数据的存储方式。 它有 2 个特点&#xff1a; 特点 1 使用记录主键…

【每日挠头算法题(2)】压缩字符串|仅执行一次字符串交换能否使两个字符串相等

文章目录 一、压缩字符串思路 二、仅执行一次字符串交换能否使两个字符串相等思路1&#xff1a;计数法思路2&#xff1a;模拟法 总结 一、压缩字符串 点我直达~ 思路 使用双指针法 大致过程如下&#xff1a; 使用双指针&#xff0c;分别读&#xff08;read&#xff09;&…

mPak的使用文档

mpak介绍 mpak是一种文件格式&#xff0c;同时也是一款虚幻引擎插件&#xff0c;该插件提供了打包、解析和挂载mPak文件的方法&#xff0c;将不同平台的软件包和未编译的资源集成到mPak文件中&#xff0c;该文件具有跨平台兼容性。它支持不同阶段的挂载&#xff0c;例如在编辑…

monaco-editor插件自定义编辑器内容颜色

开始之前先说一下他的自定义内容颜色的api monaco.languages.setMonarchTokensProvider((languageId: string, languageDef: IMonarchLanguage | Thenable<IMonarchLanguage>)) 参数解析&#xff1a;接收两个参数 一个是你要设置的编辑器语言种类&#xff0c;可以是sq…

Ubuntu14.04安装igH EtherCAT Master

一、下载EtherCAT安装包 安装包下载路径&#xff1a;EtherLab EtherCAT Master / Code / [334c34]&#xff0c;打开后点击Download下载 二、安装前置依赖库 最好切换到root用户进行下列步骤 apt-get install autoconf automake libtool net-tools三、编译安装 解压安装包&a…

excel如何折叠展开行列?

Excel可以使用分组功能来实现折叠展开行列的效果&#xff0c;同时可以在单元格内添加号或-号来进行操作。 具体步骤如下&#xff1a; 1. 选中需要进行折叠展开的行或列&#xff0c;右键选择“分组”。 2. 在弹出的“分组”对话框中&#xff0c;选择“行”或“列”&#xff0…

『赠书活动 | 第十一期』清华社赞助 | 《Python系列丛书》

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 『赠书活动 &#xff5c; 第十一期』 本期书籍&#xff1a;《Python系列丛书》 公众号赠书&#xff1a;第三期 参与方式&#xff1a;关注公众号&#xff1a;低调而奢…

逻辑回归分类器-创建词向量-情感分析

题目 请使用您今天学习的逻辑回归分类器对下面的聊天机器人数据进行自动分类。&#xff08;3分&#xff09; https://github.com/songys/Chatbot_data&#xff08;ChatbotData.csv文件&#xff09; https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotDa…

【职业人生】如何有效的在职场当中避免工作失误和提高个人发展

《左传宣公二年》&#xff1a;“人谁无过&#xff0c;过而能改&#xff0c;善莫大焉。”古往今来&#xff0c;多少人犯过错误。强大如“智绝”的诸葛孔明&#xff0c;也有街亭之失。职场人更是难免会在工作中出现失误。 在职场生涯当中避免不了在工作当中带来的失误&#xff0c…

【Hadoop综合实践】手机卖场大数据综合项目分析

&#x1f680; 本文章实现了基于MapReduce的手机浏览日志分析 &#x1f680; 文章简介&#xff1a;主要包含了数据生成部分&#xff0c;数据处理部分&#xff0c;数据存储部分与数据可视化部分 &#x1f680; 【本文仅供参考】其中需求实现的方式有多种&#xff0c;提供的代码并…

从0到1搞定在线OJ

目录 一、在线OJ的的原理 二、在线OJ的使用规则 三、注意事项 1.关于作弊 2.如何防止作弊 3.输入输出格式 4.换行问题 四、经典在线OJ坑人题目以及博主被坑经历 五、提交不成功及解决方法 六、如何得心应手的拿下OJ系统 七、在线OJ的骗分技巧 在线OJ&#xff08;Onl…

管理项目-查询数据

人事管理项目-查询数据模块 后端实现配置文件实体类Dao层测试前端实现1&#xff0e;创建Dept页面2&#xff0e;修改路由3 测试 后端实现 配置文件 在application.yml文件中配置数据库连接、JPA及端口等信息&#xff0c;代码如下&#xff1a; 实体类 配置完成后建立和表结构…

“出海热”仍在持续,进军东南亚市场谁能率先突围?

在油改电的趋势下&#xff0c;伴随钠电池车型的推出&#xff0c;电动两轮车市场被进一步激活。据艾瑞咨询不完全统计与估算&#xff0c;2022年国内两轮电动车销量约5010万辆&#xff0c;较去年增长15.2%&#xff0c;预计2023年销量达到5400万辆。持续增长的销量足以说明当下的国…

JeecgBoot低代码平台 3.5.2,仪表盘版本发布!重磅新功能—支持在线拖拽设计大屏和门户

项目介绍 JeecgBoot是一款企业级的低代码平台&#xff01;前后端分离架构 SpringBoot2.x&#xff0c;SpringCloud&#xff0c;Ant Design&Vue3&#xff0c;Mybatis-plus&#xff0c;Shiro&#xff0c;JWT 支持微服务。强大的代码生成器让前后端代码一键生成! JeecgBoot引领…

北京君正案例:数传网关的集大成者—积木式边缘网关

数传网关的集大成者 USR-M300产品集成了数据的边缘采集、计算、主动上报和数据读写&#xff0c;联动控制&#xff0c;IO采集和控制等功能&#xff0c;采集协议包含标准Modbus协议和多种常见的PLC协议&#xff0c;以及行业专用协议&#xff1b;主动上报采用分组上报方式&#xf…

如何靠自学成为一名网络安全工程师?

1. 前言 说实话&#xff0c;一直到现在&#xff0c;我都认为绝大多数看我这篇文章的读者最后终究会放弃&#xff0c;原因很简单&#xff0c;自学终究是一种适合于极少数人的学习方法&#xff0c;而且非常非常慢&#xff0c;在这个过程中的变数过大&#xff0c;稍有不慎&#…

电脑提示d3dcompiler_47.dll缺失怎么修复?

d3dcompiler_47.dll是 Microsoft 的 DirectX 11 核心组件之一&#xff0c;它主要用于编译和运行 Direct3D 11 应用程序和游戏。如果您的系统中缺少这个 DLL 文件&#xff0c;可能会导致一些程序无法正常运行&#xff0c;很多游戏跟图形处理软件都会运用到。如果电脑提示“找不到…

软考A计划-系统架构师-官方考试指定教程-(1/15)

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&am…

Elasticsearch数据库索引及数据操作

目录结构 前言数据库初始化索引操作创建索引获取索引获取所有索引删除索引 数据操作新增POST方式PUT方式 查询主键查询全量查询search 修改全量覆盖部分修改 删除 前言 Elasticsearch安装成功情况下&#xff1b;使用Postman请求操作数据库&#xff1b;浏览器插件实现Elasticsea…

Ubuntu做深度学习+ros怎么分区

正好要重装系统了&#xff0c;学习以下怎么分区 买了铠侠rc20 &#xff0c; 1T用来做Ubuntu系统盘 整理一下要安装的东西&#xff1a; 1.要装cuda &#xff0c;6G&#xff08; 安装在 /usr/local/cuda-11.1 &#xff09; 挂载点 /usr: 存放用户程序&#xff0c;一般在/usr/…