fastdds:传输层SHM和DATA-SHARING的区别

news2025/4/20 3:46:06

 下图是fastdds官方的图,清晰地展示了dds支持的传输层:

根据通信双方的相对位置(跨机器、同机器跨进程、同进程)的不同选择合适的传输层,是通信中间件必须要考虑的事情。

跨机器:udp、tcp

跨机器通信,只能通过网络, fastdds支持UDP和TCP。

同机器,跨进程:SHM、DATA-SHARING

在同一个机器中,使用UDP和TCP进行通信,当然也是可以的。但是,从性能的角度来考虑,更推荐使用SHM,SHM和UDP/TCP相比有2点优势:

1、减少系统调用次数

共享内存创建完成之后,后边使用的时候直接是内存操作,而UCP/TCP每次发送或者接收的的时候,都要通过系统调用。

2、支持超长消息

UDP/TCP受协议栈的影响,可能会对消息进行分片和组装。

SHM也不完全是优点,也有自己的缺点,比如实现复杂:

1、使用UDP/TCP来收发包,功能成熟,拿来就用。使用SHM进行通信的话,很多功能需要自己实现,比如进程间的同步(通过信号量来实现),buffer的管理(segment)等。

 

同进程:INTRA-PROCESS

如果writer和reader在同一个进程,那么可以使用这种传输方式,writer线程会直接调用reader的回调函数,在同一个线程中。

如下图所示,使用如下命令gdb --args ./delivery_mechanisms pubsub -m INTRA-PROCESS,并对函数PubSubApp::on_data_available设置断点,可以看到调用栈,使用INTRA-PROCESS的时候,subscriber的接收回调函数直接被发送线程调用,在同一个线程中。这是一种效率最高的方式,去掉了中间缓存的环节。

 

在默认情况下,在同进程的通信使用intra-process,同机器跨进程使用SHM,跨机器使用UDP。

从上图可以看出,同机器的不同进程间通信可以使用SHM,也可以使用DATA-SHARING,那么两者有什么区别呢?

本文使用fastdds中的例子delivery_mechanisms,这个例子可以测试不同的传输层。

本文先介绍SHM和DATA-SHARING传输层的数据收发流程,最后总结两者之间的异同。

1SHM收发流程

简单来说,如果要看SHM的发送流程,那么可以以函数SharedMemTransport::send为中心进行梳理,通过gdb对这个函数设置断点,可以查看这个函数的调用栈,阅读这个函数的代码,可以看这个函数的发送操作;如果要看SHM的接收过程,可以通过函数SharedMemChannelResource::perform_listen_operation来梳理,对于TCP,UDP,SHM来说,三种传输层都实现了自己的函数perform_listen_operation,这个函数监听接收数据。

上图中左边是TranportDescriptor,右边是Transport。用户在创建传输层的时候,并不需要直接创建一个Transport,而是构造一个传输层的描述符即可,fastdds内部根据描述符来创建传输层。这是典型的设计模式中的建造者模式,当构造一个对象时,如果需要传递较多的参数,并且参数之间还有一些依赖关系,那么构造函数实现起来就会比较复杂,这个时候就可以使用建造者模式,在建造者中对参数进行检查,如果没有问题则直接构造对象,让构造函数只专注于对象的构造,至于参数的检查工作,放到构造器中来完成。

上图中左图的SenderResource负责发送数据,右图的ChannelResource负责接收数据。

UDP、TCP和SHM的类有平行的关系,他们也有自己的TransportDescriptor、Tranport、SenderResource、ChannelResource类。 通过一个基类或者接口派生出3个传输层,体现了c++中多态的使用。

 

Locator用来描述一个端点,其中kind包括LOCATOR_KIND_TCPv4、LOCATOR_KIND_UDPv4、 LOCATOR_KIND_TCPv6、LOCATOR_KIND_UDPv6、LOCATOR_KIND_SHM;port就是通信的端口,比如7400、7410这些;address是地址,在UDP、TCP中就是ip地址。只要知道要发送对象的Locator信息,那么数据就可以发送出去。

Locator {
   int32_t kind;
   uint32_t port;
   std::string address;
}

1.1发送

下图是对SharedMemTransport::send设置断点,看到的调用栈。

发送流程:

①用户调用DataWriter::write进行发送

②在DataWriterHistory中,将要发送的数据封装到一个CacheChange_t中

