FastDDS-4.RTPS层

news2025/1/10 17:28:06


4. RTPS层

eprosima Fast DDS的较低层RTPS层是RTPS标准协议的实现。与DDS层相比,该层提供了对通信协议内部的更多控制,因此高级用户可以更好地控制库的功能。



4.1 与DDS层的关系

该层的元素与DDS层的元素一一对应,并添加了一些元素。该对应关系如下表所示:



4.2 如何使用RTPS层

现在我们将介绍RTPS层的使用,就像我们在DDS层1中所做的那样,解释它的新特性。

我们建议您在阅读本节的同时,查看两个描述如何使用RTPS层的示例。它们位于examples/cpp/rtps/AsSocket和examples/cpp/rtps/Registered中。



4.2.1 管理Participant

使用RTPSDomain::createParticipant()创建RTPSParticipant。RTPSParticipantAttributes结构用于在创建时配置RTPSPParticipant。

RTPSParticipantAttributes participant_attr;
participant_attr.setName("participant");
RTPSParticipant* participant = RTPSDomain::createParticipant(0, participant_attr);


4.2.2 管理Writer和Reader

正如RTPS标准所规定的,RTPSWriter和RTPSReader始终与History元素相关联。在DDS层中,History的创建和管理是隐藏的,但在RTPS层中,您可以完全控制History的创建和配置。

Writer使用RTPSDomain::createRTPSWriter()创建,并使用WriterAttributes结构配置。他们还需要一个WriterHistory,它的配置是HistoryAttributes结构。

HistoryAttributes history_attr;
WriterHistory* history = new WriterHistory(history_attr);
WriterAttributes writer_attr;
RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, writer_attr, history);

与Writer的创建类似,Reader使用RTPSDomain::createRTPSReader()创建,并使用ReaderAttributes结构配置。HistoryAttributes结构用于配置所需的ReaderHistory。注意,在这种情况下,可以提供实现回调的ReaderListener类:

class MyReaderListener : public ReaderListener
{
    // Callbacks override
};
MyReaderListener listener;
HistoryAttributes history_attr;
ReaderHistory* history = new ReaderHistory(history_attr);
ReaderAttributes reader_attr;
RTPSReader* reader = RTPSDomain::createRTPSReader(participant, reader_attr, history, &listener);


4.2.3 使用History去发送和接收数据

在RTPS协议中,读者和作家将有关主题的数据保存在其关联的历史中。每条数据都由一个Change表示,eprosima Fast DDS将其实现为CacheChange_t。Change始终由History管理。

您可以将新的CacheChange_t添加到Writer的History中以发送数据。流程如下:

  1. 使用RTPSWriter::new_change()向Writer请求CacheChange_t。为了分配足够的内存,需要提供一个回调,该回调返回有效负载中的最大字节数。
  2. 用数据填充CacheChange_t。
  3. 使用WriterHistory::add_change()将其添加到History中。

Writer将负责将数据传达给Reader。

//Request a change from the writer
CacheChange_t* change = writer->new_change([]() -> uint32_t
                {
                    return 255;
                }, ALIVE);
//Write serialized data into the change
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2) + 1;
//Insert change into the history. The Writer takes care of the rest.
history->add_change(change);

如果主题数据类型有多个字段,则必须提供函数来序列化和反序列化CacheChange_t中的数据。Fast DDS Gen为您提供这一功能。

您可以从ReaderListener::onNewCacheChangeAdded回调中接收数据,就像我们在DDS层中所做的那样:

  1. 回调接收包含接收数据的CacheChange_t参数。
  2. 处理接收到的CacheChange_t内的数据。(这一步是Reader处理接收到的数据,也就是业务处理)
  3. 告知Reader的History,不再需要Change,将Change移除。()
class MyReaderListener : public ReaderListener
{
public:

    MyReaderListener()
    {
    }

    ~MyReaderListener()
    {
    }

    void onNewCacheChangeAdded(
            RTPSReader* reader,
            const CacheChange_t* const change)
    {
        // The incoming message is enclosed within the `change` in the function parameters
        printf("%s\n", change->serializedPayload.data);
        // Once done, remove the change
        reader->getHistory()->remove_change((CacheChange_t*)change);
    }

};


