前言
本文是c++协程第一篇,主要是让大家对协程的定义,以及协程的执行流有一个初步的认识,后面还会出两篇对协程的高阶封装。
在开始正式开始协程之前,请务必记住,c++协程 不是挂起当前协程,转而执行其他协程,待 其他协程完成后在恢复当前协程,而是挂起当前协程,转而执行挂起点(awaiter)任务,待该任务结束后,恢复当前协程执行流。话不多说,下面我们直接开始看代码。
看这篇文章之前最好先看下c++20协程的官方手册
创建一个基础的协程
接下来我们创建一个最基本协程
code
#include <coroutine>
#include <future>
#include <chrono>
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <memory>
#include <vector>
struct async_task_base
{
virtual void completed() = 0;
virtual void resume() = 0;
};
std::mutex m;
std::vector<std::shared_ptr<async_task_base>> g_resume_queue; //原来的 eventloop队列
std::vector<std::shared_ptr<async_task_base>> g_work_queue; //执行耗时操作线程队列
template <typename T>
struct AsyncAwaiter;
using namespace std::chrono_literals;
struct suspend_always
{
bool await_ready() const noexcept {
try
{
std::cout << "suspend_always::await_ready" << std::endl;
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
return false;
}
void await_suspend(std::coroutine_handle<> handle) const noexcept {
try
{
std::cout << "suspend_always::await_suspend" << std::endl;
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
}
void await_resume() const noexcept {
try
{
std::cout << "suspend_always::await_resume" << std::endl;
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
}
};
struct suspend_never
{
bool await_ready() const noexcept {
try
{
std::cout << "suspend_never::await_ready" << std::endl;
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
return true;
}
void await_suspend(std::coroutine_handle<> handle) const noexcept {
try
{
std::cout << "suspend_never::await_suspend" << std::endl;
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
}
void await_resume() const noexcept {
try
{
std::cout << "suspend_never::await_resume" << std::endl;
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
}
};
struct Result {
struct promise_type {
promise_type(){
std::cout << "promise_type" << std::endl;
}
~promise_type(){
std::cout << "~promise_type" << std::endl;
}
suspend_never initial_suspend() {
std::cout << "initial_suspend" << std::endl;
return {};
}
suspend_never final_suspend() noexcept {
std::cout << "final_suspend" << std::endl;
return {};
}
Result get_return_object() {
std::cout << "get_return_object" << std::endl;
return {};
}
void return_void() {
std::cout << "return_void" << std::endl;
}
// void return_value(int value) {
// }
void unhandled_exception() {
}
};
};
template <typename ReturnType>
struct AsyncThread
{
using return_type = ReturnType;
AsyncThread(std::function<return_type ()>&& func): func_(func){
}
std::function<return_type ()> func_;
};
template <typename ReturnType>
struct async_task: public async_task_base{
async_task(AsyncAwaiter<ReturnType> &awaiter)
:owner_(awaiter)
{
}
void completed() override{
std::cout << "async_task :: completed ############" << std::endl;
ReturnType result = owner_.func_();
owner_.value_ = result;
}
void resume() override{
std::cout << "async_task :: resume ############" << std::endl;
owner_.h_.resume();
}
AsyncAwaiter<ReturnType> &owner_ ;
};
template <typename ReturnType>
struct AsyncAwaiter
{
using return_type = ReturnType;
AsyncAwaiter(AsyncThread<ReturnType>& info){
// std::cout<< " AsyncAwaiter(AsyncThread<ReturnType>& info)" << std::endl;
value_ = return_type{};
func_ = info.func_;
}
// 该awaite直接挂起
bool await_ready() const noexcept {
return false;
}
void await_suspend(std::coroutine_handle<> h) {
h_ = h;
std::lock_guard<std::mutex> g(m);
g_work_queue.emplace_back(std::shared_ptr<async_task_base>( new async_task<uint64_t>(*this)));
}
return_type await_resume() const noexcept {
// std::cout<< "AsyncAwaiter::await_resume" << std::endl;
return value_;
}
std::function<return_type ()> func_;
std::coroutine_handle<> h_;
return_type value_ = return_type();
};
template<typename T>
inline AsyncAwaiter<T> operator co_await(AsyncThread<T>&& info)
{
return AsyncAwaiter(info);
}
template <typename ReturnType>
AsyncThread<ReturnType> do_slow_work(std::function< ReturnType () > &&func){
return AsyncThread<ReturnType>(std::forward< std::function< ReturnType () > >(func));
}
Result Coroutine() {
int a = 1;
auto func =[&]() -> uint64_t{
// std::cout<< "do a slow work !!!!!!!!!!!!!!!!!!!!!" << std::endl;
return a;
};
uint64_t result = co_await do_slow_work<uint64_t>(func);
std::cout << "@@@@@@@@@ result1 is : " << result << std::endl;
a = 2;
result = co_await do_slow_work<uint64_t>(func);
std::cout << "@@@@@@@@@ result2 is : " << result << std::endl;
a = 3;
result = co_await do_slow_work<uint64_t>(func);
std::cout << "@@@@@@@@@ result3 is : " << result << std::endl;
co_return;
};
void do_work() {
while (1)
{
// 加锁
// std::cout << "void do_work() " << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(1)); //!!!!!还有这个加锁要在锁钱前不然,让出cpu后,由于还没有解锁,又会被其他线程再拿到锁,这样就死锁了
std::lock_guard<std::mutex> g(m);
// std::cout << " g_work_queue size " << g_resume_queue.size() << std::endl;
for(auto task : g_work_queue){
task->completed();
g_resume_queue.push_back(task);
}
// g_resume_queue.assign(g_work_queue.begin(), g_work_queue.end()); //!!!!!!!这里有个大坑坑查了好久,如果连续两次先进来这里,会把g_raw_work_queue中的元素给清理掉,导致后面无法恢复
g_work_queue.clear();
// std::cout << " g_resume_queue size " << g_resume_queue.size() << std::endl;
}
}
void run_event_loop(){
std::vector<std::shared_ptr<async_task_base>> g_raw_work_queue_tmp;
while(1){
g_raw_work_queue_tmp.clear();
// std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> g(m);
// for(auto &task : g_resume_queue){
// task->resume();
// }
g_raw_work_queue_tmp.swap(g_resume_queue);
}
for(auto &task : g_raw_work_queue_tmp){
task->resume();
}
}
}
void test_func(){
Coroutine();
}
int main(){
test_func();
std::thread work_thread(do_work);
run_event_loop();
}
代码分析
在上述代码中我们可以看到有下面几个类
awaiter(等待体):suspend_always,suspend_never,Awaiter。前两个是标准库中已经定义好的等待体,suspend_always为挂起当前协程,suspend_never不挂起当前协程,Awaiter是我们自定义的。观察这三个类,我们不难发现都有三个成员函数await_ready,await_suspend,await_resume,所以只要有这三个成员函数的类,都是等待体,都可以通过co_await挂起协程,这里我们还知道了另外一个知识点,即co_await的操作数是awaiter
coroutine(协程):Result,promise_type,到这里了我们引出一个知识点c++中怎么定义一个协程?当一个函数的返回值的类型中有promise_type这个类,就是协程。那么promise_type和Result之间的关系是什么呢?Result只是一个外壳,由于这个类中有个类型叫promise_type,在这个函数调用时,编译器就会为之生成一些协程代码。promise_type 就是编译器实际操作的对象,这个类中必须包含下面几个函数供编译器调用:initial_suspend ,final_suspend,get_return_object,return_void或return_value。这几个函数的调用时机如下:调用一个协程函数时,首先会调用get_return_object 返回协程对象Result,这个函数只会调用一次,无论是否使用co_await;然后会执行initial_suspend,然后继续执行该协程函数体,直到co_return,这时就会调用return_void或者return_value,将完成后的值作为返回值返回给co_await前的对象;最后会执行final_suspend至此一个协程生命周期结束,释放promise_type对象。
协程基本流程初步认识
接下来我们我们运行下上面的代码看看具体流程
接下来我们描述下上面的过程吧
1.在main函数中调用Coroutine这个协程函数
2.调用get_return_object 返回一个一个Result对象
3.调用initial_suspend返回等待体suspend_never,
4.由于suspend_never的await_ready返回为true,则不调用await_suspend,直接调用await_resume
5.接下来运行到co_await Awaiter
6. Awaiter 的await_ready返回false,意味这需要在当前位置挂起协程
7.执行到Awaiter::await_suspend函数体,让该协程句柄在2s后恢复执行
8.恢复执行后,调用await_resume,将Awaiter中的值返回,一般情况下该值是某些耗时操作完成后的值
9.执行到co_return调用return_void函数
10.最后执行到final_suspend,
11.final_suspend返回的等待体的 是suspend_always ,由于 await_ready返回 false,则 进入await_suspend,挂起当前协程,由于挂起了当前协程,所以没有销毁promise_type对象,这回造成内存泄漏。
接下来修改final_suspend协程执行完不挂起
修改如下
得到结果我们发现promise_type对象被释放了
这是你或许会有一个问题?为什么要在协程执行结束挂起。这个问题其实也一种困扰着我, 在网上看到最多的就是说:去销毁一些资源,但是没有一个能够举出例子要销毁那些?别的地方不能销毁吗?所以我不是很认可。直到看到一篇博客,说到这是一种规范,如果不挂起的话,你没法通过coroutine_handle.done得知协程状态,这是一种规范,交给程序员自行销毁,更规范些,这我才觉得稍微合理些。
我们再修改下
运行结果如下:
这里我们发现在协程体内,只有co_await一个awaiter,才会走await_ready后续的流程,但是initial_suspend和final_suspend都会自动被co_await。我们一般会选择initial_suspend不挂起,final_suspend挂起。