③在发送侧,dds和rtps之间的接口类是dds侧的DataWriterHistory,rtps侧的WriterHistory

④rtps中的BaseWriter进行发送

⑤RtpsParticipantImpl::sendSync进行发送,SenderResource属于Participant中的资源,在Participant中调用SenderResource的send函数,最终调用到SharedMemTransport的send

 

然后我们再来看看SharedMemTransport::send函数内部的实现:

bool SharedMemTransport::send(
        const std::vector<NetworkBuffer>& buffers,
        uint32_t total_bytes,
        fastdds::rtps::LocatorsIterator* destination_locators_begin,
        fastdds::rtps::LocatorsIterator* destination_locators_end,
        const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
    ...
    fastdds::rtps::LocatorsIterator& it = *destination_locators_begin;

    bool ret = true;

    std::shared_ptr<SharedMemManager::Buffer> shared_buffer;

    try
    {
        while (it != *destination_locators_end)
        {
            if (IsLocatorSupported(*it))
            {
                if (shared_buffer == nullptr)
                {
                    shared_buffer = copy_to_shared_buffer(buffers, total_bytes, max_blocking_time_point);
                }

                ret &= send(shared_buffer, *it);
                ...
            }

            ++it;
        }
    }
    ...
    return ret;

}

①对于第一次循环,需要通过函数copy_to_shared_buffer将数据保存到共享内存中

在函数copy_to_shared_buffer中打印shared_mem_segment_的信息,可以看到segment name是fastdds_aec0cadd732cebe2,这个就是发送数据要拷贝的共享内存。所以说,使用共享内存发送数据时,会进行一次数据拷贝。

 

将数据拷贝到共享内存之后,就是通知接收者,通知接收者需要两个步骤来实现:

①构造一个BufferDescrptor(简称BD),并将BD发送到对应的端口

②环形监听者

构造一个BufferDescriptor:

唤醒监听者:

1.2接收

对Subscriber的接收回调设置断点,调用栈如下:

2DATA-SHARING收发流程

DATA-SHARING不是一个标准的传输层,没有TransportDescriptor、Transport、SenderResource、ChannelResource这些资源。

2.1发送

delivery mechanisms这个例子,publish的时候,并不是构造了一个临时的变量发送出去的,而是通过load_sample首先从底层申请了一块内存,而这块内存就是从data sharing的writer pool中申请的,这样数据直接就是保存在这里的,payload pool就是从这里申请的,申请的是data sharing的writer pool,而writer pool就是基于共享内存创建的。所以说,在发送时,没有拷贝。

std::shared_ptr<IPayloadPool> DataWriterImpl::get_payload_pool()

{

    if (!payload_pool_)

    {

        // Avoid calling the serialization size functors on PREALLOCATED mode

        fixed_payload_size_ =

                pool_config_.memory_policy == PREALLOCATED_MEMORY_MODE ? pool_config_.payload_initial_size : 0u;

 

        // Get payload pool reference and allocate space for our history

        if (is_data_sharing_compatible_)

        {

            payload_pool_ = DataSharingPayloadPool::get_writer_pool(pool_config_);

        }

        else

        {

            payload_pool_ = TopicPayloadPoolRegistry::get(topic_->get_name(), pool_config_);

            if (!std::static_pointer_cast<ITopicPayloadPool>(payload_pool_)->reserve_history(pool_config_, false))

            {

                payload_pool_.reset();

            }

        }

 

        // Prepare loans collection for plain types only

        if (type_->is_plain(data_representation_))

        {

            loans_.reset(new LoanCollection(pool_config_));

        }

    }

 

    return payload_pool_;

}

在这个函数里把数据发送到对方

DeliveryRetCode StatefulWriter::deliver_sample_nts(

        CacheChange_t* cache_change,

        RTPSMessageGroup& group,

        LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl

        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)

{

    DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED;

 

    if (there_are_local_readers_)

    {

        deliver_sample_to_intraprocesses(cache_change);

    }

 

    // Process datasharing then

    if (there_are_datasharing_readers_)

    {

        deliver_sample_to_datasharing(cache_change);

    }

 

    if (there_are_remote_readers_)

    {

        ret_code = deliver_sample_to_network(cache_change, group, locator_selector, max_blocking_time);

    }

 

    check_acked_status();

 

    return ret_code;

}

 发送侧通知接收侧