4.3 配置Readers和Writers

使用RTPS层的好处之一是它提供了新的配置选项,同时保留了DDS层的选项。例如,可以将Writer或Reader设置为Reliable或Best Effort端点,如前所述。

writer_attr.endpoint.reliabilityKind = BEST_EFFORT;


4.3.1 设置数据的持久类型(durability kind)(持久类型就是对已经发送的change的处理方式)

Durability参数定义了当新的Reader匹配时,Writer对已发送的样本的行为。eProsima Fast DDS提供三种耐久性选项:

  • Volatile(default): 消息在发送时被丢弃。如果新的读取器在消息n之后匹配,它将从消息n+1开始接收。
  • Transient_local: Writer将保存其最近发送的k条消息的记录。如果新的读取器在消息n之后匹配,它将从消息n-k开始接收。
  • Transient: 类似于Transient_local,但消息的记录将保存到持久性存储中,因此,如果写入程序被销毁并重新创建,或者在应用程序崩溃的情况下,已发送数据仍将可用。

要选择你喜欢的选项:

writer_attr.endpoint.durabilityKind = TRANSIENT_LOCAL;

因为在RTPS层中,您可以控制History,所以在Transient_local和Transient模式下,Writer会发送您尚未从历史中明确释放的所有Change。



4.4 配置History

History有自己的配置结构HistoryAttributes。



4.4.1 更改有效负载的最大大小

您可以选择可以进入CacheChange_t的有效负载的最大大小。请确保选择的大小能够容纳尽可能多的数据:

history_attr.payloadMaxSize  = 250;//Defaults to 500 bytes

4.4.2 修改History的大小

您可以指定要保留的历史记录的最大值,并分配历史记录中Change的初始数量:(也就是History中最大值和初始值)

history_attr.initialReservedCaches = 250; //Defaults to 500
history_attr.maximumReservedCaches = 500; //Defaults to 0 = Unlimited Changes

当保留Change的初始数量低于最大值时,History在未来根据需要分配更多更改,直到达到最大大小。



4.5 使用自定义的负载池

有效负载定义为用户希望在Writer和Reader之间传输的数据。RTPS需要向该有效负载添加一些元数据,以便管理端点之间的通信。因此,此Payload被封装在CacheChange_t的SerializedPayload_t字段中,而CacheChange_t的其余字段提供所需的元数据。

WriterHistory和ReaderHistory为用户提供了一个与这些更改交互的界面:将由Writer传输的更改添加到其WriterHistory,并且可以从ReaderHistory中删除已在Reader上处理的更改。从这个意义上说,历史记录充当了尚未完全处理的更改的缓冲区。

在正常执行过程中,新的更改将添加到历史记录中,旧的更改将从历史记录中删除。为了管理这些更改中包含的Payload的生命周期,Reader和Writer使用Pool对象,这是IPayloadPool接口的实现。不同的池实现允许不同的优化。例如,可以从不同的预分配内存块中取出不同大小的Payload。

Writer和Reader可以自动选择最适合HistoryAttributes中给出的配置的默认Payload池实现。但是,可以为RTPSDomain::createRTPSWriter()和RTPSDomain::createRTPSReader()函数提供自定义负载池。当请求或释放新的CacheChange_t时,Writer和Reader将使用提供的池。



4.5.1 IPayloadPool 接口

  • 重载带大小参数的函数IPayloadPool::get_payload
    将参数指定大小的空Payload绑定到CacheChange_t实例。然后可以用所需的数据填充有效负载。

  • 用SerializedPayload参数重载IPayloadPool::get_payload
    将给定的Payload数据复制到池中的新Payload,并将其绑定到CacheChange_t实例。此重载还获取一个指向拥有原始Payload的池的指针。这允许某些优化,例如如果原始负载来自同一个池,则共享Payload,从而避免复制操作。

  • IPayload::release_payload:
    将绑定到CacheChange_t的Payload放回到池Pool,并将payload和CacheChange_t之间的解绑。

**重要**

在实现自定义Payload池时,确保分配的Payload满足标准RTPS序列化的要求。具体而言,Payloads必须足够大,以容纳序列化用户数据加上RTPS标准第10.2节中规定的SerializedPayloadHeader的4个八位字节。

