4. RTPS层
eprosima Fast DDS的较低层RTPS层是RTPS标准协议的实现。与DDS层相比,该层提供了对通信协议内部的更多控制,因此高级用户可以更好地控制库的功能。
4.1 与DDS层的关系
该层的元素与DDS层的元素一一对应,并添加了一些元素。该对应关系如下表所示:
4.2 如何使用RTPS层
现在我们将介绍RTPS层的使用,就像我们在DDS层1中所做的那样,解释它的新特性。
我们建议您在阅读本节的同时,查看两个描述如何使用RTPS层的示例。它们位于examples/cpp/rtps/AsSocket和examples/cpp/rtps/Registered中。
4.2.1 管理Participant
使用RTPSDomain::createParticipant()创建RTPSParticipant。RTPSParticipantAttributes结构用于在创建时配置RTPSPParticipant。
RTPSParticipantAttributes participant_attr;
participant_attr.setName("participant");
RTPSParticipant* participant = RTPSDomain::createParticipant(0, participant_attr);
4.2.2 管理Writer和Reader
正如RTPS标准所规定的,RTPSWriter和RTPSReader始终与History元素相关联。在DDS层中,History的创建和管理是隐藏的,但在RTPS层中,您可以完全控制History的创建和配置。
Writer使用RTPSDomain::createRTPSWriter()创建,并使用WriterAttributes结构配置。他们还需要一个WriterHistory,它的配置是HistoryAttributes结构。
HistoryAttributes history_attr;
WriterHistory* history = new WriterHistory(history_attr);
WriterAttributes writer_attr;
RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, writer_attr, history);
与Writer的创建类似,Reader使用RTPSDomain::createRTPSReader()创建,并使用ReaderAttributes结构配置。HistoryAttributes结构用于配置所需的ReaderHistory。注意,在这种情况下,可以提供实现回调的ReaderListener类:
class MyReaderListener : public ReaderListener
{
// Callbacks override
};
MyReaderListener listener;
HistoryAttributes history_attr;
ReaderHistory* history = new ReaderHistory(history_attr);
ReaderAttributes reader_attr;
RTPSReader* reader = RTPSDomain::createRTPSReader(participant, reader_attr, history, &listener);
4.2.3 使用History去发送和接收数据
在RTPS协议中,读者和作家将有关主题的数据保存在其关联的历史中。每条数据都由一个Change表示,eprosima Fast DDS将其实现为CacheChange_t。Change始终由History管理。
您可以将新的CacheChange_t添加到Writer的History中以发送数据。流程如下:
- 使用RTPSWriter::new_change()向Writer请求CacheChange_t。为了分配足够的内存,需要提供一个回调,该回调返回有效负载中的最大字节数。
- 用数据填充CacheChange_t。
- 使用WriterHistory::add_change()将其添加到History中。
Writer将负责将数据传达给Reader。
//Request a change from the writer
CacheChange_t* change = writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
//Write serialized data into the change
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2) + 1;
//Insert change into the history. The Writer takes care of the rest.
history->add_change(change);
如果主题数据类型有多个字段,则必须提供函数来序列化和反序列化CacheChange_t中的数据。Fast DDS Gen为您提供这一功能。
您可以从ReaderListener::onNewCacheChangeAdded回调中接收数据,就像我们在DDS层中所做的那样:
- 回调接收包含接收数据的CacheChange_t参数。
- 处理接收到的CacheChange_t内的数据。(这一步是Reader处理接收到的数据,也就是业务处理)
- 告知Reader的History,不再需要Change,将Change移除。()
class MyReaderListener : public ReaderListener
{
public:
MyReaderListener()
{
}
~MyReaderListener()
{
}
void onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const change)
{
// The incoming message is enclosed within the `change` in the function parameters
printf("%s\n", change->serializedPayload.data);
// Once done, remove the change
reader->getHistory()->remove_change((CacheChange_t*)change);
}
};
4.3 配置Readers和Writers
使用RTPS层的好处之一是它提供了新的配置选项,同时保留了DDS层的选项。例如,可以将Writer或Reader设置为Reliable或Best Effort端点,如前所述。
writer_attr.endpoint.reliabilityKind = BEST_EFFORT;
4.3.1 设置数据的持久类型(durability kind)(持久类型就是对已经发送的change的处理方式)
Durability参数定义了当新的Reader匹配时,Writer对已发送的样本的行为。eProsima Fast DDS提供三种耐久性选项:
- Volatile(default): 消息在发送时被丢弃。如果新的读取器在消息n之后匹配,它将从消息n+1开始接收。
- Transient_local: Writer将保存其最近发送的k条消息的记录。如果新的读取器在消息n之后匹配,它将从消息n-k开始接收。
- Transient: 类似于Transient_local,但消息的记录将保存到持久性存储中,因此,如果写入程序被销毁并重新创建,或者在应用程序崩溃的情况下,已发送数据仍将可用。
要选择你喜欢的选项:
writer_attr.endpoint.durabilityKind = TRANSIENT_LOCAL;
因为在RTPS层中,您可以控制History,所以在Transient_local和Transient模式下,Writer会发送您尚未从历史中明确释放的所有Change。
4.4 配置History
History有自己的配置结构HistoryAttributes。
4.4.1 更改有效负载的最大大小
您可以选择可以进入CacheChange_t的有效负载的最大大小。请确保选择的大小能够容纳尽可能多的数据:
history_attr.payloadMaxSize = 250;//Defaults to 500 bytes
4.4.2 修改History的大小
您可以指定要保留的历史记录的最大值,并分配历史记录中Change的初始数量:(也就是History中最大值和初始值)
history_attr.initialReservedCaches = 250; //Defaults to 500
history_attr.maximumReservedCaches = 500; //Defaults to 0 = Unlimited Changes
当保留Change的初始数量低于最大值时,History在未来根据需要分配更多更改,直到达到最大大小。
4.5 使用自定义的负载池
有效负载定义为用户希望在Writer和Reader之间传输的数据。RTPS需要向该有效负载添加一些元数据,以便管理端点之间的通信。因此,此Payload被封装在CacheChange_t的SerializedPayload_t字段中,而CacheChange_t的其余字段提供所需的元数据。
WriterHistory和ReaderHistory为用户提供了一个与这些更改交互的界面:将由Writer传输的更改添加到其WriterHistory,并且可以从ReaderHistory中删除已在Reader上处理的更改。从这个意义上说,历史记录充当了尚未完全处理的更改的缓冲区。
在正常执行过程中,新的更改将添加到历史记录中,旧的更改将从历史记录中删除。为了管理这些更改中包含的Payload的生命周期,Reader和Writer使用Pool对象,这是IPayloadPool接口的实现。不同的池实现允许不同的优化。例如,可以从不同的预分配内存块中取出不同大小的Payload。
Writer和Reader可以自动选择最适合HistoryAttributes中给出的配置的默认Payload池实现。但是,可以为RTPSDomain::createRTPSWriter()和RTPSDomain::createRTPSReader()函数提供自定义负载池。当请求或释放新的CacheChange_t时,Writer和Reader将使用提供的池。
4.5.1 IPayloadPool 接口
-
重载带大小参数的函数IPayloadPool::get_payload
将参数指定大小的空Payload绑定到CacheChange_t实例。然后可以用所需的数据填充有效负载。 -
用SerializedPayload参数重载IPayloadPool::get_payload
将给定的Payload数据复制到池中的新Payload,并将其绑定到CacheChange_t实例。此重载还获取一个指向拥有原始Payload的池的指针。这允许某些优化,例如如果原始负载来自同一个池,则共享Payload,从而避免复制操作。 -
IPayload::release_payload:
将绑定到CacheChange_t的Payload放回到池Pool,并将payload和CacheChange_t之间的解绑。
**重要**
在实现自定义Payload池时,确保分配的Payload满足标准RTPS序列化的要求。具体而言,Payloads必须足够大,以容纳序列化用户数据加上RTPS标准第10.2节中规定的SerializedPayloadHeader的4个八位字节。
例如,如果我们知道序列化用户数据的上限,我们可以考虑实现一个池,该池总是分配固定大小的Payload,足够大,可以容纳这些数据。如果串行化的用户数据最多有N个八位字节,则分配的有效负载必须至少有N+4个八位字符。
请注意,请求IPayloadPool::get_payload的大小已经考虑了这个4个八位字节的标头。
4.5.2 默认的负载池实现
如果没有向Writer或Reader提供自定义Payload池,Fast DDS将自动使用与History的memoryPolicy策略最匹配的默认实现。
preallocated_memory_mode
所有有效载荷都将有一个固定大小的数据缓冲区,等于有效载荷MaxSize的值,而不考虑请求IPayloadPool::get_payload的大小。已发布的有效负载可以重新用于另一个CacheChange_t。这以更高的内存使用率为代价减少了内存分配操作。
在历史的初始化过程中,initialReservedCaches Payloads为初始分配的CacheChange_t预先分配。
preallocated_with_realloc_memory_mode
Payloads保证数据缓冲区至少与请求大小和payloadMaxSize之间的最大值一样大。已发布的有效负载可以重新用于另一个CacheChange_t。如果至少有一个空闲有效负载的缓冲区大小等于或大于请求的有效负载,则不进行内存分配。
在历史的初始化过程中,initialReservedCaches Payloads为初始分配的CacheChange_t预先分配。
dynamic_reserve_memory_mode
每次请求Payload时,都会在内存中分配一个具有适当大小的新Payload。payloadMaxSize被忽略。释放的Payload的内存总是被释放的,因此池中永远不会有空闲的Payload。这以频繁的内存分配为代价减少了内存使用。
在历史的初始化过程中不进行有效负载的预分配。
dynamic_resuable_memory_mode
保证有效负载的数据缓冲区至少与请求的大小一样大。payloadMaxSize被忽略。
已发布的有效负载可以重新用于另一个CacheChange_t。如果至少有一个空闲有效负载的缓冲区大小等于或大于请求的有效负载,则不进行内存分配。
4.5.3 使用自定义负载池的案例
// A simple payload pool that reserves and frees memory each time
class CustomPayloadPool : public IPayloadPool
{
bool get_payload(
uint32_t size,
CacheChange_t& cache_change) override
{
// Reserve new memory for the payload buffer
octet* payload = new octet[size];
// Assign the payload buffer to the CacheChange and update sizes
cache_change.serializedPayload.data = payload;
cache_change.serializedPayload.length = size;
cache_change.serializedPayload.max_size = size;
// Tell the CacheChange who needs to release its payload
cache_change.payload_owner(this);
return true;
}
bool get_payload(
SerializedPayload_t& data,
IPayloadPool*& /* data_owner */,
CacheChange_t& cache_change) override
{
// Reserve new memory for the payload buffer
octet* payload = new octet[data.length];
// Copy the data
memcpy(payload, data.data, data.length);
// Assign the payload buffer to the CacheChange and update sizes
cache_change.serializedPayload.data = payload;
cache_change.serializedPayload.length = data.length;
cache_change.serializedPayload.max_size = data.length;
// Tell the CacheChange who needs to release its payload
cache_change.payload_owner(this);
return true;
}
bool release_payload(
CacheChange_t& cache_change) override
{
// Ensure precondition
assert(this == cache_change.payload_owner());
// Dealloc the buffer of the payload
delete[] cache_change.serializedPayload.data;
// Reset sizes and pointers
cache_change.serializedPayload.data = nullptr;
cache_change.serializedPayload.length = 0;
cache_change.serializedPayload.max_size = 0;
// Reset the owner of the payload
cache_change.payload_owner(nullptr);
return true;
}
};
std::shared_ptr<CustomPayloadPool> payload_pool = std::make_shared<CustomPayloadPool>();
// A writer using the custom payload pool
HistoryAttributes writer_history_attr;
WriterHistory* writer_history = new WriterHistory(writer_history_attr);
WriterAttributes writer_attr;
RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, writer_attr, payload_pool, writer_history);
// A reader using the same instance of the custom payload pool
HistoryAttributes reader_history_attr;
ReaderHistory* reader_history = new ReaderHistory(reader_history_attr);
ReaderAttributes reader_attr;
RTPSReader* reader = RTPSDomain::createRTPSReader(participant, reader_attr, payload_pool, reader_history);
// Write and Read operations work as usual, but take the Payloads from the pool.
// Requesting a change to the Writer will provide one with an empty Payload taken from the pool
CacheChange_t* change = writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
// Write serialized data into the change and add it to the history
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2) + 1;
writer_history->add_change(change);