(gdb) bt
#0  eprosima::fastdds::rtps::DataSharingNotifier::notify (this=0x7fffe00029b0) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/DataSharingNotifier.hpp:78
#1  0x00007ffff7788956 in eprosima::fastdds::rtps::ReaderLocator::datasharing_notify (this=0x7fffe0002568) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/ReaderLocator.cpp:237
#2  0x00007ffff77a3cf8 in eprosima::fastdds::rtps::ReaderProxy::datasharing_notify (this=0x7fffe0002560) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.hpp:388
#3  0x00007ffff779761a in eprosima::fastdds::rtps::StatefulWriter::deliver_sample_to_datasharing (this=0x555555789150, change=0x555555786f70)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:612
#4  0x00007ffff779eab1 in eprosima::fastdds::rtps::StatefulWriter::deliver_sample_nts (this=0x555555789150, cache_change=0x555555786f70, group=..., locator_selector=..., max_blocking_time=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:2130
#5  0x00007ffff762d4af in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample_impl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode> (this=0x55555567f9d0, writer=0x555555789150, change=0x555555786f70, max_blocking_time=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1191
#6  0x00007ffff762a44c in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample (
    this=0x55555567f9d0, writer=0x555555789150, change=0x555555786f70, max_blocking_time=...) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1012
#7  0x00007ffff7796415 in eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history (this=0x555555789150, change=0x555555786f70, max_blocking_time=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:386
#8  0x00007ffff7644784 in eprosima::fastdds::rtps::WriterHistory::notify_writer (this=0x555555788ca0, a_change=0x555555786f70, max_blocking_time=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/history/WriterHistory.cpp:201
#9  0x00007ffff71efd05 in eprosima::fastdds::rtps::WriterHistory::add_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastdds::rtps::ChangeKind_t, void const*, eprosima::fastdds::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastdds::rtps::CacheChange_t *, eprosima::fastdds::rtps::WriteParams &, struct {...}, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > >) (this=0x555555788ca0, a_change=0x555555786f70,
    wparams=..., pre_commit=..., max_blocking_time=...) at /root/Fast-DDS/Fast-DDS/include/fastdds/rtps/history/WriterHistory.hpp:299
#10 0x00007ffff71ef6a0 in eprosima::fastdds::dds::DataWriterHistory::add_pub_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastdds::rtps::ChangeKind_t, void const*, eprosima::fastdds::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastdds::rtps::CacheChange_t *, eprosima::fastdds::rtps::WriteParams &, struct {...}, std::unique_lock<std::recursive_timed_mutex> &, const std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > > &) (this=0x555555788ca0, change=0x555555786f70, wparams=..., pre_commit=..., lock=..., max_blocking_time=...) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterHistory.hpp:159
#11 0x00007ffff71e830d in eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change (this=0x55555577d430, change_kind=eprosima::fastdds::rtps::ALIVE, data=0x7ffff6233ef4, wparams=..., handle=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:1057
#12 0x00007ffff71e88e0 in eprosima::fastdds::dds::DataWriterImpl::create_new_change_with_params (this=0x55555577d430, changeKind=eprosima::fastdds::rtps::ALIVE, data=0x7ffff6233ef4, wparams=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:1131
#13 0x00007ffff71e7ba9 in eprosima::fastdds::dds::DataWriterImpl::create_new_change (this=0x55555577d430, changeKind=eprosima::fastdds::rtps::ALIVE, data=0x7ffff6233ef4)
    at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:976
#14 0x00007ffff71e6312 in eprosima::fastdds::dds::DataWriterImpl::write (this=0x55555577d430, data=0x7ffff6233ef4) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:655
#15 0x00007ffff71d9d8f in eprosima::fastdds::dds::DataWriter::write (this=0x55555577db90, data=0x7ffff6233ef4) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriter.cpp:84
#16 0x0000555555601386 in eprosima::fastdds::examples::delivery_mechanisms::PublisherApp::publish (this=0x555555657c60) at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/PublisherApp.cpp:295
#17 0x00005555556010b3 in eprosima::fastdds::examples::delivery_mechanisms::PublisherApp::run (this=0x555555657c60) at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/PublisherApp.cpp:266
#18 0x0000555555609105 in std::__invoke_impl<void, void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application>>(std::__invoke_memfun_deref, void (eprosima::fastdds::examples::delivery_mechanisms::Application::*&&)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application>&&) (
    __f=@0x55555565e578: &virtual table offset 16, __t=...) at /usr/include/c++/11/bits/invoke.h:74
