CyberRT-共享内存实现

news2025/1/13 13:46:51

CyberRT共享内存类图

共享内存消息发布

在这里插入图片描述
数据用共享内存发布时,首先会创建ShmTransmitter对象,包含两个主要成员segment和notifier,Segment用于创建共享内存(上面绿色部分),Notifer 最终构建ReadableInfo通知给其他进程。
使用哪个ConditionNotifier-> notify或MulticastNotifier->notify,是在创建时根据配置文件决定的。
ConditionNotifier 在构建时会创建Indicator对象保存到共享内存中。
调ConditionNotifier-> notify,实际时将ReadableInfo保存到Indicator对象。

ConditionNotifier 共享内存数据接收

在这里插入图片描述
在接收数据时,也会创建同样的共享内存。如果共享内存存在,则直接打开。
在接收端也有同样的共享内存操作ConditionNotifier 。
ShmDispatcher会持有多个通道segment,用std::unordered_map<channelid, segment>表示。
同时启动一个后台线程ThreadFunc 线程轮询处理消息回调。

void ShmDispatcher::ThreadFunc() {
  ReadableInfo readable_info;
  // 轮询处理
  while (!is_shutdown_.load()) {
	// 100ms, Listen会转换100000 ms,对比seq,如果不等处理消息。每次轮询会等待递减50ms。
    if (!notifier_->Listen(100, &readable_info)) {
      ADEBUG << "listen failed.";
      continue;
    }

    if (readable_info.host_id() != host_id_) {
      ADEBUG << "shm readable info from other host.";
      continue;
    }
	//从共享内存Indicator中读出的数据
    uint64_t channel_id = readable_info.channel_id();
    uint32_t block_index = readable_info.block_index();

    {
      ReadLockGuard<AtomicRWLock> lock(segments_lock_);
      if (segments_.count(channel_id) == 0) {
        continue;
      }
      // check block index
      // std::unordered_map<uint64_t, uint32_t> previous_indexes_; 
      // 保存key: channelID, value: block_index
      if (previous_indexes_.count(channel_id) == 0) {
        previous_indexes_[channel_id] = UINT32_MAX;
      }
      uint32_t& previous_index = previous_indexes_[channel_id];
      if (block_index != 0 && previous_index != UINT32_MAX) {
        if (block_index == previous_index) {
          ADEBUG << "Receive SAME index " << block_index << " of channel "
                 << channel_id;
        } else if (block_index < previous_index) {
          ADEBUG << "Receive PREVIOUS message. last: " << previous_index
                 << ", now: " << block_index;
        } else if (block_index - previous_index > 1) {
          ADEBUG << "Receive JUMP message. last: " << previous_index
                 << ", now: " << block_index;
        }
      }
      previous_index = block_index;
	  ReadMessage(channel_id, block_index);
    }
  }
}

MulticastNotifier共享内存数据接收

MulticastNotifier时采用多播socket实现的,默认

std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;

创建两个socket notify_fd_ 用于发生消息,listen_addr用于接收消息。
在这里插入图片描述
在发送端调用Notify时,时调的MulticastNotifier::Nofify(const ReadableInfo& info)

