目录
DataWriter分析
DataWriter 类分析
DataWriterImpl 类分析
关键函数分析
DataWriter分析
DataWriter 类分析
DataWriter 类是 Fast DDS 库中的一个重要类,它用于实现 DDS(Data Distribution Service)发布-订阅通信模型中的数据写入功能。
用途:
DataWriter 类用于向特定主题(Topic)发布数据。它负责将数据写入到该主题,并将数据传输给订阅该主题的数据读取器(DataReader)。通过 DataWriter,用户可以发布数据,注册和注销数据实例,获取状态信息以及设置数据写入的 QoS(Quality of Service)。
成员变量:
- impl_:指向 DataWriterImpl 类的实例的指针,它是 DataWriter 的实际实现。
成员函数:
DataWriter 类提供了多个成员函数,下面是一些重要的函数及其作用:
- enable():启用数据写入器。
- write(data):将数据写入到主题。
- register_instance(instance):注册数据实例。
- unregister_instance(instance):注销数据实例。
- get_key_value(key_holder, handle):获取数据实例的键值。
- set_qos(qos):设置数据写入的 QoS。
- get_qos():获取当前数据写入的 QoS。
- set_listener(listener):设置数据写入的监听器。
- get_topic():获取关联的主题。
- get_publisher():获取创建该数据写入器的发布者。
大致的实现方法:
DataWriter 类的实现方法主要依赖于底层的 DataWriterImpl 类。DataWriterImpl 类是 DataWriter 的实际实现,它封装了与数据写入相关的底层逻辑和操作。在 DataWriter 的成员函数中,通过调用 DataWriterImpl 对应的成员函数来完成相应的操作。例如,在 write(data) 函数中,会调用 DataWriterImpl 的 write(data) 函数来实际执行数据写入操作。
同时,DataWriter 类还提供了一些其他功能,如通过 loan_sample 和 discard_loan 函数直接在内部池中借用和归还数据样本的功能,以及获取数据写入的状态信息等。
下面是详细的方法描述
因为这个类本质上只是一个包装类,所以重点函数基本上都是DataWriterImpl 类实现的
DataWriterImpl 类分析
类图
DataWriterImpl类是Fast DDS库中的一个关键类,用于实现数据写入的功能。它包含了数据写入相关的操作和状态,以及与底层通信层(RTPS)的交互。下面是对该类及其函数的作用和大致实现方式的介绍:
- DataWriterImpl(PublisherImpl*, TypeSupport, Topic*, const DataWriterQos&, DataWriterListener*):类的构造函数,用于创建DataWriterImpl对象并初始化其成员变量。
- ~DataWriterImpl():类的析构函数,用于清理资源和释放内存。
- enable():启用DataWriterImpl对象,创建底层实体(RTPSWriter)等。
- loan_sample(void*, LoanInitializationKind):从内部池中借用一个样本,用于写入数据。
- discard_loan(void*&):废弃之前借用的样本。
- write(void*):将数据写入到Topic中。
- write(void*, fastrtps::rtps::WriteParams&):附带写入参数,将数据写入到Topic中。
- write(void*, const InstanceHandle_t&):将数据写入到指定的实例中。
- write_w_timestamp(void*, const InstanceHandle_t&, const fastrtps::Time_t&):附带时间戳,将数据写入到指定的实例中。
- register_instance(void*):注册一个新的实例,并返回实例的句柄。
- register_instance_w_timestamp(void*, const fastrtps::Time_t&):附带时间戳,注册一个新的实例,并返回实例的句柄。
- unregister_instance(void*, const InstanceHandle_t&, bool = false):注销指定实例。
- unregister_instance_w_timestamp(void*, const InstanceHandle_t&, const fastrtps::Time_t&, bool = false):附带时间戳,注销指定实例。
- guid():获取DataWriter的GUID。
- get_instance_handle():获取DataWriter的实例句柄。
- get_type():获取数据类型支持对象。
- get_qos():获取DataWriter的QoS配置。
- get_topic():获取DataWriter关联的Topic。
- get_listener():获取DataWriter的监听器。
- set_listener(DataWriterListener*):设置DataWriter的监听器。
- get_publication_matched_status(PublicationMatchedStatus&):获取发布匹配状态。
- get_offered_deadline_missed_status(OfferedDeadlineMissedStatus&):获取未满足的最后期限状态。
- get_offered_incompatible_qos_status(OfferedIncompatibleQosStatus&):获取不兼容的QoS状态。
- set_qos(const DataWriterQos&):设置DataWriter的QoS配置。
- get_liveliness_lost_status(LivelinessLostStatus&):获取失去活跃性的状态。
- get_publisher():获取关联的Publisher。
- assert_liveliness():声明活跃性。
- disable():禁用DataWriterImpl对象,移除所有监听器。
- clear_history(size_t*):清除历史记录中的所有数据。
- get_sending_locators(rtps::LocatorList&):获取DataWriter可以发送数据的定位器列表。
- filter_is_being_removed(const char*):在过滤器被移除时调用的方法。
关键函数分析
write
ReturnCode_t DataWriterImpl::write(void* data) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 检查数据是否为空 if (data == nullptr) { return ReturnCode_t::RETCODE_BAD_PARAMETER; } // 检查数据共享是否兼容 if (is_data_sharing_compatible_) { // 使用负载池分配Payload SerializedPayload_t payload; if (!get_payload_pool()->get_payload(payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, &payload)) { get_payload_pool()->release_payload(payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(payload); return ret; } // 设置负载并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = payload; writer_->add_change(change); } else { // 使用内部负载分配器分配Payload std::unique_ptr<SerializedPayload_t> payload(new SerializedPayload_t()); if (!payload || !get_payload_pool()->get_payload(*payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, payload.get())) { get_payload_pool()->release_payload(*payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(*payload); return ret; } // 设置负载并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = *payload; writer_->add_change(change); } return ReturnCode_t::RETCODE_OK; }
write函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 然后,检查数据是否为空,如果为空则返回RETCODE_BAD_PARAMETER。
- 接下来,根据数据共享的兼容性情况,选择使用不同的负载分配器。
- 如果数据共享兼容,则使用负载池分配SerializedPayload_t对象,然后将数据序列化到负载中。
- 创建新的更改并将其添加到历史记录中。
- 设置负载并将更改添加到DataWriter中。
- 如果数据共享不兼容,则使用内部负载分配器分配SerializedPayload_t对象,然后将数据序列化到负载中。
- 创建新的更改并将其添加到历史记录中。
- 设置负载并将更改添加到DataWriter中。
- 返回RETCODE_OK表示写入操作成功。
write_w_timestamp
ReturnCode_t DataWriterImpl::write_w_timestamp(void* data, const fastrtps::Time_t& timestamp) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 检查数据是否为空 if (data == nullptr) { return ReturnCode_t::RETCODE_BAD_PARAMETER; } // 检查数据共享是否兼容 if (is_data_sharing_compatible_) { // 使用负载池分配Payload SerializedPayload_t payload; if (!get_payload_pool()->get_payload(payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, &payload)) { get_payload_pool()->release_payload(payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(payload); return ret; } // 设置时间戳并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = payload; change->sourceTimestamp = timestamp; writer_->add_change(change); } else { // 使用内部负载分配器分配Payload std::unique_ptr<SerializedPayload_t> payload(new SerializedPayload_t()); if (!payload || !get_payload_pool()->get_payload(*payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, payload.get())) { get_payload_pool()->release_payload(*payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(*payload); return ret; } // 设置时间戳并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = *payload; change->sourceTimestamp = timestamp; writer_->add_change(change); } return ReturnCode_t::RETCODE_OK; }
write_w_timestamp函数与write函数的实现非常相似。不同之处在于它接受一个额外的参数timestamp,用于指定数据的时间戳。
write_w_timestamp函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 然后,检查数据是否为空,如果为空则返回RETCODE_BAD_PARAMETER。
- 接下来,根据数据共享的兼容性情况,选择使用不同的负载分配器。
- 如果数据共享兼容,则使用负载池分配SerializedPayload_t对象,并将数据序列化到负载中。
- 创建新的更改并将其添加到历史记录中。
- 设置时间戳并将更改添加到DataWriter中。
- 如果数据共享不兼容,则使用内部负载分配器分配SerializedPayload_t对象,并将数据序列化到负载中。
- 创建新的更改并将其添加到历史记录中。
- 设置时间戳并将更改添加到DataWriter中。
- 返回RETCODE_OK表示写入操作成功。
register_instance
ReturnCode_t DataWriterImpl::register_instance(void* key) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 检查键值是否为空 if (key == nullptr) { return ReturnCode_t::RETCODE_BAD_PARAMETER; } // 检查键是否已注册 if (history_.key_exists(key)) { return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(NOT_ALIVE_UNREGISTERED, key, wparams); if (ret != ReturnCode_t::RETCODE_OK) { return ret; } // 将实例注册到历史记录中 CacheChange_t* change = history_.get_last_added_change(); history_.register_instance(change, key); return ReturnCode_t::RETCODE_OK; }
register_instance函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 然后,检查键值是否为空,如果为空则返回RETCODE_BAD_PARAMETER。
- 接下来,检查键是否已经注册,如果已经注册则返回RETCODE_PRECONDITION_NOT_MET。
- 创建新的更改并将其添加到历史记录中。这里使用create_new_change_with_params函数创建一个新的更改,更改类型为NOT_ALIVE_UNREGISTERED,数据为键值。
- 将实例注册到历史记录中。通过调用register_instance函数将更改与键值关联起来,这样历史记录就可以跟踪实例的状态。
- 返回RETCODE_OK表示注册实例操作成功。
history_是DataWriterImpl类中的成员变量,它是用来管理DataWriter的历史更改记录的对象。
history_的作用是存储DataWriter发送的所有更改。每当DataWriter要发送新的数据时,它会创建一个新的更改(CacheChange_t对象),并将其添加到history_中。历史记录中的更改可以按照序列号进行排序,以便按照正确的顺序发送和传输数据。
通过管理历史记录,DataWriter可以实现一些重要的功能,例如支持可靠性、历史访问、数据回溯和重传等。历史记录还可用于处理订阅者的需求,如按需读取、历史数据查询和数据过滤等。
在DataWriterImpl中,history_是一个WriterHistory类型的对象,它实现了管理和维护历史更改记录的功能。WriterHistory是Fast DDS库提供的一个用于管理历史更改的实现。
通过使用history_,DataWriter可以跟踪和管理发送的数据,保证数据可靠性和一致性,并提供灵活的历史访问和数据处理能力。
get_publication_matched_status
ReturnCode_t DataWriterImpl::get_publication_matched_status(PublicationMatchedStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取PublicationMatchedStatus并复制给status参数 status = publication_matched_status_; // 重置current_count_change和total_count_change publication_matched_status_.current_count_change = 0; publication_matched_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::publication_matched(), false); return ReturnCode_t::RETCODE_OK; }
get_publication_matched_status函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 使用std::unique_lock对DataWriter的互斥锁进行加锁,以确保线程安全性。
- 将publication_matched_status_复制到status参数中。
- 重置publication_matched_status_中的current_count_change和total_count_change为0,以表示已经处理了这些变化。
- 通过调用user_datawriter_->get_statuscondition().get_impl()->set_status()设置相应的状态条件,将publication_matched_status_的变化通知给用户。
- 返回RETCODE_OK表示获取PublicationMatchedStatus成功。
DataWriter的当前PublicationMatchedStatus表示与该DataWriter相关的订阅者(DataReader)的匹配状态。PublicationMatchedStatus记录了与DataWriter相关的订阅者数量的变化情况。
PublicationMatchedStatus结构包含以下字段:
- total_count: 表示与DataWriter相关的总订阅者数量。
- total_count_change: 表示自上次获取PublicationMatchedStatus以来总订阅者数量的变化值。
- current_count: 表示当前与DataWriter匹配的订阅者数量。
- current_count_change: 表示自上次获取PublicationMatchedStatus以来当前订阅者数量的变化值。
通过监视PublicationMatchedStatus的变化,可以了解DataWriter与订阅者之间的匹配情况的变化。例如,当新的订阅者与DataWriter匹配时,current_count和total_count会增加,并且可以通过检查current_count_change和total_count_change字段来获知匹配状态的变化。
使用PublicationMatchedStatus可以实现一些功能,例如:
- 监测DataWriter与订阅者的连接状态,以便确定是否成功建立了通信连接。
- 根据与DataWriter匹配的订阅者数量来调整数据发布的策略。
- 基于PublicationMatchedStatus的变化触发特定的操作或回调函数
get_offered_deadline_missed_status
ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status(OfferedDeadlineMissedStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取OfferedDeadlineMissedStatus并复制给status参数 status = offered_deadline_missed_status_; // 重置total_count和total_count_change offered_deadline_missed_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_deadline_missed(), false); return ReturnCode_t::RETCODE_OK; }
get_offered_deadline_missed_status函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 使用std::unique_lock对DataWriter的互斥锁进行加锁,以确保线程安全性。
- 将offered_deadline_missed_status_复制到status参数中。
- 重置offered_deadline_missed_status_中的total_count_change为0,以表示已经处理了这些变化。
- 通过调用user_datawriter_->get_statuscondition().get_impl()->set_status()设置相应的状态条件,将OfferedDeadlineMissedStatus的变化通知给用户。
- 返回RETCODE_OK表示获取OfferedDeadlineMissedStatus成功。
OfferedDeadlineMissedStatus表示DataWriter的最后期限未达到的情况。该状态记录了DataWriter未能按照其所定义的最后期限要求及时发送数据的统计信息。
OfferedDeadlineMissedStatus结构包含以下字段:
- total_count:表示自DataWriter启动以来,达到或超过最后期限的次数总计。
- total_count_change:表示自上次获取OfferedDeadlineMissedStatus以来,达到或超过最后期限的次数的变化值。
通过监视OfferedDeadlineMissedStatus的变化,可以了解DataWriter未按照最后期限要求发送数据的情况。当DataWriter未能及时发送数据时,total_count和total_count_change字段将增加,可以通过检查这些字段的值来获知最后期限未达到的次数和变化情况。
使用OfferedDeadlineMissedStatus可以实现一些功能,例如:
- 监测DataWriter是否按照最后期限要求及时发送数据。
- 根据最后期限未达到的情况采取相应的容错措施,例如重新发送数据或进行补偿操作。
- 基于OfferedDeadlineMissedStatus的变化触发特定的操作或回调函数。
get_offered_incompatible_qos_status
ReturnCode_t DataWriterImpl::get_offered_incompatible_qos_status(OfferedIncompatibleQosStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取OfferedIncompatibleQosStatus并复制给status参数 status = offered_incompatible_qos_status_; // 重置total_count和total_count_change offered_incompatible_qos_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_incompatible_qos(), false); return ReturnCode_t::RETCODE_OK; }
get_offered_incompatible_qos_status函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 使用std::unique_lock对DataWriter的互斥锁进行加锁,以确保线程安全性。
- 将offered_incompatible_qos_status_复制到status参数中。
- 重置offered_incompatible_qos_status_中的total_count_change为0,以表示已经处理了这些变化。
- 通过调用user_datawriter_->get_statuscondition().get_impl()->set_status()设置相应的状态条件,将OfferedIncompatibleQosStatus的变化通知给用户。
- 返回RETCODE_OK表示获取OfferedIncompatibleQosStatus成功。
OfferedIncompatibleQosStatus表示DataWriter提供的QoS(Quality of Service)与订阅者(DataReader)的QoS不兼容的情况。该状态记录了DataWriter与订阅者之间QoS不兼容的统计信息。
OfferedIncompatibleQosStatus结构包含以下字段:
- total_count:表示自DataWriter启动以来,发现的不兼容QoS的次数总计。
- total_count_change:表示自上次获取OfferedIncompatibleQosStatus以来,发现的不兼容QoS的次数的变化值。
通过监视OfferedIncompatibleQosStatus的变化,可以了解DataWriter与订阅者之间QoS不兼容的情况。当DataWriter与订阅者发现不兼容的QoS时,total_count和total_count_change字段将增加,可以通过检查这些字段的值来获知不兼容QoS的次数和变化情况。
使用OfferedIncompatibleQosStatus可以实现一些功能,例如:
- 监测DataWriter与订阅者之间QoS的兼容性情况。
- 根据不兼容QoS的情况进行相应的处理,例如尝试重新配置QoS或与订阅者进行协商。
- 基于OfferedIncompatibleQosStatus的变化触发特定的操作或回调函数。
get_liveliness_lost_status
ReturnCode_t DataWriterImpl::get_liveliness_lost_status(LivelinessLostStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取LivelinessLostStatus并复制给status参数 status = liveliness_lost_status_; // 重置total_count和total_count_change liveliness_lost_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_lost(), false); return ReturnCode_t::RETCODE_OK; }
get_liveliness_lost_status函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 使用std::unique_lock对DataWriter的互斥锁进行加锁,以确保线程安全性。
- 将liveliness_lost_status_复制到status参数中。
- 重置liveliness_lost_status_中的total_count_change为0,以表示已经处理了这些变化。
- 通过调用user_datawriter_->get_statuscondition().get_impl()->set_status()设置相应的状态条件,将LivelinessLostStatus的变化通知给用户。
- 返回RETCODE_OK表示获取LivelinessLostStatus成功。
在数据通信中,活跃性(Liveliness)是指一个实体(如DataWriter)继续存在和活跃的状态。在发布-订阅模型中,DataWriter通过发送活跃性消息或定期发送心跳消息来证明自己的活跃状态。这样,订阅者(DataReader)能够确认DataWriter是否仍然处于活跃状态,以便维持通信链接。
活跃性的目的是确保数据通信中的实体保持活跃,以便保持通信链路的有效性和一致性。通过活跃性机制,可以检测到DataWriter是否不再活跃,即停止发送活跃性消息或心跳消息,或者由于某些原因无法满足活跃性要求。当DataWriter丢失活跃性时,订阅者可以做出相应的处理,如更新订阅者的状态、重新分配资源或采取其他容错措施。
get_sending_locators
ReturnCode_t DataWriterImpl::get_sending_locators(rtps::LocatorList& locators) const { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 获取DataWriter正在使用的发送定位器列表 rtps::LocatorList sending_locators = writer_->getRTPSParticipant()->getSendingLocators(); // 将发送定位器列表复制给输出参数locators locators = sending_locators; return ReturnCode_t::RETCODE_OK; }
get_sending_locators函数的主要步骤如下:
- 首先,检查DataWriter是否已启用,如果未启用则返回RETCODE_NOT_ENABLED。
- 调用writer_->getRTPSParticipant()->getSendingLocators()获取DataWriter正在使用的发送定位器列表。
- 将发送定位器列表复制给输出参数locators。
- 返回RETCODE_OK表示获取发送定位器列表成功。
通过调用get_sending_locators函数,可以获取DataWriter正在使用的发送定位器列表。发送定位器列表包含了DataWriter用于发送数据的网络定位器信息,可用于了解DataWriter当前的通信配置和连接信息。这对于监视和调试数据传输以及网络配置非常有用。