#19 0x000055555560903d in std::__invoke<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > (
    __fn=@0x55555565e578: &virtual table offset 16) at /usr/include/c++/11/bits/invoke.h:96
#20 0x0000555555608f9d in std::thread::_Invoker<std::tuple<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > >::_M_invoke<0ul, 1ul> (this=0x55555565e568) at /usr/include/c++/11/bits/std_thread.h:259
#21 0x0000555555608f52 in std::thread::_Invoker<std::tuple<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > >::operator() (this=0x55555565e568) at /usr/include/c++/11/bits/std_thread.h:266
#22 0x0000555555608f32 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > > >::_M_run (this=0x55555565e560) at /usr/include/c++/11/bits/std_thread.h:211
#23 0x00007ffff60dc253 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#24 0x00007ffff5c94ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#25 0x00007ffff5d26850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
(gdb)
 

 

DATA-SHARING初始化writer pool

(gdb) bt
#0  eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> > (this=0x5555556633c0,
    writer=0x555555789150, shared_dir="") at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/./WriterPool.hpp:132
#1  0x00007ffff760f455 in eprosima::fastdds::rtps::WriterPool::init_shared_memory (this=0x5555556633c0, writer=0x555555789150, shared_dir="")
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/./WriterPool.hpp:248
#2  0x00007ffff777f315 in eprosima::fastdds::rtps::BaseWriter::init (this=0x555555789150, att=...) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/BaseWriter.cpp:388
#3  0x00007ffff777e14f in eprosima::fastdds::rtps::BaseWriter::BaseWriter (this=0x555555789150, impl=0x555555663600, guid=..., att=..., flow_controller=0x55555567f9d0, hist=0x555555788ca0,
    listen=0x55555577d930) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/BaseWriter.cpp:82
#4  0x00007ffff7794db9 in eprosima::fastdds::rtps::StatefulWriter::StatefulWriter (this=0x555555789150, pimpl=0x555555663600, guid=..., att=..., flow_controller=0x55555567f9d0, history=0x555555788ca0,
    listener=0x55555577d930) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:203
#5  0x00007ffff7685df0 in operator() (__closure=0x7fffffffbc50, guid=..., watt=..., flow_controller=0x55555567f9d0, persistence=0x0, is_reliable=true)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp:1246
#6  0x00007ffff768f132 in eprosima::fastdds::rtps::RTPSParticipantImpl::create_writer<eprosima::fastdds::rtps::RTPSParticipantImpl::create_writer(eprosima::fastdds::rtps::RTPSWriter**, eprosima::fastdds::rtps::WriterAttributes&, eprosima::fastdds::rtps::WriterHistory*, eprosima::fastdds::rtps::WriterListener*, const eprosima::fastdds::rtps::EntityId_t&, bool)::<lambda(const GUID_t&, eprosima::fastdds::rtps::WriterAttributes&, eprosima::fastdds::rtps::FlowController*, eprosima::fastdds::rtps::IPersistenceService*, bool)> >(eprosima::fastdds::rtps::RTPSWriter **, eprosima::fastdds::rtps::WriterAttributes &, const eprosima::fastdds::rtps::EntityId_t &, bool, const struct {...} &) (this=0x555555663600, writer_out=0x7fffffffbe88, param=..., entity_id=..., is_builtin=false, callback=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp:955
#7  0x00007ffff7686266 in eprosima::fastdds::rtps::RTPSParticipantImpl::create_writer (this=0x555555663600, WriterOut=0x7fffffffbe88, watt=..., hist=0x555555788ca0, listen=0x55555577d930, entityId=...,
    isBuiltin=false) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp:1264
#8  0x00007ffff76d5f74 in eprosima::fastdds::rtps::RTPSDomainImpl::create_rtps_writer (p=0x5555556631f0, entity_id=..., watt=..., hist=0x555555788ca0, listen=0x55555577d930)
    at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/RTPSDomain.cpp:384