bool MulticastNotifier::Notify(const ReadableInfo& info) {
  if (is_shutdown_.load()) {
    return false;
  }

  std::string info_str;
  info.SerializeTo(&info_str);
  ssize_t nbytes =
      sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,
             (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
  return nbytes > 0;
}

接收端用同样的方式轮询

bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {
  if (is_shutdown_.load()) {
    return false;
  }

  if (info == nullptr) {
    AERROR << "info nullptr.";
    return false;
  }

  struct pollfd fds;
  fds.fd = listen_fd_;
  fds.events = POLLIN;
  int ready_num = poll(&fds, 1, timeout_ms);
  if (ready_num > 0) {
    char buf[32] = {0};  // larger than ReadableInfo::kSize
    ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
    if (nbytes == -1) {
      AERROR << "fail to recvfrom, " << strerror(errno);
      return false;
    }
    return info->DeserializeFrom(buf, nbytes);
  } else if (ready_num == 0) {
    ADEBUG << "timeout, no readableinfo.";
  } else {
    if (errno == EINTR) {
      AINFO << "poll was interrupted.";
    } else {
      AERROR << "fail to poll, " << strerror(errno);
    }
  }
  return false;
}
bool Block::TryLockForWrite() {
  int32_t rw_lock_free = kRWLockFree;
  //lock_num_ == rw_lock_free, kWriteExclusive赋值给lock_num_,返回true
  //lock_num_ != rw_lock_free, lock_num_赋值给rw_lock_free,返回false
  if (!lock_num_.compare_exchange_weak(rw_lock_free, kWriteExclusive,
                                       std::memory_order_acq_rel,
                                       std::memory_order_relaxed)) {
    ADEBUG << "lock num: " << lock_num_.load();
    return false;
  }
  return true;
}

总结
1、CyberRT的共享内存读写都时需要加锁的。
2、每次写数据可以是不连续的block
3、每次当Block.lock_num_= 0:空闲,>0:有读操作, -1 : 写操作。
效率不是高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1241735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工会排队模式系统,打破传统创新消费

​小编介绍&#xff1a;10年专注商业模式设计及软件开发&#xff0c;擅长企业生态商业模式&#xff0c;商业零售会员增长裂变模式策划、商业闭环模式设计及方案落地&#xff1b;扶持10余个电商平台做到营收过千万&#xff0c;数百个平台达到百万会员&#xff0c;欢迎咨询。 在…

ES ElasticSearch安装、可视化工具kibana安装

1、安装ES docker run -d --name es9200 -e "discovery.typesingle-node" -p 9200:9200 elasticsearch:7.12.1访问测试&#xff1a; http://域名:9200/ 2、安装kibana对es进行可视化操作 执行命令 docker run -d --name kibana5601 -p 5601:5601 kibana:7.1.12.修…

人工智能对我们的生活影响

目录 前言 一、人工智能的领域 二、人工智能的应用 三、对人工智能的看法 总结 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助。 &#x1f4a1;本文由Filotimo__✍️原创&#xff0c;首发于CSDN&#x1f4…

Python编写的爬虫为什么受欢迎?

每每回想起我当初学习python爬虫的经历&#xff0c;当初遇到的各种困难险阻至今都历历在目。即便当初道阻且长&#xff0c;穷且益坚&#xff0c;我也从来没有想过要放弃。今天我将以我个人经历&#xff0c;和大家聊一聊有关Python语音编写的爬虫的事情。谈一谈为什么最近几年py…

关于实时云渲染并发问题的分享,实时云渲染到底能不能省显卡?

近期遇到很多客户咨询实时云渲染技术中的并发问题&#xff0c;在这里点量小芹针对这个问题的几个常见疑惑进行集中的解答分享&#xff0c;希望对有迷惑的朋友有所启发和帮助。 第一个问题&#xff0c;实时云渲染能否扩展一张显卡支持的并发数&#xff1f; 实时云渲染是一个新兴…

IDEA中注释快捷键及模板

单行注释 将光标放置于要注释所在行&#xff0c;使用 Ctrl /&#xff0c; 添加行注释&#xff0c;再次使用&#xff0c;去掉行注释 若需要将多行进行单行注释&#xff0c;只需要选中要注释的多行&#xff0c;然后使用 Ctrl /&#xff0c; 添加行注释&#xff0c;再次使用&a…

PTA-成绩转换

本题要求编写程序将一个百分制成绩转换为五分制成绩。转换规则&#xff1a; 大于等于90分为A&#xff1b;小于90且大于等于80为B&#xff1b;小于80且大于等于70为C&#xff1b;小于70且大于等于60为D&#xff1b;小于60为E。 输入格式: 输入在一行中给出一个整数的百分制成…

【洛谷算法题】P5714-肥胖问题【入门2分支结构】

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5714-肥胖问题【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格式&a…

网络安全等级保护2.0国家标准

等级保护2.0标准体系主要标准如下&#xff1a;1.网络安全等级保护条例2.计算机信息系统安全保护等级划分准则3.网络安全等级保护实施指南4.网络安全等级保护定级指南5.网络安全等级保护基本要求6.网络安全等级保护设计技术要求7.网络安全等级保护测评要求8.网络安全等级保护测评…

10和一万能分销商城源码系统 源码全开源可二开 一个后台轻松管理所有设备 并附带完整的搭建教程

电子商务和移动商务的兴起&#xff0c;传统的实体销售已经无法满足市场的需求。为了适应这种趋势&#xff0c;小编来给大家分享一款10和一万能分销商城源码系统。这是一个全新的、具有高度可定制性的电子商务平台&#xff0c;其背后的逻辑是简化商家操作流程&#xff0c;提高销…

opencv-分水岭算法分割

原理 任何一副灰度图像都可以被看成拓扑平面&#xff0c;灰度值高的区域可以被看成是山峰&#xff0c;灰度值低的区域可以被看成是山谷。我们向每一个山谷中灌不同颜色的水。随着水的位的升高&#xff0c;不同山谷的水就会相遇汇合&#xff0c;为了防止不同山谷的水汇合&#x…

Sentinel 授权规则 (AuthorityRule)

Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 SpringbootDubboNacos 集成 Sentinel&…

如何在AppLink配置金蝶云星空预算使用单流程

上一篇有提到金蝶云星空如何通过AppLink平台配置销售订单操作&#xff0c;这次来演示下如何“保存预算使用单”、“调拨单定时自动审核”以及“预算使用单反审核后删除”操作。 根据请求数据保存预算使用单 当webhook接收到数据时触发流程 步骤1&#xff1a;根据webhook的请…

从根到叶:随机森林模型的深入探索

一、说明 在本综合指南中&#xff0c;我们将超越基础知识。当您盯着随机森林模型的文档时&#xff0c;您将不再对“节点杂质”、“加权分数”或“成本复杂性修剪”等术语感到不知所措。相反&#xff0c;我们将剖析每个参数&#xff0c;阐明其作用和影响。通过理论和 Python 实践…

Navicat 技术指引 | 适用于 GaussDB 的自动运行功能

Navicat Premium&#xff08;16.2.8 Windows版或以上&#xff09; 已支持对 GaussDB 主备版的管理和开发功能。它不仅具备轻松、便捷的可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结构同步、协同合作、数据迁移等&#xff09;&#xff0c;这…

机器学习/sklearn 笔记:K-means,kmeans++,MiniBatchKMeans

1 K-means介绍 1.0 方法介绍 KMeans算法通过尝试将样本分成n个方差相等的组来聚类&#xff0c;该算法要求指定群集的数量。它适用于大量样本&#xff0c;并已在许多不同领域的广泛应用领域中使用。KMeans算法将一组样本分成不相交的簇&#xff0c;每个簇由簇中样本的平均值描…

好用的局域网监控软件推荐

局域网监控软件是一种用于监控局域网内计算机使用情况的软件&#xff0c;可以帮助企业管理者更好地了解员工的工作状态和行为&#xff0c;规范上网行为并保护企业网络资源。 一、域之盾软件 这是一款专业的上网监控软件&#xff0c;它支持多种操作系统和平台&#xff0c;可以全…

【STM32外设系列】GPS定位模块(ATGM336H)

&#x1f380; 文章作者&#xff1a;二土电子 &#x1f338; 关注公众号获取更多资料&#xff01; &#x1f438; 期待大家一起学习交流&#xff01; 文章目录 一、GPS模块简介二、使用方法2.1 引脚介绍2.2 数据帧介绍2.3 关于不同的启动方式 三、前置知识3.1 strstr函数3.2…

Mac下载的软件显示文件已损坏,如何解决文件已损坏问题,让文件可以正常运行

Mac下载的软件显示文件已损坏&#xff0c;如何解决文件已损坏问题&#xff0c;让文件可以正常运行 设备/引擎&#xff1a;Mac&#xff08;11.6&#xff09;/Mac Mini 开发工具&#xff1a;终端 开发需求&#xff1a;让显示已损坏的文件顺利安装到电脑 大家肯定都遇到过下载…