目录
1.引言
2.源码分析
2.1.std::promise的源码实现
2.2.std::future的源码实现
2.3.关联状态对象的代码实现
3.整个类图
4.future和promise之间的并发安全和线程同步
5.总结
1.引言
异步编程之std::future(一): 使用-CSDN博客
在std::future(一)中详细的介绍了它的用法。在讲std::future会不可避免的想到std::promise,二者的关系对应如下:
如上图所示,异步调用创建的时候,会返回一个std::future
对象实例给异步调用创建方。异步调用执行方持有std::promise
对象实例。双方持有的std::promise
对象实例和std::future
对象实例分别连接一个共享对象,这个共享对象在异步调用创建方和异步调用执行方之间构建了一个信息同步的通道(channel
),双方通过这个通道进行异步调用执行情况的信息交互
异步调用执行方访问这个通道是通过自身持有的std::promise
实例来向通道中写入值的,异步调用创建方是通过自身持有的std::future
对象实例来获取通道中的值的。当异步调用执行方完成异步调用的执行之后,会通过std::promise
对象实例向通道中写入异步调用执行的结果值。异步调用创建方通过自身持有的std::future
对象实例来获取异步调用结果
异步调用执行方通过std::promise
来兑现承诺(promise
,承诺调用完成之后在未来某一时刻交付结果),异步调用创建方通过std::future
来获取这个未来的值(future
,未来兑现的承诺对应的结果值)。
2.源码分析
首先,我们先看一下promise
的源码实现(以VS2019实现为例)。
2.1.std::promise的源码实现
首先,我们先看一下promise
的数据成员,代码截图如下。从下图我们可以看出,promise
的数据成员只有一个_MyPromise:
template <class _Ty>
class promise { // class that defines an asynchronous provider that holds a value
public:
static_assert(!is_array_v<_Ty> && is_object_v<_Ty> && is_destructible_v<_Ty>,
"T in promise<T> must meet the Cpp17Destructible requirements (N4878 [futures.promise]/1).");
private:
_Promise<_Ty> _MyPromise;
};
于是找到_MyPromise,它的定义如下:
template <class _Ty>
class _Promise {
public:
_Promise(_Associated_state<_Ty>* _State_ptr) : _State(_State_ptr, false), _Future_retrieved(false) {}
_Promise(_Promise&& _Other) : _State(_STD move(_Other._State)), _Future_retrieved(_Other._Future_retrieved) {}
_Promise& operator=(_Promise&& _Other) {
_State = _STD move(_Other._State);
_Future_retrieved = _Other._Future_retrieved;
return *this;
}
private:
_State_manager<_Ty> _State;
bool _Future_retrieved;
};
_MyPromise里面有个_State_manager,它的定义如下:
template <class _Ty>
class _State_manager {
// class for managing possibly non-existent associated asynchronous state object
public:
_State_manager() : _Assoc_state(nullptr) { // construct with no associated asynchronous state object
_Get_only_once = false;
}
_State_manager(_Associated_state<_Ty>* _New_state, bool _Get_once)
: _Assoc_state(_New_state) { // construct with _New_state
_Get_only_once = _Get_once;
}
private:
_Associated_state<_Ty>* _Assoc_state;
bool _Get_only_once;
};
该数据成员对应的类型是_Associated_state<_Ty>*
,是一个指针类型。_Assoc_state
从类型的名上我们可以看出,该指针指向的是一个状态,该状态对象对应的就是promise
和future
之间的通道。_Ty
为异步操作返回值的类型,也就是说异步该状态对象内部保存并传递异步调用的返回值
下面,我们先看一下promise
的默认构造函数的代码。如下图所示,该构造函数比较简单,就是通过new
操作符构建了一个关联状态对象_Associated_state<_Ty>
,用这个关联对象的地址信息初始化内部的指针变量_Assoc_state
,完成promise
和通道channel
(即__assoc_state
实例)的链接
promise() : _MyPromise(new _Associated_state<_Ty>) {}
下面,我们看一下get_future
的源码实现,代码如下。首先在判断当前promise
中是否关联了关联状态对象实例(通过判断_State
是否为空),如果未关联则抛出异常。如果关联了关联状态对象,则通过该关联对象的地址信息来构建future
对象,返回一个链接该关联状态对象的future
对象实例
template <class _Ty>
class promise { // class that defines an asynchronous provider that holds a value
public:
_NODISCARD future<_Ty> get_future() {
return future<_Ty>(_MyPromise._Get_state_for_future(), _Nil());
}
}
template <class _Ty>
class _Promise {
public:
_State_manager<_Ty>& _Get_state_for_future() {
if (!_State.valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
}
if (_Future_retrieved) {
_Throw_future_error(make_error_code(future_errc::future_already_retrieved));
}
_Future_retrieved = true;
return _State;
}
};
下面,我们看一下set_value
的源码实现,代码如下。有两个版本的set_value
内部逻辑是一样的,区别在于前者是对应于左值版本,后者是对应于右值版本。下面我们以左值版本为例,代码中先判断了一下当前promise
实例是否链接了关联对象实例,如果未链接关联对象实例则抛出异常。如果关联了关联对象实例,则调用关联对象实例的set_value
成员方法,将_Val实参传入,完成将异步调用结果写入关联状态对象中(即所链接的通道中)
template <class _Ty>
class promise { // class that defines an asynchronous provider that holds a value
public:
void set_value(const _Ty& _Val) {
_MyPromise._Get_state_for_set()._Set_value(_Val, false);
}
void set_value_at_thread_exit(const _Ty& _Val) {
_MyPromise._Get_state_for_set()._Set_value(_Val, true);
}
void set_value(_Ty&& _Val) {
_MyPromise._Get_state_for_set()._Set_value(_STD forward<_Ty>(_Val), false);
}
};
template <class _Ty>
class _Promise {
public:
_State_manager<_Ty>& _Get_state_for_set() {
if (!_State.valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
}
return _State;
}
};
template <class _Ty>
class _State_manager {
// class for managing possibly non-existent associated asynchronous state object
public:
_State_manager() : _Assoc_state(nullptr) { // construct with no associated asynchronous state object
_Get_only_once = false;
}
void _Set_value(const _Ty& _Val, bool _Defer) { // store a result
if (!valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
}
_Assoc_state->_Set_value(_Val, _Defer);
}
void _Set_value(_Ty&& _Val, bool _Defer) { // store a result
if (!valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
}
_Assoc_state->_Set_value(_STD forward<_Ty>(_Val), _Defer);
}
private:
_Associated_state<_Ty>* _Assoc_state;
bool _Get_only_once;
};
2.2.std::future的源码实现
惯例首先我们看一下future
的数据成员,代码截图如下所示。它是继承_State_manager,而_State_manager正是类_Promise的成员变量。future的构造函数接收一个关联状态对象的引用,然后使用该应用信息来初始化基类_Mybase,完成当前future
到关联状态对象的链接
template <class _Ty>
class future : public _State_manager<_Ty> {
// class that defines a non-copyable asynchronous return object that holds a value
using _Mybase = _State_manager<_Ty>;
public:
future(const _Mybase& _State, _Nil) : _Mybase(_State, true) {}
};
template <class _Ty>
class _Promise {
private:
_State_manager<_Ty> _State;
bool _Future_retrieved;
};
通过promise
和future
类的内部成员,我们可以看出这两个类都是通过内部的_State
指针完成和通道的关联,这里的通道对应的实现就是_Associated_state
这个模版类
下面,我们看一下future
对象的get
函数的代码实现,代码截图如下所示:
template <class _Ty>
class future : public _State_manager<_Ty> {
future(future&& _Other) noexcept : _Mybase(_STD move(_Other), true) {}
_Ty get() {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move(*this)};
return _STD move(_Local._Get_value());
}
};
template <class _Ty>
class _State_manager {
_Ty& _Get_value() const {
if (!valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
}
return _Assoc_state->_Get_value(_Get_only_once);
}
_NODISCARD bool valid() const noexcept {
return _Assoc_state && !(_Get_only_once && _Assoc_state->_Already_retrieved());
}
};
- 首先创建一个名为
_Local
的临时对象,并且将this用std::move
给该对象,该对象是一个临时对象,将会在get
函数运行结束的时候进行释放;将当前this
的值赋值给临时变量_Local
,将this进行置空,完成future
对状态对象的链接的断开 - 接下来通过调用std::
move(_Local._Get_value())
来完成对关联状态中异步调用结果的获取,获取完成之后将该值作为get
函数的返回值进行返回。同时std::move(_Local._Get_value())
函数不仅仅是单纯的获取异步调用结果,同时还会判断是否有异步调用的值,如果异步调用没有完成,则会阻塞在move
函数中,等待异步调用完成
上面我们看到在调用get
方法之后,future
断开了和关联状态对象的链接,这说明future
对象只能调用一次get
方法来获取,如果多次调用,其内部_Assoc_state
将为空指针,则会因为对空指针调用move
方法,造成未定义行为。
2.3.关联状态对象的代码实现
1) 引用计数,用来保存类引用计数信息,通过该引用计数信息来实现自身对象生命周期的管理。用来跟踪链接到自身的promise
和future
对象的数量,当没有任何对象链接自身的时候,进行自身资源的释放。源码如下:
using _Atomic_counter_t = unsigned long;
#define _MT_INCR(x) _INTRIN_RELAXED(_InterlockedIncrement)(reinterpret_cast<volatile long*>(&x))
#define _MT_DECR(x) _INTRIN_ACQ_REL(_InterlockedDecrement)(reinterpret_cast<volatile long*>(&x))
template <class _Ty>
class _Associated_state { // class for managing associated synchronous state
public:
void _Retain() { // increment reference count
_MT_INCR(_Refs);
}
void _Release() { // decrement reference count and destroy when zero
if (_MT_DECR(_Refs) == 0) {
_Delete_this();
}
}
private:
_Atomic_counter_t _Refs;
};
2) promise的setValue和future的get采用的是条件变量来实现的,代码如下:
template <class _Ty>
class _Associated_state { // class for managing associated synchronous state
public:
using _State_type = _Ty;
using _Mydel = _Deleter_base<_Ty>;
public:
virtual void _Wait() { // wait for signal
unique_lock<mutex> _Lock(_Mtx);
_Maybe_run_deferred_function(_Lock);
while (!_Ready) {
_Cond.wait(_Lock);
}
}
struct _Test_ready { // wraps _Associated_state
_Test_ready(const _Associated_state* _St) : _State(_St) {}
bool operator()() const { // test state
return _State->_Ready != 0;
}
const _Associated_state* _State;
};
template <class _Rep, class _Per>
future_status _Wait_for(const chrono::duration<_Rep, _Per>& _Rel_time) { // wait for duration
unique_lock<mutex> _Lock(_Mtx);
if (_Has_deferred_function()) {
return future_status::deferred;
}
if (_Cond.wait_for(_Lock, _Rel_time, _Test_ready(this))) {
return future_status::ready;
}
return future_status::timeout;
}
template <class _Clock, class _Dur>
future_status _Wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time) { // wait until time point
unique_lock<mutex> _Lock(_Mtx);
if (_Has_deferred_function()) {
return future_status::deferred;
}
if (_Cond.wait_until(_Lock, _Abs_time, _Test_ready(this))) {
return future_status::ready;
}
return future_status::timeout;
}
virtual _Ty& _Get_value(bool _Get_only_once) {
unique_lock<mutex> _Lock(_Mtx);
if (_Get_only_once && _Retrieved) {
_Throw_future_error(make_error_code(future_errc::future_already_retrieved));
}
if (_Exception) {
_Rethrow_future_exception(_Exception);
}
_Retrieved = true;
_Maybe_run_deferred_function(_Lock);
while (!_Ready) {
_Cond.wait(_Lock);
}
if (_Exception) {
_Rethrow_future_exception(_Exception);
}
return _Result;
}
void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result
unique_lock<mutex> _Lock(_Mtx);
_Set_value_raw(_Val, &_Lock, _At_thread_exit);
}
void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock,
bool _At_thread_exit) { // store a result while inside a locked block
if (_Has_stored_result) {
_Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
}
_Result = _Val;
_Do_notify(_Lock, _At_thread_exit);
}
void _Set_value(_Ty&& _Val, bool _At_thread_exit) { // store a result
unique_lock<mutex> _Lock(_Mtx);
_Set_value_raw(_STD forward<_Ty>(_Val), &_Lock, _At_thread_exit);
}
void _Set_value_raw(_Ty&& _Val, unique_lock<mutex>* _Lock,
bool _At_thread_exit) { // store a result while inside a locked block
if (_Has_stored_result) {
_Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
}
_Result = _STD forward<_Ty>(_Val);
_Do_notify(_Lock, _At_thread_exit);
}
void _Set_value(bool _At_thread_exit) { // store a (void) result
unique_lock<mutex> _Lock(_Mtx);
_Set_value_raw(&_Lock, _At_thread_exit);
}
void _Set_value_raw(
unique_lock<mutex>* _Lock, bool _At_thread_exit) { // store a (void) result while inside a locked block
if (_Has_stored_result) {
_Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
}
_Do_notify(_Lock, _At_thread_exit);
}
public:
mutex _Mtx;
condition_variable _Cond;
private:
virtual void _Do_notify(unique_lock<mutex>* _Lock, bool _At_thread_exit) { // notify waiting threads
// TRANSITION, ABI: This is virtual, but never overridden.
_Has_stored_result = true;
if (_At_thread_exit) { // notify at thread exit
_Cond._Register(*_Lock, &_Ready);
} else { // notify immediately
_Ready = true;
_Cond.notify_all();
}
}
};
3.整个类图
几个类的关系图如下:
4.future和promise之间的并发安全和线程同步
从上面讨论我们知道,异步操作创建方和异步操作执行方都是是通过其各自的future/promise
对象共同链接的关联状态对象进行同步和信息传递的
通过上面对于关联状态对象的讨论,我们可以看出其API
的内部是通过互斥量对内部状态进行保护,从而实现了线程安全。通过条件变量来实现的线程同步。并且一个关联状态对象在一个时刻只能被一个future
和一个promise
所链接,如果被多个链接则会抛出异常。
5.总结
由于 std::future
的实现依赖于标准库的具体实现(如 libstdc++、libc++ 等),并且这些实现可能较为复杂且高度优化,因此直接查看和总结其源码可能相当繁琐。这个只是以一种比较简单的方式大概讲解它们的原理,起到了抛砖引玉的作用。