#9  0x00007ffff71e4834 in eprosima::fastdds::dds::DataWriterImpl::enable (this=0x55555577d430) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:375
#10 0x00007ffff783570a in eprosima::fastdds::statistics::dds::DataWriterImpl::enable (this=0x55555577d430) at /root/Fast-DDS/Fast-DDS/src/cpp/statistics/fastdds/publisher/DataWriterImpl.hpp:77
#11 0x00007ffff71d9ceb in eprosima::fastdds::dds::DataWriter::enable (this=0x55555577db90) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriter.cpp:63
#12 0x00007ffff7204a76 in eprosima::fastdds::dds::PublisherImpl::create_datawriter (this=0x55555577c8a0, topic=0x555555668dd0, impl=0x55555577d430, mask=...)
    at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/PublisherImpl.cpp:277
#13 0x00007ffff720480c in eprosima::fastdds::dds::PublisherImpl::create_datawriter (this=0x55555577c8a0, topic=0x555555668dd0, qos=..., listener=0x555555657c68, mask=...,
    payload_pool=std::shared_ptr<eprosima::fastdds::rtps::IPayloadPool> (empty) = {...}) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/PublisherImpl.cpp:257
#14 0x00007ffff7202b0c in eprosima::fastdds::dds::Publisher::create_datawriter (this=0x55555577ced0, topic=0x555555668dd0, qos=..., listener=0x555555657c68, mask=...,
    payload_pool=std::shared_ptr<eprosima::fastdds::rtps::IPayloadPool> (empty) = {...}) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/Publisher.cpp:119
#15 0x0000555555600ae1 in eprosima::fastdds::examples::delivery_mechanisms::PublisherApp::PublisherApp (this=0x555555657c60, config=..., topic_name="delivery_mechanisms_topic")
    at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/PublisherApp.cpp:221
#16 0x00005555555daf5f in __gnu_cxx::new_allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>::construct<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdddf,
    __p=0x555555657c60) at /usr/include/c++/11/ext/new_allocator.h:162
#17 0x00005555555daafc in std::allocator_traits<std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp> >::construct<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (__a=...,
    __p=0x555555657c60) at /usr/include/c++/11/bits/alloc_traits.h:516
#18 0x00005555555da3d7 in std::_Sp_counted_ptr_inplace<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x555555657c50, __a=...) at /usr/include/c++/11/bits/shared_ptr_base.h:519
#19 0x00005555555d9d73 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdf88, __p=@0x7fffffffdf80: 0x0, __a=...) at /usr/include/c++/11/bits/shared_ptr_base.h:650
#20 0x00005555555d9ade in std::__shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdf80, __tag=...) at /usr/include/c++/11/bits/shared_ptr_base.h:1342
#21 0x00005555555d97e1 in std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>::shared_ptr<std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdf80,
    __tag=...) at /usr/include/c++/11/bits/shared_ptr.h:409
#22 0x00005555555d948f in std::allocate_shared<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (__a=...)
    at /usr/include/c++/11/bits/shared_ptr.h:863
#23 0x00005555555d916d in std::make_shared<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> () at /usr/include/c++/11/bits/shared_ptr.h:879
#24 0x00005555555d885a in eprosima::fastdds::examples::delivery_mechanisms::Application::make_app (config=..., topic_name="delivery_mechanisms_topic")
    at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/Application.cpp:41
--Type <RET> for more, q to quit, c to continue without paging--
#25 0x0000555555603cb9 in main (argc=4, argv=0x7fffffffe398) at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/main.cpp:54
(gdb) p shared_dir
$1 = ""
(gdb) s
136             segment_id_ = writer->getGuid();
(gdb) n
137             segment_name_ = generate_segment_name(shared_dir, segment_id_);
(gdb) n
138             std::unique_ptr<T> local_segment;
(gdb) p segment_id_
$2 = {guidPrefix = {static size = 12, value = "\001\017\325\227\065\253k[\000\000\000"}, entityId = {static size = 4, value = "\000\000\001\003"}}
(gdb) p segment_name_
$3 = "fast_datasharing_01.0f.d5.97.35.ab.6b.5b.00.00.00.00_0.0.1.3"
(gdb)
 

2.2接收

DataSharingListener::run

 

3相同点:均通过共享内存进行通信

3.1SHM

在linux下,共享内存默认使用/dev/shm临时文件系统。如下是使用SHM的时候,启动了一个publisher和subscriber,在/dev/shm中创建的文件。

(1)SHM使用了端口的概念

