返回目录
5.3.5.5 Event-Driven vs Polling-Based access
ara::com实现完全支持事件驱动和轮询的方式来访问新数据。
对于轮询方式,典型的用例是,一个应用程序被周期性地触发并在特定的截止时间前进行一些处理。这是调节器/控制算法的典型模式 —— 循环激活由一个实时定时器驱动,以确保最小的抖动。在这样的设置中,在每个激活周期中调用*GetNewSamples(),然后将那些更新的缓存数据用作当前处理迭代的输入。此时,应用程序在调度处理算法时获取最新的数据进行处理就完全足够了。如果ara::com实现在任何有新数据可用的时通知你的应用程序,那将适得其反:这只会意味着对你的应用程序进程进行不必要的上下文切换,因为在你收到通知的时候,你可能并不想处理那些新数据,因为还没到处理它的时候。
然而,也有其他用例。如果你的应用程序没有这样一种周期性的、由截止时间驱动的方法,而是应当在某些事件发生时简单地做出反应,那么设置周期性调用*GetNewSamples()来轮询新事件就有点不合适并且效率低下。在这种情况下,你明确希望ara::com实现通知应用程序,从而对应用程序进程进行异步上下文切换。我们通过以下 API 机制来支持这种方式:
ara::core::Result<void> SetReceiveHandler(ara::com::EventReceiveHandler handler);
这个 API允许你注册一个用户定义的回调函数,当自上次调用 GetNewSamples()以来有新的事件数据可用时,通信管理必须调用这个回调函数。注册的回调函数不需要是可重入的,因为ara::com实现必须序列化注册的回调函数,明确允许从注册的回调函数内部调用GetNewSamples()!
请注意,用户可以随时在事件驱动和轮询方法之间切换,因为他也可以使用事件包装类提供的UnsetReceiveHandler()方法撤回用户特定的 “接收处理程序”。
以下简短的代码片段是一个关于在服务消费者如何使用事件的简单示例。在这个示例中,在main函数中创建了一个RadarService类型的Proxy实例,并注册了一个接收处理程序,每当接收到新的BrakeEvent事件时,ara::com实现就会调用这个处理程序。这意味着我们使用了“事件驱动” 方法。
在我们的示例中,我们在接收处理程序中用新接收到的事件更新我们的本地缓存,从而过滤掉所有不满足特定属性的BrakeEvent事件,之后,我们调用一个处理函数,该函数处理我们决定保留的Sample。
#include "RadarServiceProxy.hpp"
#include <memory>
#include <deque>
using namespace com::mycompany::division::radarservice;
using namespace ara::com;
/**
* our radar proxy - initially the unique ptr is invalid.
*/
std::unique_ptr<proxy::RadarServiceProxy> myRadarProxy;
/**
* a storage for BrakeEvent samples in fifo style
*/
std::deque<SamplePtr<const proxy::events::BrakeEvent::SampleType>> stNActiveSamples;
/**
* \brief application function, which processes current set of BrakeEvent
* samples.
* \param samples
*/
void processLastBrakeEvents(
std::deque<SamplePtr<const proxy::events::BrakeEvent::SampleType>>&mples)
{
// do whatever with those BrakeEvent samples ...
}
/**
* \brief event reception handler for BrakeEvent events, which we register
get informed about new events.
*/
void handleBrakeEventReception()
{
/**
* we get newly arrived BrakeEvent events into our process space.
* For each sample we get passed in, we check for a certain property
* "active" and if it fulfills the check, we move it into our Last10-
orage.
* So this few lines basically implement filtering and a LastN policy.
*/
myRadarProxy->BrakeEvent.GetNewSamples(
[](SamplePtr<proxy::events::BrakeEvent::SampleType> samplePtr)
{
if(samplePtr->active)
{
lastNActiveSamples.push_back(std::move(samplePtr));
if (lastNActiveSamples.size() > 10)
lastNActiveSamples.pop_front();
}
});
// ... now process those samples ...
processLastBrakeEvents(lastNActiveSamples);
}
int main(int argc, char** argv)
{
/* Instance Specifier from model */
ara::core::InstanceSpecifier instspec {...}
auto handles = proxy::RadarServiceProxy::FindService(instspec);
if (!handles.empty())
{
/* we have at least one valid handle - we are not very particular
* here and take the first one to create our proxy */
myRadarProxy = std::make_unique<proxy::RadarServiceProxy>(handles[0]);
/* we are interested in receiving the event "BrakeEvent" - so we
* subscribe for it. We want to access up to 10 events, since our
* sample algo averages over at most 10.*/
myRadarProxy->BrakeEvent.Subscribe(10);
/* whenever new BrakeEvent events come in, we want be called, so we
* register a callback for it!
* Note: If the entity we would subscribe to, would be a field
* instead of an event, it would be crucial, to register our
* reception handler BEFORE subscribing, to avoid race conditions.
* After a field subscription, you would get instantly so called
* "initial events" and to be sure not to miss them, you should care
* for that your reception handler is registered before.*/
myRadarProxy->BrakeEvent.SetReceiveHandler( handleBrakeEventReception);
}
// ... wait for application shutdown trigger by application exec mgmt.
}
5.3.5.6 Buffering Strategies
以下图形描绘了一个简单的部署情况,其中有一个服务提供一个事件,两个不同的本地服务消费者(SWC)通过它们各自的Proxy的事件包装类订阅了这个事件。正如在图中看到的,两个Proxy都有一个本地事件缓存,可以通过GetNewSamples()填充的缓存。这幅图还描绘了服务实现将其事件数据发送到一个通信管理缓冲区,这个缓冲区显然在服务实现的进程空间之外 —— 图中假设这个缓冲区由内核拥有,或者为通信的Proxy和Skeleton之间的共享内存,或者由一个单独的特定于绑定实现的 “守护” 进程拥有。
图中假设如下:应用程序被实现为具有独立 / 受保护的内存 / 地址空间的进程。
服务实现(通过Skeleton)发送出的事件数据不能在服务 / 骨架进程的私有地址空间内缓冲:如果是这样的话,代理对事件数据的访问通常会导致切换到服务应用程序进程的上下文。我们希望在服务端通过方法调用处理模式(见 5.4.5 小节)对事件访问完全控制,不应该由服务消费者的通信行为触发。现在让我们大致看一下 “发送事件” 的目标缓冲区可能位于的三个不同位置:
- 内核空间:数据被发送到一个不在应用程序进程的内存区域。当绑定实现使用诸如管道或套接字这样的进程间通信原语时,通常就是这种情况,写入这样一个原语的数据最终会在内核缓冲区空间中。
- 共享内存:数据被发送到一个内存区域,这个区域也可以直接被接收者 / 代理读取。不同方之间的写入 / 读取是专门同步的(使用内存屏障或显式互斥锁进行轻量级同步)。
- 进程间通信守护进程空间:数据被发送到一个明确的非应用程序进程,这个进程充当进程间通信 / 绑定实现的一种守护进程。请注意,从技术上讲,这种方法可能建立在一个进程间通信原语之上,比如通过内核空间或共享内存进行通信,以便将数据从服务进程传输到守护进程。
这些方法中的每一种在缓冲区空间的灵活性 / 大小、访问速度 / 开销效率以及防止恶意访问 / 写入缓冲区的保护方面可能都有不同的优缺点。因此,在一个汽车开放平台(AP)产品及其使用中考虑不同的约束可能会导致不同的解决方案。
在这个例子中需要强调的是,鼓励 AP 产品供应商使用基于引用的方法来访问事件数据:事件包装类的ara::com API 有意地通过SamplePtr来建模访问,这些指针被传递给回调函数,而不是值!在那些相当典型的 1:N 事件通信场景中,允许在 “本地事件缓存” 中不是事件数据值本身,而是指向包含在中央通信管理缓冲区中的数据的指针 / 引用。然后,通过GetNewSamples()更新本地缓存可以实现为不是值的复制,而是引用的更新。
说实话:这显然是关于缓冲区使用优化可能性的一个粗略图景!正如这里所暗示的(7.1 节),传输到应用程序进程的数据通常必须在应用程序访问之前进行反序列化。由于反序列化必须特定于服务消费者的应用程序的对齐方式,因此,已经反序列化的数据在中央共享可能很棘手。但至少你明白了这一点,即:服务消费者的事件数据访问的API设计为消费者之间的事件数据共享提供了空间。
♦️♦️总结♦️♦️
1. Event - Driven vs Polling - Based access
1.1 轮询方式(Polling - Based access)
- 典型用例:应用程序周期性被触发,在特定截止时间前处理,常见于调节器 / 控制算法,循环激活由实时定时器驱动以确保最小抖动。在每个激活周期调用
*GetNewSamples()
,用更新的缓存数据作为当前处理迭代输入,这种情况下,应用程序按调度获取最新数据处理即可,若有新数据就通知应用程序则可能造成不必要的上下文切换。
1.2 事件驱动方式(Event - Driven access)
- 适用场景:当应用程序不是周期性、截止时间驱动,而是需在某些事件发生时简单做出反应,周期性调用
*GetNewSamples()
轮询新事件就不合适且效率低下,此时希望ara::com
实现通知应用程序进行异步上下文切换。 - 实现机制
- 通过
ara::core::Result<void> SetReceiveHandler(ara::com::EventReceiveHandler handler)
API 注册用户定义的回调函数,当有新事件数据可用时通信管理调用该回调函数,注册的回调函数无需可重入,因为ara::com
实现会序列化注册的回调函数,且允许从注册的回调函数内部调用GetNewSamples()
。 - 用户可通过事件包装类提供的
UnsetReceiveHandler()
方法在事件驱动和轮询方法之间切换。
- 通过
- 示例代码
- 创建
RadarService
类型的Proxy
实例,并注册接收处理程序,当接收到新的BrakeEvent
事件时ara::com
实现调用该处理程序,在接收处理程序中更新本地缓存、过滤不满足特定属性的BrakeEvent
事件并调用处理函数处理决定保留的Sample
。
- 创建
2. Buffering Strategies
2.1 部署情况
- 一个服务提供一个事件,两个本地服务消费者(SWC)通过各自
Proxy
的事件包装类订阅该事件,Proxy
有本地事件缓存(通过GetNewSamples()
填充),服务实现将事件数据发送到通信管理缓冲区,该缓冲区可能在以下位置:- 内核空间:数据发送到不在应用程序进程内存区域,使用进程间通信原语(如管道或套接字)时常见,数据最终在内核缓冲区空间。
- 共享内存:数据发送到接收者 / 代理可直接读取的内存区域,不同方的写入 / 读取通过内存屏障或显式互斥锁进行轻量级同步。
- 进程间通信守护进程空间:数据发送到明确的非应用程序进程(充当进程间通信 / 绑定实现的守护进程),从技术上讲可基于进程间通信原语(如通过内核空间或共享内存)将数据从服务进程传输到守护进程。
2.2 设计考虑
- 鼓励汽车开放平台(AP)产品供应商使用基于引用的方法访问事件数据,事件包装类的
ara::com
API 通过SamplePtr
建模访问(传递指针给回调函数而非值),在典型的 1:N 事件通信场景中,本地事件缓存中是指向中央通信管理缓冲区数据的指针 / 引用,GetNewSamples()
更新本地缓存可实现为引用更新。虽然传输到应用程序进程的数据在访问前通常需反序列化且共享已反序列化的数据可能棘手,但 API 设计为消费者之间的事件数据共享提供了空间。