最后一章专门介绍了一项对系统编程非常有用的功能,但这在C++标准中相对较新。协程对象迅速找到了应用,成为了一等公民的状态机对象。它们的强大之处在于隐藏了协程帧后面的逻辑。请注意,这是一个高级主题,且C++的协程接口既不简单也不易用。它设计得很周到,但与其他编程语言相比,绝对不是最用户友好的。
在本章中,你将学习使用此功能的基础知识。如果你是新手,那么你将花费一些时间来理解其要求。如果你之前在其他编程语言中有使用协程的经验,那么你会更容易上手。尽管如此,我们还是会在本章中提出在系统编程中应用它们的建议。
我们将展示两个之前示例的实际解决方案,涉及网络编程和共享内存。你将立即看到例程的可预测性和清晰的执行路径。我们希望你对无需使用同步原语就能并发执行的方式感到印象深刻。直接在现实环境中重用是可能的;只需确保你拥有所需的编译器,因为该功能仍然较新。不多说了,让我们进入最后的主题。
在本章中,我们将涵盖以下主要话题:
- 介绍协程
- C++中的网络编程和协程
- 通过C++中的协程重新审视共享内存问题
- 对协程及其在C++中的实现的最终思考
技术要求
为了运行代码示例,你必须准备以下内容:
- 一个能够编译和执行C++20的基于Linux的系统(例如,Linux Mint 21)
- GCC12.2编译器 - https://gcc.gnu.org/git/gcc.git gcc-source:
- 使用
-fcoroutines
、-std=c++2a
、-lpthread
和-lrt
标志
- 使用
- 对于某些示例,你也可以选择使用https://godbolt.org/。
介绍协程
一个进程就是一个程序的运行实例。它有自己的地址空间,除了通过共享内存,不与其他进程共享。线程存在于进程中,它们不能脱离进程存在,尽管在Linux中,进程和线程都被视为任务。它们以相同的方式被调度,并且在内核级别有相同的控制结构。尽管如此,线程被认为是轻量级的,因为程序的初始负载的较大开销由父进程承担。
但这并不是完整的情况。还有纤程和协程。如果说进程和线程是真正的并发并且在共享资源上并行工作,纤程就像线程,但不符合并发。虽然线程通常依赖于任务调度器的抢占式时间分片,纤程使用协作式多任务处理。也就是说,它们在执行过程中自己让出控制权,以运行另一个纤程。它们也被称为有栈协程。与此同时,C++中的协程被称为无栈协程,不由操作系统管理。换句话说,有栈协程可以在嵌套的栈帧中被挂起,而无栈协程只能通过顶级例程嵌套。
这两种设施被认为是隐式同步的,因此前几章的所有同步原语和原子结构都是不必要的。但你可以想象之前的例子,读取文件系统时,操作系统等待文件被打开,然后通知调用者进程继续其工作。想象纤程和协程正是用于这种反应式访问的,它不需要额外的CPU处理。实际上,网络和文件系统是认为纤程和协程最有价值的领域。当发出请求时,一个纤程将控制权交给主线程,当I/O操作完成时,纤程继续从它让出的地方继续。
协程技术相当古老。C++最近才引入它,它对于网络编程、I/O操作、事件管理等非常有用。协程也被认为是具有暂停能力的执行。尽管如此,它们以协作方式提供多任务处理,并不并行工作。这意味着任务不能同时执行。同时,它们是实时友好的,允许在协程之间快速切换上下文,不需要系统调用。事实上,它们对硬实时操作系统友好,因为执行顺序和调度由系统程序员控制,正如你稍后将在本章中看到的。C++中的协程非常适用于实现任务图和状态机等。
你们中的一些人可能想知道协程和标准单线程函数式编程之间的区别。嗯,后者被认为是同步方法,而前者是具有同步可读性的异步方法。但协程真正关注的是减少不必要的(忙碌的)等待,并在准备所需资源或调用时做一些有用的事情。以下简图虽然简单,但提醒我们同步和异步执行之间的相应区别。
图10.1 - 同步与异步应用程序执行
普通的单线程执行在某些方面也是有限的。首先,程序内部无法追踪调用、挂起或恢复函数,或者至少不能通过引用追踪。换句话说,控制流在后台发生且是隐式的。此外,控制流有一个严格的方向 - 函数要么返回到其调用者,要么继续向内调用另一个函数。每个函数调用在栈上创建一个新记录,并立即发生,一旦调用,方法不能被延迟。一旦该函数返回,其在栈上的部分就被清除,无法恢复。换言之,激活是无法追踪的。
另一方面,协程拥有自己的生命周期。协程是一个对象,可以明确地被引用。如果协程应该比其调用者存活更久,或者应该被转移给另一个对象,则可以将其存储在堆中。同时,控制权可以在协程之间双向传递 - 向上或向下。协程增加了函数调用和函数类型的含义。int func(int arg)
原型意味着一个名为func
的函数,接收一个整数类型的参数arg
,返回一个整数。类似的协程可能永远不会返回到其调用者,而调用者期望的值可能由另一个协程产生。让我们看看C++中是如何发生的。
C++中的协程设施
最初,你可以将它们视为智能指针。你已经知道它们是指针的包装器,并为内存管理提供额外的控制。协程以类似的方式工作,但围绕它们的代码更复杂。这次,我们需要一个函数原型的包装器。这个包装器将处理数据流和调度控制。包装器本身就是协程。我们定义了一个Task exCoroutine()
任务(任务与Linux定义的任务不同) - 如果它使用以下三个操作符之一:co_await
、co_yield
或co_return
,则被解释为协程。这里是一个例子:
#include <coroutine>
...
Task exCoroutine() {
co_return;
}
int main() { Task async_task = exCoroutine(); }
包装器类型当前是Task
。它在调用者级别上是已知的。通过co_return
操作符,协程对象被识别为exCoroutine()
函数。创建Task
类是系统程序员的工作。它不是标准库的一部分。那么Task
类是什么?
struct Task {
struct promise_type {
Task get_return_object()
{ return {}; }
std::suspend_never initial_suspend()
{ return {}; }
std::suspend_never final_suspend() noexcept
{ return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
重要提示
这是一个非常通用的模式,几乎在每个协程示例中都会使用。你最初应该在https://en.cppreference.com/w/cpp/language/coroutines上参考它。
我们称执行给定例程但不返回值的协程为任务。此外,协程与promise
对象相关联 。promise
对象在协程级别上被操纵。协程通过这个对象返回操作结果或引发异常。这个设施还需要协程帧(或协程状态),这是一个在堆上的内部对象,包含promise
。它还由传递的参数组成 - 通过值复制,当前调用引用的表示;暂停点,以便协程相应地恢复;以及该点范围之外的局部变量。那么,我们的代码做了什么?嗯,从用户的角度来看,它什么也没做,但在后台发生了很多事情。让我们观察以下图表:
图10.2 - 协程启动的简单演示
记住,按值传递的参数在协程的作用域内被复制或移动,而按引用传递的参数保持为引用。这意味着程序员应该考虑它们在任务调用者中的生命周期,以避免出现悬挂指针。之后,构造promise
并调用get_return_object()
。当协程首次挂起时,结果将返回给任务调用者。
图10.2 展示了一个promise
返回suspend_always
并且我们懒惰地启动了一个协程的情况。initial_suspend()
操作恢复,而不知道或不了解如何继续,协程将永远不会被恢复,并且会泄露。为了处理这个问题,我们需要…一个handle
对象。你可以将handle
对象视为一个视图。类似于string_view
对象与string
对象之间的关系,或者vector
对象与具有range view
对象的range
对象之间的关系,handle
对象用于提供对*this
的间接访问。通过handle
对象,我们可以调用resume()
来继续协程的工作。它必须首先被挂起,否则行为将是未定义的:
图10.3 - 展示协程创建和恢复的图表
通过initial_suspend()
操作调用,结果通过co_await
处理。这是通过编译器在背景中围绕suspend_never
awaitable 生成额外代码来完成的——协程不是像使用suspend_always
那样以懒惰方式创建的,而是立即启动的。这两者都在C++标准库中定义。
当前协程使用co_return
关键字(在exCoroutine()
中)。但这样,协程体就退出了。如果我们想使用它来持续生成新值或下一个生成的值,那么我们需要co_yield
操作符。我们称这样的协程为生成器。我们可以将co_yield
操作符表达为co_await promise.yield_value(<某些表达式>)
。否则,如果它只是调用co_await
,它就是一个任务,如前所述。现在,如果我们再次看图10.3,使用co_yield
操作符将会将箭头从线程调用者控制重定向到协程执行,从而为协程提供继续工作的机会。换句话说,co_return
关键字将导致执行完成,而co_yield
关键字将只是暂时挂起协程。
让我们退一步,看一下awaitable和co_await
调用。它们的工作在以下图表中展示:
图10.4 - 展示co_await
调用后生成的调用的图表
现在,使用Handle
类型的私有变量来调用真正的resume()
函数。让我们检查代码:
using namespace std;
struct Task {
struct promise_type {
using Handle = coroutine_handle<promise_type>;
Task get_return_object() {
return Task { Handle::from_promise(*this) };
}
...
我们将使用explicit
说明符。在C++ 20中,它允许您对构造函数调用更加严格。也就是说,它不能用于复制初始化或隐式转换。此外,我们将handle
对象保持为私有。现在,让我们看看这如何派上用场(标记{1}和{2},同时提供给调用者一个包装器 - 标记{1}和{3}):
explicit Task (promise_type::Handle crtHdnl) :
crtHandle(crtHdnl) {}
void resume() { crtHandle.resume(); } // {1}
private:
promise_type::Handle crtHandle; // {2}
...
auto async_task = exCoroutine();
async_task.resume(); // {3}
让我们使用这个代码结构来构建一个完整功能的示例。我们将重命名Task
结构为Generator
,并实现一个具有生成器功能的协程。
我们将通过协程增加一个变量N次。这就是为什么它需要能够产生,并且我们添加以下内容到Generator
:
...
suspend_always yield_value(auto value) {
currValue = value;
return {};
}
...
uint32_t currValue;
};
然后,获取下一个元素的过程如下:
int next() {
crtHndl.resume();
return crtHndl.promise().currValue; } ...
继续协程主体及其在主线程中的创建。增量将发生100,000次。这个示例允许程序员懒惰地生成数据,不占用大量RAM。同时,没有使用单独的线程,因此执行保持在用户空间中,没有过多的上下文切换:
Generator exCoroutine() {
auto idx = 0;
for (;;) {
co_yield idx++;
}
}
int main() {
auto crt = exCoroutine();
for (auto idx = 1; (idx = crt.next()) <= 100000; )
cout << idx << " ";
cout << endl;
return 0;
}
输出的简化版本如下:
1 2 3 4 ... 100000
不幸的是,你可能已经明白为什么在C++中创建一个简单的协程应用程序并不是那么简单。作为一个新功能,这个设施持续改进,并且在即将到来的C++版本中有新的接口预期,这应该简化协程的使用。但这不应该阻止你继续使用它们。这个示例可以轻松扩展到其他功能,并且您可以一步一步地构建您的知识。在接下来的章节中,我们将确切地这样做,并将讨论重新带回系统编程领域。
C++中的网络编程和协程
你学习了TCP和UDP通信协议。我们讨论了它们在网络数据传输中的用途,但协程使它们更加强大。正如提到的,代码将以异步方式运行,但我们可以控制调度。协程在上下文切换方面将更高效,因为它们在用户级别执行。我们将继续使用Generator
定义以匹配前面讨论的协程类型。传统上,该对象被制作为仅移动的——这允许我们限制对协程包装器的使用,但在一般情况下,协程对象是不可复制和不可移动的,因为协程框架是它们的一部分,一些局部变量可能是对其他局部变量的引用或指针。因此,让我们相应地扩展结构:
重要说明
这再次是一个在几乎每个协程示例中使用的非常通用的模式。你应该最初参考cppreference。
template<typename T> struct Generator {
Generator(const Generator&) = delete;
Generator& operator = (const Generator&) = delete;
Generator(Generator&& other) noexcept :
c_routine(other.c_routine) {
other.c_routine = {};
}
你会注意到struct
对象被定义为template
以便通用。我们重载()
操作符以便能够适当地将控制权交还给调用者:
Generator& operator = (Generator&& other) noexcept {
if (this == &other)
return *this;
if (c_routine)
c_routine.destroy();
c_routine = other.c_routine;
other.c_routine = {};
return *this;
}
optional<T> operator()() {
c_routine.resume();
if (c_routine.done()) {
return nullopt;
}
return c_routine.promise().currValue;
}
我们还增加了异常期间的行为——应用程序将被终止:
void unhandled_exception() {
exit(EXIT_FAILURE);
}
在主线程中,我们创建并加入两个线程——一个服务器和一个客户端。每个都将执行各自领域的协程。我们提供了一个UDP示例,以便代码更短,但TCP的方法类似,最初,我们创建socket并设置它。之后,我们继续进行其绑定和发送者的实际协程构建。不时地,结果将被打印出来。目前,它将是通过UDP socket
发送的字节数(以下代码中的标记{9}):
auto sockfd = 0;
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
const auto ecode{ make_error_code(errc{errno}) };
cerr << "Error opening shm region";
system_error exception{ ecode };
throw exception;
}
auto server = jthread([&sockfd] {
struct sockaddr_in servaddr = { 0 };
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT);
if (bind(sockfd,
(const struct sockaddr*)&servaddr,
sizeof(struct sockaddr_in)) < 0) {
perror("Bind failed");
exit(EXIT_FAILURE);
}
cout << "\nsend_to():\n";
string_view message{ "This is a test!" };
auto sender = send_to(sockfd, message,
servaddr);// {9}
在协程函数内部,我们调用sendto()
方法。我们使用string_view
对象,就像我们在[第3章]中所做的那样——原因主要是代码的安全性和数据及其大小的紧凑性。在循环的最后,我们使用co_yield value
,从而向主线程提供发送的字节数。无尽的循环允许协程运行,直到被外部逻辑真正取消——在这种情况下,它被调用了10次,因为主线程中的for
循环(以下代码中的标记{10}):
for (int i = 1; i <= 10; i++) {
auto sentData = sender();
cout << i << " Bytes sent: "
<< *sentData << endl; // {10}
}
});
客户端线程以类似的方式实现:
auto client = jthread([&sockfd] {
cout << "\nrecv_from():\n" << endl;
struct sockaddr_in clntaddr = { 0 };
auto receiver = recv_from(sockfd, clntaddr);
for (auto i = 1; i <= 10; i++) {
auto recvData = receiver();
cout << i << " Message received: "
<< *recvData << endl; // {11}
}
});
server.join(); client.join();
close(sockfd); return 0;
}
服务器端协程的主体如下:
Generator<size_t> send_to(int sockfd,
string_view buffer,
auto servaddr) noexcept {
for (;;) {
auto value = sendto(sockfd,
buffer.data(),
buffer.size(),
MSG_DONTWAIT,
(const struct sockaddr*)
&servaddr,
sizeof(servaddr));
co_yield value;
}
}
客户端协程以类似方式实现:
Generator<string> recv_from(int sockfd,
auto clntaddr,
size_t buf_size =
BUF_SIZE) noexcept {
socklen_t len = sizeof(struct sockaddr_in);
array<char, BUF_SIZE> tmp_buf = {};
协程函数调用recvfrom()
系统调用。在最后,而不是接收到的字节,来自socket的消息存储在currValue
成员变量中。然后在主线程中打印出来。我们还使用了MSG_DONTWAIT
标志。相应的输出每次以不同的方式打印出来,因为代码是异步的。最后一部分如预期:
for (;;) {
recvfrom(sockfd,
tmp_buf.data(),
tmp_buf.size(),
MSG_DONTWAIT,
(struct sockaddr*)&clntaddr,
&len);
co_yield tmp_buf.data();
}
文本的合并或错放是可以预期的,但它证明了协程的可用性。输出的简化版本如下:
send_to():
1 Bytes sent: 15
...
10 Bytes sent: 15
recv_from():
1 Message received: This is a test!
...
10 Message received: This is a test!
在上一章中,我们也有同步并行线程的问题,但代码并不是每次都真正并行。例如,等待“资源可访问”这样的事件是并发问题,而不是并行执行。话虽如此,协程在共享内存问题中也是一个强大的工具——让我们在下一节中检查一下。
通过协程重新审视C++中的共享内存问题
我们在使用条件变量时遇到的一个问题是在进程启动期间的同步问题。换句话说,对于生产者-消费者示例,我们不知道哪个线程会先执行。我们通过条件变量同步代码——它的互斥锁,以及一个谓词来处理事件的正确顺序。否则,我们可能会冒丢失信息或陷入死锁的风险。为本书的示例准备工作的大部分时间,我们都遇到了这种情况,这使得编写体验更加出色。但协程提供了另一种做法,有时可能更有效率且更易于使用(在你习惯了协程的界面后,因为它并不容易掌握)。
下一个示例是由awaitable-awaiter模式激发的。它类似于条件变量,但不使用此类同步原语。不过,通知信号依赖于原子变量。我们将回到Task协程。它将用于处理接收端。
重要说明
该示例受https://www.modernescpp.com/index.php/c-20-thread-synchronization-with-coroutines/的启发。
共享内存示例代码:
template<typename T, typename N>
Task receiver(Event& event, int fd, N size) {
co_await event;
ftruncate(fd, size);
我们首先对共享内存进行对齐并设置其大小,然后继续映射指向它的指针:
if (const auto ptr = mmap(0, size,
PROT_RW, MAP_SHARED,
fd, 0); ptr != MAP_FAILED) {
auto* obj = static_cast<T*>(ptr);
auto del = mmap_deallocator<T>(size);
auto res =
unique_ptr<T, mmap_deallocator<T>>(obj, del);
if (res != nullptr)
cout << "Receiver: " << *res << endl;
}
else {
cerr << "Error mapping shm region";
} }
确保res
的地址在协程内部可用于解引用非常重要。否则,代码将以Segmentation fault
崩溃,这比悬空指针更可取。另一个备注是,不同的编译器(或环境)将给你不同的代码行为。在我们到达Event
结构之前,让我们看看发送者做了什么——再次,我们依赖我们之前的代码:
template<typename T, typename N>
void Event::notify(T buffer, int fd, N size) noexcept {
notified = false;
auto* waiter =
static_cast<Awaiter*>(suspended.load());
if (waiter != nullptr) {
ftruncate(fd, size);
再次,我们确保共享内存具有正确的大小并映射指向它的指针:
if (const auto ptr = mmap(0, size,
PROT_RW, MAP_SHARED,
fd, 0);
ptr != MAP_FAILED) {
auto* obj = new (ptr) T(buffer);
auto del = mmap_deallocator<T>(size);
auto res =
unique_ptr<T, mmap_deallocator<T>>
(obj, del);
}
else {
cerr << "Error mapping shm region";
}
waiter->coroutineHandle.resume();
}
}
最初,通知标志设置为false
,意味着协程不会像常规函数那样行为,而是会被挂起。然后,加载waiter
对象,它是nullptr
,因为它之前没有设置。其相应的resume()
操作未被调用。随后执行的await_suspend()
函数获取协程句柄并存储它,以便稍后在协程恢复时调用。最重要的是,waiter
状态存储在suspended
成员变量中。稍后,notify()
被触发,并完全执行:
bool
Event::Awaiter::await_suspend(coroutine_handle<> handle)
noexcept {
coroutineHandle = handle;
if (event.notified) return false;
event.suspended.store(this);
return true;
}
在主线程中,需要一个Event
对象来同步工作流程。还定义了一个共享内存区域。如果在每个协程中调用shm_open()
,它实际上不会是共享的虚拟内存,因为文件描述符将访问每个协程的私有区域。因此,我们将最终得到Segmentation fault
。有两个线程,分别代表发送端和接收端。上述协程在线程加入后分别被调用:
Event event{};
int fd = shm_open(SHM_ID, O_CREAT | O_RDWR, 0644);
auto senderT = jthread([&event, &fd]{
event.notify<const char*, size_t>(message.data(),
fd,
message.size());
});
接收者的代码类似,但将event
对象作为参数传递:
auto receiverT = jthread([&event, &fd]{
receiver<char*, size_t>(ref(event),
fd, (message.size())); });
输出结果如下:
This is a testing message!
这个示例为您提供了以并发方式管理共享资源的灵活性。awaitable-awaitable的通知机制将完成任务,无需同步原语。我们鼓励您自己尝试一下。与此同时,我们将继续进行有关在系统编程中使用协程的一些最后说明。
协程及其在C++中的实现的最终思考
前面的示例虽然实用,但并不简单。它们有助于理解协程执行可能采取的顺序。虽然可视化协程的状态图是有益的,但我们仍然认为这对于经验不足的开发者来说可能会令人困惑。
如前所述,图10.2、图10.3和图10.4几乎涵盖了我们通过代码示例已经解释的内容。理解围绕协程及其成员生成的额外逻辑量是有用的。其中大部分在后台发生,系统程序员只是安排调度。在本章的示例中,我们通过promise
对象和awaitables来实现这一点。前述图表部分代表协程执行为有限状态机的事实应该提示您,这是协程有用的另一个应用。它们将状态机转化为一等对象。一旦定义了协程框架,大部分逻辑就留在那里,并对调用者隐藏。这为系统程序员提供了暂时搁置并发逻辑并只专注于通过短代码片段调用协程的机会,正如我们所做的那样。系统行为代码和任务调度将更简单、更明显。因此,管理算法、解析器、数据结构遍历、轮询等的许多能力可以通过这种技术来解释。不幸的是,我们无法在这里涵盖所有内容,但我们认为值得检查这些内容。
最后但同样重要的是,我们想强调协程对于这种语言来说相对较新。由于C++中的协程接口仍然缺乏舒适性和简单性,您可以在互联网上找到许多定制的协程库。我们建议您只依赖可信赖的库,或者等待这个设施的下一个标准特性。应用这些比自己重新实现它们更有意义。正如您所见,这是一个相当复杂的概念,目前正在对此进行大量研究。对于好奇的读者,我们鼓励您花一些时间了解C++中协程的演变,特别是近年来的情况。C++标准中讨论了三种技术——Coroutines TS、Core Coroutines和Resumable expressions。尽管目前标准中只使用了其中一种,但这三种都值得关注。Geoffrey Romer、Gor Nishanov、Lewis Baker和Mihail Mihailov在这里进行了很好的总结和分析:https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1493r0.pdf。