fastdds中对SHM的使用,借鉴了UDP的通信方式,也使用了端口这个概念。在通信中,端口的表示这个通信通道被哪个进程使用。

上图中7400、7410、7411、7412、7413端口与下文中UDP的端口,作用是相同的。

fastdds:传输层端口号计算规则_fastdds mechanism-CSDN博客

 

下图中显示了SHM传输层的通信时序图。可以看到port是用来传输descriptor的,真正的数据是通过fastdds_56f311faa23f4389这样的文件进行传输的。descriptor在网络通信中是经常见到的一个概念,作为数据的描述符描述了数据的基本信息,比如数据的起始地址,数据的长度等。在网卡驱动中,经常说的BD,全称是buffer descriptor,也是网络收发包的描述符,与这里的descriptor是类似的。

(2)文件的作用

①锁,进程间同步

上图中以sem开头,以mutex结尾的文件用于进程间同步。以sem.xxx.port7413_mutex为例,如果一个进程监听7413这个端口的数据,那么就会wait这个锁;如果有进程要向7413发数据,那么写好数据之后,便会notiy这个锁,从而接收方就会被唤醒,进而接收并处理数据。

②fastdds_port7413

这个是端口文件,用于传输descriptor,如果有进程要向7413发送数据,那么便会将descriptor写到这个文件中。

③fastdds_56f311faa23f4389

数据文件,进程要发送的数据都会写到这个文件中。

也就是说,通过共享内存发送数据的时候,首先要将数据写到这个文件中,然后再填充一个desctiptor,将descriptor文件放到port文件中,最后调用notify,环形对应port的进程。

当我们想要了解这些文件是什么用的时候,除了从文件的名字、代码、文档中去查找确认之外,还可以通过在gdb中对shm_open设置断点,从而查看调用栈的方式去学习。/dev/shm中的文件通过shm_open打开。

 

3.2DATA-SHARING

如下是使用DATA-SHARING,启动了一个publisher和subscriber的情况下,在/dev/shm下创建的文件。从下图中可以看出,使用DATA-SHARING 的时候,/dev/shm下的文件,除了使用SHM时的哪些文件之外,还多了一个文件,比如fast_datasharing_01.0f.d5.97.90.46.db.3a.00.00.00.00_0.0.1.3。

 

那么文件fast_datasharing_01.0f.d5.97.90.46.db.3a.00.00.00.00_0.0.1.3是做什么用的呢?

首先我们可以在gdb中对shm_open设断点,查看这个函数的调用栈。调用栈如下图所示:

从中我们可以看到这个函数,可以大胆猜测在使用DATA-SHARING的时候,这个文件用于数据的传输。而原来与SHM传输层相同的文件,用于服务发现阶段。

#8  0x00007ffff7610de5 in eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> > (
    this=0x5555556633c0, writer=0x555555789150, shared_dir="") at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/./WriterPool.hpp:183

4不同点

(1)SHM是标准的传输层port based的传输层,DATA-SHARING不是

(2)相对于SHM,DATA-SHARING在发送侧少一次拷贝

(3)SHM数据和Descriptor保存在了不同的文件中,数据保存在fastdds_683379ac5b6c3ba5中,descriptor发送到fastdds_port7413中;而DATA-SHARING的数据和描述符保存在了同一块内存中,下边的函数,node就相当于SHM中的descriptor,可以看到node和数据所在的内存是爱着的。

    void add_to_shared_history(

            const CacheChange_t* cache_change)

    {

        assert(cache_change);

        assert(cache_change->serializedPayload.data);

        assert(cache_change->serializedPayload.payload_owner == this);

        assert(free_history_size_ > 0);

 

        // Fill the payload metadata with the change info

        PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data);

        node->status(ALIVE);

        node->data_length(cache_change->serializedPayload.length);

        node->source_timestamp(cache_change->sourceTimestamp);

        node->writer_GUID(cache_change->writerGUID);

        node->instance_handle(cache_change->instanceHandle);

        if (cache_change->write_params.related_sample_identity() != SampleIdentity::unknown())

        {

            node->related_sample_identity(cache_change->write_params.related_sample_identity());

        }

 

        // Set the sequence number last, it signals the data is ready

        node->sequence_number(cache_change->sequenceNumber);

 

        // Add it to the history

        history_[static_cast<uint32_t>(descriptor_->notified_end)] = segment_->get_offset_from_address(node);

        EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Change added to shared history"

                << " with SN " << cache_change->sequenceNumber);

        advance(descriptor_->notified_end);

        --