例如,如果我们知道序列化用户数据的上限,我们可以考虑实现一个池,该池总是分配固定大小的Payload,足够大,可以容纳这些数据。如果串行化的用户数据最多有N个八位字节,则分配的有效负载必须至少有N+4个八位字符。

请注意,请求IPayloadPool::get_payload的大小已经考虑了这个4个八位字节的标头。


4.5.2 默认的负载池实现

如果没有向Writer或Reader提供自定义Payload池,Fast DDS将自动使用与History的memoryPolicy策略最匹配的默认实现。

preallocated_memory_mode

所有有效载荷都将有一个固定大小的数据缓冲区,等于有效载荷MaxSize的值,而不考虑请求IPayloadPool::get_payload的大小。已发布的有效负载可以重新用于另一个CacheChange_t。这以更高的内存使用率为代价减少了内存分配操作。

在历史的初始化过程中,initialReservedCaches Payloads为初始分配的CacheChange_t预先分配。

preallocated_with_realloc_memory_mode
Payloads保证数据缓冲区至少与请求大小和payloadMaxSize之间的最大值一样大。已发布的有效负载可以重新用于另一个CacheChange_t。如果至少有一个空闲有效负载的缓冲区大小等于或大于请求的有效负载,则不进行内存分配。

在历史的初始化过程中,initialReservedCaches Payloads为初始分配的CacheChange_t预先分配。

dynamic_reserve_memory_mode
每次请求Payload时,都会在内存中分配一个具有适当大小的新Payload。payloadMaxSize被忽略。释放的Payload的内存总是被释放的,因此池中永远不会有空闲的Payload。这以频繁的内存分配为代价减少了内存使用。

在历史的初始化过程中不进行有效负载的预分配。

dynamic_resuable_memory_mode
保证有效负载的数据缓冲区至少与请求的大小一样大。payloadMaxSize被忽略。
已发布的有效负载可以重新用于另一个CacheChange_t。如果至少有一个空闲有效负载的缓冲区大小等于或大于请求的有效负载,则不进行内存分配。



4.5.3 使用自定义负载池的案例

// A simple payload pool that reserves and frees memory each time
class CustomPayloadPool : public IPayloadPool
{
    bool get_payload(
            uint32_t size,
            CacheChange_t& cache_change) override
    {
        // Reserve new memory for the payload buffer
        octet* payload = new octet[size];

        // Assign the payload buffer to the CacheChange and update sizes
        cache_change.serializedPayload.data = payload;
        cache_change.serializedPayload.length = size;
        cache_change.serializedPayload.max_size = size;

        // Tell the CacheChange who needs to release its payload
        cache_change.payload_owner(this);

        return true;
    }

    bool get_payload(
            SerializedPayload_t& data,
            IPayloadPool*& /* data_owner */,
            CacheChange_t& cache_change) override
    {
        // Reserve new memory for the payload buffer
        octet* payload = new octet[data.length];

        // Copy the data
        memcpy(payload, data.data, data.length);

        // Assign the payload buffer to the CacheChange and update sizes
        cache_change.serializedPayload.data = payload;
        cache_change.serializedPayload.length = data.length;
        cache_change.serializedPayload.max_size = data.length;

        // Tell the CacheChange who needs to release its payload
        cache_change.payload_owner(this);

        return true;
    }

    bool release_payload(
            CacheChange_t& cache_change) override
    {
        // Ensure precondition
        assert(this == cache_change.payload_owner());

        // Dealloc the buffer of the payload
        delete[] cache_change.serializedPayload.data;

        // Reset sizes and pointers
        cache_change.serializedPayload.data = nullptr;
        cache_change.serializedPayload.length = 0;
        cache_change.serializedPayload.max_size = 0;

        // Reset the owner of the payload
        cache_change.payload_owner(nullptr);

        return true;
    }

};

std::shared_ptr<CustomPayloadPool> payload_pool = std::make_shared<CustomPayloadPool>();

// A writer using the custom payload pool
HistoryAttributes writer_history_attr;
WriterHistory* writer_history = new WriterHistory(writer_history_attr);
WriterAttributes writer_attr;
RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, writer_attr, payload_pool, writer_history);

