目录
源码结构
Publisher分析
Publisher 类分析
PublisherIImpl 类分析
源码结构
—builtin:该目录包含FastDDS使用的内置类型和协议的实现。
—core:该目录包含FastDDS库中使用的核心类和函数。这包括处理错误、管理内存和处理线程的类。
--domain:此目录包含DomainParticipant类的实现,它表示应用程序在特定域中的参与。
—log: FastDDS使用的日志系统实现。
—publisher:此目录包含用于发送数据的publisher类的实现。
—subscriber:此目录包含用于接收数据的subscriber类的实现。
—topic:这个目录包含topic类的实现,它定义了要发布和订阅的数据。
—utils:该目录包含整个FastDDS库中使用的各种实用程序函数和类。
Publisher分析
源码分析之前先看一个fastDDS pub 消息的demo 程序
#include <iostream> #include <string> #include "HelloWorldPubSubTypes.h" #include "HelloWorld.h" #include <fastdds/dds/domain/DomainParticipantFactory.hpp> #include <fastdds/dds/publisher/Publisher.hpp> #include <fastdds/dds/publisher/DataWriter.hpp> using namespace eprosima::fastdds::dds; int main() { // Create a DomainParticipant with default attributes DomainParticipant* participant = DomainParticipantFactory::get_instance()->create_participant(0); if (nullptr == participant) { std::cerr << "Error creating DomainParticipant." << std::endl; return EXIT_FAILURE; } // Register the HelloWorld type HelloWorldPubSubType helloWorldType; if (participant->register_type(helloWorldType) != ReturnCode_t::RETCODE_OK) { std::cerr << "Error registering HelloWorld type." << std::endl; return EXIT_FAILURE; } // Create a Publisher and a Topic Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr); Topic* topic = participant->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT); // Create a DataWriter for the HelloWorld topic DataWriter* writer = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT, nullptr); if (nullptr == writer) { std::cerr << "Error creating DataWriter." << std::endl; return EXIT_FAILURE; } // Publish the HelloWorld messages HelloWorld hello; hello.message("Hello, World!"); for (int i = 1; i <= 10; ++i) { hello.index(i); std::cout << "Sending: " << hello << std::endl; writer->write((void*)&hello); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } // Clean up participant->delete_datawriter(writer); participant->delete_publisher(publisher); participant->delete_topic(topic); DomainParticipantFactory::get_instance()->delete_participant(participant); return EXIT_SUCCESS; }
接下来再分析Publisher
关键组件包括:
- PublisherImpl:PublisherImpl 类是Publisher接口的具体实现。它负责创建并管理与此发布者相关的DataWriter对象。PublisherImpl 类还处理与发布者相关的QoS策略,并负责在匹配的订阅者出现时建立连接。
- DataWriter:DataWriter 类表示一个特定主题的数据发布者。它负责序列化用户数据并将其发送给订阅者。DataWriter 还处理与数据实例相关的生命周期管理,例如实例的注册和反注册。
- DataWriterImpl:DataWriterImpl 类是DataWriter接口的具体实现。它处理底层的数据序列化和网络通信。此外,DataWriterImpl 类还负责管理与DataWriter相关的QoS策略和实例生命周期。
- DataWriterListener:DataWriterListener 类定义了DataWriter在特定事件发生时回调的接口。这些事件包括匹配的订阅者出现、实例状态改变等。用户可以通过实现 DataWriterListener 接口来定制事件处理。
- QoS:在 src/cpp/fastdds/publisher 目录中,还包含了一些与发布者和数据写入器相关的QoS策略实现。这些策略可以根据需要配置,以满足特定应用程序的性能和可靠性要求。
下面是距离的文件说明:
—DataWriter.cpp和DataWriterImpl.cpp:这两个文件包含了DataWriter类的实现和实现细节。DataWriter类用于发送数据。
—DataWriterHistory.cpp:该文件包含DataWriterHistory类的实现,用于管理DataWriter发送的数据样本的历史记录。
—Publisher.cpp和PublisherImpl.cpp:这些文件包含Publisher类的实现及其实现细节。Publisher类用于创建DataWriter对象并管理数据的发送。
—DataWriterImpl.hpp、DataWriterHistory.hpp、PublisherImpl.hpp:这是上述cpp文件对应的头文件。它们声明类和类的方法。
—filtering:该目录包含内容过滤功能的实现,该功能允许订阅服务器只接收符合特定条件的数据。
—history:包含历史缓存的实现,保存DataWriter或DataReader发送或接收的数据样本。
—qos:该目录包含qos (Quality of Service)策略的实现,qos策略控制系统行为的各个方面,如可靠性、持久性和资源限制。
Publisher 类分析
下面是Publisher类的成员变量和函数:
成员变量
PublisherImpl* impl_:这是一个指向Publisher类实现的指针。发布者的实际功能在这个类中实现。
成员函数
●create_datawriter():该函数用于创建用于发送数据的DataWriter。
●delete_datawriter():删除DataWriter。
●lookup_datawriter():该函数用于根据主题名称查找DataWriter。
●get_qos()和set_qos():这些函数用于获取和设置发布者的服务质量(QoS)策略。
●get_listener()和set_listener():这些函数用于获取和设置Publisher的PublisherListener。
●suspend_publications()和resume_publications():这两个函数用于挂起和恢复数据的发送。
●begin_coherent_changes()和end_coherent_changes():这些函数用于开始和结束应该一起发送的一组更改。
●wait_for_acknowledgement():该函数用于等待所有数据被订阅方确认。
●get_participant():该函数用于获取Publisher所属的DomainParticipant。
delete_contained_entities():该函数用于删除Publisher中包含的所有实体(如DataWriter)。
●set_default_datawriter_qos()和get_default_datawriter_qos():这些函数用于设置和获取该Publisher创建的DataWriter的默认QoS策略。
●copy_from_topic_qos():将QoS策略从Topic复制到DataWriter。
●get_datawriter_qos_from_profile():从配置文件中获取dataswriter对应的QoS策略。
●get_datawriters():该函数用于获取该Publisher创建的所有DataWriter对象。
●has_datawriters():这个函数用于检查Publisher是否有DataWriter对象。
Publisher类充当facade,将调用委托给PublisherImpl类。这些方法的实际实现隐藏在PublisherImpl类中。
PS:begin_coherent_changes()和end_coherent_changes() 这俩函数还没有实现。
/** * @brief Signals the beginning of a set of coherent cache changes using the Datawriters attached to the publisher * * @return RETCODE_OK if successful, an error code otherwise * @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED */ RTPS_DllAPI ReturnCode_t begin_coherent_changes();
PublisherIImpl 类分析
类图如下:
PublisherImpl 类是 Publisher 的实际实现,它包含了 Publisher 的行为。这个类不应直接使用,而是通过 DomainParticipant 的 create_publisher 方法创建 Publisher。
成员变量:
- DomainParticipantImpl* participant_:指向关联的 DomainParticipantImpl 实例的指针。
- PublisherQos qos_:存储 Publisher 的 QoS(Quality of Service,服务质量)策略。
- std::map<std::string, std::vector<DataWriterImpl*>> writers_:存储与 DataWriter 相关的指针。主题名称作为键值。
- std::mutex mtx_writers_:用于保护对 writers_ 成员变量的访问的互斥锁。
- PublisherListener* listener_:指向 PublisherListener 的指针,用于处理 Publisher 事件。
- PublisherWriterListener publisher_listener_:监听 DataWriter 事件的内部类实例。
- Publisher* user_publisher_:指向用户层 Publisher 的指针。
- fastrtps::rtps::RTPSParticipant* rtps_participant_:指向底层 RTPSParticipant 实例的指针。
- DataWriterQos default_datawriter_qos_:存储默认的 DataWriter QoS 策略。
- fastrtps::rtps::InstanceHandle_t handle_:存储 Publisher 的实例句柄。
方法:
- PublisherImpl(DomainParticipantImpl* p, const PublisherQos& qos, PublisherListener* p_listen = nullptr):构造函数,创建 PublisherImpl 实例。
- ~PublisherImpl():析构函数,销毁 PublisherImpl 实例。
- enable():启用 PublisherImpl。
- get_qos() const:获取 Publisher 的 QoS 策略。
- set_qos(const PublisherQos& qos):设置 Publisher 的 QoS 策略。
- get_listener() const:获取 PublisherListener 指针。
- set_listener(PublisherListener* listener):设置 PublisherListener。
- create_datawriter(...):创建 DataWriter,有多个重载版本。
- delete_datawriter(const DataWriter* writer):删除指定的 DataWriter。
- lookup_datawriter(const std::string& topic_name) const:根据主题名称查找对应的 DataWriter。
- contains_entity(const fastrtps::rtps::InstanceHandle_t& handle) const:检查给定实例句柄是否属于 Publisher。
- get_datawriters(std::vector<DataWriter*>& writers) const:获取 Publisher 创建的所有 DataWriter 对象。
- has_datawriters() const:检查 Publisher 是否有 DataWriter 对象。
- wait_for_acknowledgments(const fastrtps::Duration_t& max_wait):等待订阅方确认所有数据。
- get_participant() const:获取所属的 DomainParticipant。
- delete_contained_entities():删除包含的所有实体,如 DataWriter。
- set_default_datawriter_qos(const DataWriterQos& qos):设置默认的 DataWriter QoS 策略。
- reset_default_datawriter_qos():重置默认的 DataWriter QoS 策略。
- get_default_datawriter_qos() const:获取默认的 DataWriter QoS 策略。
- get_datawriter_qos_from_profile(const std::string& profile_name, DataWriterQos& qos) const:从配置文件中获取 DataWriter 的 QoS 策略。
- disable():移除所有监听器,以便安静地销毁 Publisher。
- type_in_use(const std::string& type_name) const:检查是否有任何 DataWriter 使用给定的类型名称。
- get_listener_for(const StatusMask& status):返回处理给定状态回调的最合适的监听器,如果没有合适的监听器,则返回 nullptr。
- can_be_deleted():检查 Publisher 是否可以被删除。
另外,还有一些被注释掉的方法(可能在未来版本中实现):
- suspend_publications()
- resume_publications()
- begin_coherent_changes()
- end_coherent_changes()
- copy_from_topic_qos(...)