(4)SHM发送侧和接收侧的notify和wait是以sem.fastdds_portxxxx_mutex为桥梁;DATA-SHARING的notify和wait就是以数据传输文件为桥梁。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2338445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

树莓派_利用Ubuntu搭建gitlab

树莓派_利用Ubuntu搭建gitlab 一、给树莓派3A搭建基本系统 1、下载系统镜像 https://cdimage.ubuntu.com/ubuntu/releases/18.04/release/ 2、准备系统SD卡 二、给树莓派设备联网 1、串口后台登录 使用串口登录后台是最便捷的&#xff0c;因为前期网络可能不好直接成功 默…

ARINC818协议(三)

源特定参数 源特定参数被定义&#xff0c;用于在源和目的之间进行传输 源特定参数包括初始化&#xff0c;合适的解释&#xff0c;周期性的验证。 gamma or palette tables&#xff1a;伽马或者调色板 color format:颜色格式 Brightness and backlight control &#xff1a;亮度…

得佳胜哲讯科技 SAP项目启动会:胶带智造新起点 数字转型新征程

在全球制造业加速向数字化、智能化转型的浪潮中&#xff0c;胶带制造行业正迎来以“自动化生产、数据化运营、智能化决策”为核心的新变革。工业互联网、大数据分析与智能装备的深度融合&#xff0c;正推动胶带制造从传统生产模式向“柔性化生产精准质量控制全链路追溯”的智慧…

万字解析TCP

通过学习视频加博客的组合形式&#xff0c;整理了一些关于TCP协议的知识。 *图源&#xff1a;临界~的csdn博客。 一、TCP建立连接 TCP的建立连接&#xff0c;大致可以分为面向连接、TCP报文结构、TCP的三次握手、TCP的建立状态、SYN泛洪攻击。 1.1、面向连接 面向连接 --- …

2025年大数据实训室建设及大数据实训平台解决方案

一、引言 在数字化浪潮中&#xff0c;大数据技术已成为推动各行业创新发展的核心驱动力。从金融领域的风险预测到医疗行业的精准诊断&#xff0c;从电商平台的个性化推荐到交通系统的智能调度&#xff0c;大数据的应用无处不在。据权威机构预测&#xff0c;到 2025 年&#xf…

贪心、动态规划、其它算法基本原理和步骤

目录 1. 贪心1.1 贪心算法的基本步骤1.2 贪心算法实战1.2.1 贪心的经典问题1.2.2 贪心解决数组与子序列问题1.2.3 贪心解决区间调度问题1.2.4 贪心解决动态决策问题1.2.5 贪心解决一些复杂场景应用 2. 动态规划2.1 动态规划的基本步骤和一些优化2.2 动态规划实战2.2.1 斐波那契…

python-各种文件(txt,xls,csv,sql,二进制文件)读写操作、文件类型转换、数据分析代码讲解

1.文件txt读写标准用法 1.1写入文件 要读取文件&#xff0c;首先得使用 open() 函数打开文件。 file open(file_path, moder, encodingNone) file_path&#xff1a;文件的路径&#xff0c;可以是绝对路径或者相对路径。mode&#xff1a;文件打开模式&#xff0c;r 代表以…

ctfshow-大赛原题-web702

因为该题没有理解到位&#xff0c;导致看wp也一直出错&#xff0c;特此反思一下。 参考yu22x师傅的文章 &#xff1a;CTFSHOW大赛原题篇(web696-web710)_ctfshow 大赛原题-CSDN博客 首先拿到题目&#xff1a; // www.zip 下载源码 我们的思路就是包含一个css文件&#xff0c;…

Triton(2)——Triton源码接结构

1 triton 3.0.0 源码结构 triton docs/&#xff1a;项目文档 cmake/&#xff1a;构建配置相关 bin/&#xff1a;工具、脚本 CmakeLists.txt&#xff1a;cmake 配置文件 LSCENSE README.md Pyproject.toml&#xff1a;python 项目配置文件 utils/&#xff1a;项目配置文…

容器docker入门学习