// A reader using the same instance of the custom payload pool
HistoryAttributes reader_history_attr;
ReaderHistory* reader_history = new ReaderHistory(reader_history_attr);
ReaderAttributes reader_attr;
RTPSReader* reader = RTPSDomain::createRTPSReader(participant, reader_attr, payload_pool, reader_history);

// Write and Read operations work as usual, but take the Payloads from the pool.
// Requesting a change to the Writer will provide one with an empty Payload taken from the pool
CacheChange_t* change = writer->new_change([]() -> uint32_t
                {
                    return 255;
                }, ALIVE);

// Write serialized data into the change and add it to the history
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2) + 1;
writer_history->add_change(change);




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【使用两个栈实现队列】

文章目录一、栈和队列的基本特点二、基本接口函数的实现1.栈的接口2.创建队列骨架3.入队操作4.取出队列元素5.返回队首元素6.判断队列是否为空7.销毁队列总结一、栈和队列的基本特点 栈的特点是后进先出&#xff0c;而队列的特点是先进先出。 使用两个栈实现队列&#xff0c;必…

【DataX】数据同步到PG时遇到的分区不存在问题

数据同步到PG时遇到的分区不存在问题前言正文问题分析解决方法结语前言 大概说下这个问题牵扯出来的背景&#xff0c;一个外场项目&#xff0c;选型用PG存业务数据&#xff0c;然后客户要求保存保留一年的数据&#xff0c;运行到现在服务器5个T的磁盘已经有点扛不住了&#xf…

内存的管理

取指令——译码——执行——返存 计组课我们学过cpu真正读指令并非是从内存中读入&#xff0c;而是从cache读和存&#xff0c;再由cache进行取指或返存&#xff0c;因为cpu指令周期比内存周期速度快很多&#xff0c;cpu若要取指或返存都需要等待内存完成他的动作才可以进行下一…

python爬虫:如何定义内容提取器

项目背景 在python 即时网络爬虫项目启动说明中我们讨论一个数字&#xff1a;程序员浪费在调测内容提取规则上的时间&#xff0c;从而我们发起了这个项目&#xff0c;把程序员从繁琐的调测规则中解放出来&#xff0c;投入到更高端的数据处理工作中。 解决方案 为了解决这个问题…

微信小程序使用scss编译wxss文件的配置步骤

文章目录1、在 vscode 中搜索 easysass 插件并安装2、在微信开发工具中导入安装的easysass插件3、修改 spook.easysass-0.0.6/package.json 文件中的配置4、重启开发者工具&#xff0c;就可用使用了微信小程序开发者工具集成了 vscode 编辑器&#xff0c;可以使用 vscode 中众多…

C++修炼之练气期三层——函数重载

目录 1.引例 2.函数重载的概念 3.C支持函数重载的原理 1.引例 倘若现在要实现一个加法计算器&#xff0c;用C语言实现的话我们会选择这样的方式&#xff1a; int Add_int(int a, int b) {return a b; }double Add_double(double a, double b) {return a b; } 在使用加…

Exposure2023专业摄影RAW格式大师专业滤镜特效

Exposure2023是一款专为摄影艺术设计的图像编辑器。新的 Exposure2023结合了专业级的照片调整、庞大的华丽照片库和令人愉悦的高效设计。可以提供最大&#xff0c;最准确的电影外观选择。Exposure的创意外观不仅限于电影模拟&#xff0c;从干净优雅的现代风格到引人注目的色彩变…

SpringBoot+Nacos+OpenFeign环境搭建

目录 1.boot方式nacos与openFeign集成 1.引入依赖 2.添加配置 3.测试接口调用 4.常见问题&#xff1a; 1.版本依赖 2.nacos客户端 2.cloud方式nacos与openFeign集成 1.引入依赖 2.添加配置 3.接口定义 4.开启FeignClients客户端 5.远程接口测试 6.Nacos配置中心 1…

Java - 数据结构,二叉树

一、什么是树 概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。它具有以下的特点&#xff1a; 1、有…

SAP ERP系统MM模块常用增强之四:采购申请输入字段的校验检查

在SAP/ERP项目的实施中采购管理模块&#xff08;MM&#xff09;的创建和修改采购申请一般都会有输入字段校验检查的需求&#xff0c;来防止业务人员录入错误或少录入数据&#xff0c;这方面需求部分是可以通过配置实现&#xff0c;比如一些字段是否必输&#xff0c;是否显示等&…