这里写目录标题 容器容器的软件厂商 dockerdocker引擎 虚拟化虚拟化技术 docker安装详解1、安装检查2、安装yum相关的工具3、安装docker-ce软件4、查看docker版本5、启动docker服务6、设置docker开机启动7、查看有哪些docker容器运行进程8、查看容器里有哪些镜像9、下载nginx软…

HarmonyOS NEXT开发教程:全局悬浮窗

今天跟大家分享一下HarmonyOS开发中的悬浮窗。 对于悬浮窗&#xff0c;可能有的同学会想到使用层叠布局是否可以实现&#xff0c;将悬浮窗叠在导航栏组件Tabs上&#xff0c;像这样&#xff1a; Stack({alignContent:Alignment.BottomEnd}){Tabs({barPosition:BarPosition.End…

解锁元生代:ComfyUI工作流与云原生后端的深度融合

目录 蓝耘元生代&#xff1a;智算新势力崛起​ ComfyUI 工作流创建详解​ ComfyUI 初印象​ 蓝耘平台上搭建 ComfyUI 工作流​ 构建基础工作流实操​ 代码示例与原理剖析​ 云原生后端技术全景 云原生后端概念解析​ 核心技术深度解读​ 蓝耘元生代中两者的紧密联系​…

STM32 基本GPIO控制

目录 GPIO基础知识 ​编辑IO八种工作模式 固件库实现LED点灯 蜂鸣器 按键基础知识 ​编辑继电器 震动传感器 433M无线模块 GPIO基础知识 GPIO(General-Purpose input/output,通用输入/输出接口) 用于感知外部信号&#xff08;输入模式&#xff09;和控制外部设备&…

汽车免拆诊断案例 | 2019款大众途观L车鼓风机偶尔不工作

故障现象 一辆2019款大众途观L车&#xff0c;搭载DKV发动机和0DE双离合变速器&#xff0c;累计行驶里程约为8万km。车主进厂反映&#xff0c;鼓风机偶尔不工作。 故障诊断  接车后试车&#xff0c;鼓风机各挡位均工作正常。用故障检测仪检测&#xff0c;空调控制单元&#x…

FastAPI与SQLAlchemy数据库集成

title: FastAPI与SQLAlchemy数据库集成 date: 2025/04/17 15:33:34 updated: 2025/04/17 15:33:34 author: cmdragon excerpt: FastAPI与SQLAlchemy的集成通过创建虚拟环境、安装依赖、配置数据库连接、定义数据模型和实现路由来完成。核心模块包括数据库引擎、会话工厂和声…

免费将静态网站部署到服务器方法(仅支持HTML,CSS,JS)

原视频链接&#xff1a;把HTML免费部署到网站上&#xff0c;实现别人也能访问的教程来啦QAQ_哔哩哔哩_bilibili 注意&#xff1a;仅支持HTML、CSS、JS。不支持Vue等框架。 1.打开网站www.wordpress.org 点击红框按钮 点击红框按钮下载wordpress模板文件并解压。 将自己编写的…

51单片机实验一:点亮led灯

目录 一、实验环境与实验器材 二、实验内容及实验步骤 1.用keil 软件创建工程&#xff0c;C文件编写程序&#xff0c;编译生成hex文件​编辑 2.用STC烧写hex文件&#xff0c;点亮第一个LED灯 3.使用法2&#xff0c;点除第一个以外的LED灯 一、实验环境与实验器材 环境&am…

PyCharm 开发工具 修改字体大小及使用滚轮没有反应

PyCharm 开发工具 修改字体大小及使用滚轮没有反应 提示&#xff1a;帮帮志会陆续更新非常多的IT技术知识&#xff0c;希望分享的内容对您有用。本章分享的是PyCharm 开发工具。前后每一小节的内容是有学习/理解关联性&#xff0c;希望对您有用~ PyCharm 开发工具 修改字体大小…

zookeeper启动报错have small server identifier

解决方案&#xff1a; 1、查看myid是否有重复 2、查看server.X 与myid的X是否一致 3、启动顺序为myid从小到大的服务器顺序

1.Framer Motion 中 motion/react 和 motion/react-client 的用法和区别

背景知识&#xff1a;服务器端渲染 (SSR) 和客户端渲染 (CSR) 在最新的 Motion for React&#xff08;原 Framer Motion&#xff09;12.x 及更高版本中&#xff0c;官方提供了两个入口模块&#xff1a;motion/react 和 motion/react-client。二者对外 API 完全一致&#xff0c…