WebRTC拥塞控制算法——GCC介绍

网络拥塞是基于IP协议的数据报交换网络中常见的一种网络传输问题&#xff0c;它对网络传输的质量有严重的影响&#xff0c; 网络拥塞是导致网络吞吐降低&#xff0c; 网络丢包等的主要原因之一&#xff0c; 这些问题使得上层应用无法有效的利用网络带宽获得高质量的网络传输效果…

C++——智能指针1

目录 RAII auto_ptr模拟实现 智能指针拷贝问题 唯一指针 shared_ptr&#xff08;可以拷贝&#xff09; shared_ptr模拟实现 完整代码 循环引用 weak_ptr模拟实现 定制删除器 shared_ptr定制删除器模拟实现 内存泄漏 RAII RAII&#xff08;Resource Acquisit…

SkyWalking使用案例

SkyWalking监控java项目Halo博客 Halo是一个开源的博客项目&#xff0c;使用java编写&#xff0c;官网地址&#xff1a;https://halo.run/ 安装java环境&#xff0c;Halo对java版本有限制&#xff0c;1.4.3版本以上需要使用java11以上 apt -y install openjdk-11-jdk java -…

matplotlib常用操作

文章目录1 matplotlib绘图1.1 绘图步骤2 matplotlib基本元素2.1 matplotlib 画布2.2 设置坐标轴长度和范围2.3 设置图形的线型和颜色2.4 设置图形刻度范围、刻度标签和坐标轴标签等2.4.1 设置刻度范围2.4.2 设置坐标轴刻度2.5 文本标签图例3 matplotlib的ax对象绘图4 绘制子图5…

2.3 黑群晖驱动:开启nvme缓存、将nvme缓存作为存储盘 教程

黑群晖驱动安装工具下载&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1CMLl6waOuW-Ys2gKZx7Jgg?pwdchct提取码&#xff1a;chct一、开启NVME缓存在群辉->控制面板->终端机和SNMP->终端机中 勾选“启动SSH”功能&#xff0c;并点击应用&#xff1b;下载安装P…

HACKTHEBOX——Teacher

nmapnmap -sV -sC -p- -T4 -oA nmap 10.10.10.153nmap只发现了对外开放了80端口&#xff0c;从http-title看出可能是某个中学的官网http打开网站确实是一个官网&#xff0c;查看每个接口看看有没有可以利用的地方发现了一个接口&#xff0c;/images/5.png&#xff0c;但是响应包…

国内有哪些支持定制化的低代码平台?

编者按&#xff1a;贴合企业业务需求的系统才是好系统&#xff0c;高程度的定制能力平台意味着可以提供更高契合度的产品&#xff0c;更好地匹配业务需求。本文介绍了国内支持定制化的老厂商低代码平台&#xff0c;具有源码交付、私有化部署、国产化、数据对接等优势。关键词&a…

服务端开发之Java备战秋招面试篇5

努力了那么多年,回头一望,几乎全是漫长的挫折和煎熬。对于大多数人的一生来说,顺风顺水只是偶尔,挫折、不堪、焦虑和迷茫才是主旋律。我们登上并非我们所选择的舞台,演出并非我们所选择的剧本。继续加油吧&#xff01; 目录 1.ArrayList与LinkedList区别&#xff0c; 应用场景…

免费数据恢复软件哪个好?排名前十的软件有这些!

我们经常会使用电脑&#xff0c;有时是为了放松娱乐&#xff0c;有时是为了处理工作。里面保存着大大小小的数据&#xff0c;多的数不胜数。如果我们的数据丢失&#xff0c;通过很多方法都没有办法恢复&#xff0c;那么软件可以帮助用户轻松处理各种丢失的文件&#xff0c;并可…

IDEA git cherry pick 简单使用

cherry pick的作用&#xff1a; 参考了一些博客&#xff0c; eg&#xff1a;参考博客1 参考博客2 再做了个小案例后&#xff0c;我目前的理解是&#xff0c;cherry pick的作用是将一个分支的部分提交/历史提交,可以合并到另外一个分支。这也只是解决办法之一,之后遇到详细真实场…