Aeron:两个代理之间的单向IPC(One-way IPC between two agents)

news2025/1/13 16:50:44

一、概述

本例展示了如何通过 IPC 在调度于不同线程的两个代理之间传输缓冲区。在继续学习本示例之前,最好先复习一下Simplest Full Example ,因为该示例展示的是 IPC 通信,没有增加代理的复杂性。读者还应熟悉Media Driver

流程构建如下:

  • 以默认模式运行的嵌入式Media Driver(发送器、接收器和指挥器的代理(an agent for Sender, Receiver, Conductor))(an embedded media driver running in default mode (an agent for Sender, Receiver, Conductor))
  • 通过publication发送 IPC 数据的代理(SendAgent)(an agent to send the IPC data over a publication (SendAgent))
  • 通过subscription接收 IPC 数据的代理(ReceiveAgent)(an agent to receive the IPC data over a subscription (ReceiveAgent))

Code Sample overview

代码示例包含在 ipc-core 项目 com.aeroncookbook.ipc.agents 命名空间中的三个文件中。它们是:

  • StartHere.java - the class responsible for setting up Aeron and scheduling the agents;(负责设置 Aeron 和调度代理的类)
  • SendAgent.java - the class holding the Agent responsible for sending data;(负责发送数据的代理的类)
  • ReceiveAgent.java - the class holding the Agent responsible for receiving data.(负责接收数据的代理的类)

下文将对每个部分进行细分和讨论。

 Execution Output

15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000

二、StartHere.java

public static void main(String[] args)
{
 final String channel = "aeron:ipc";
 final int stream = 10;
 final int sendCount = 1_000_000;
 final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
 final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
 final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

 //construct Media Driver, cleaning up media driver folder on start/stop
 final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
         .dirDeleteOnStart(true)
         .threadingMode(ThreadingMode.SHARED)
         .sharedIdleStrategy(new BusySpinIdleStrategy())
         .dirDeleteOnShutdown(true);
 final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

 //construct Aeron, pointing at the media driver's folder
 final Aeron.Context aeronCtx = new Aeron.Context()
         .aeronDirectoryName(mediaDriver.aeronDirectoryName());
 final Aeron aeron = Aeron.connect(aeronCtx);

 //construct the subs and pubs
 final Subscription subscription = aeron.addSubscription(channel, stream);
 final Publication publication = aeron.addPublication(channel, stream);

 //construct the agents
 final SendAgent sendAgent = new SendAgent(publication, sendCount);
 final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,
     sendCount);
 //construct agent runners
 final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
         Throwable::printStackTrace, null, sendAgent);
 final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
         Throwable::printStackTrace, null, receiveAgent);

 LOGGER.info("starting");

 //start the runners
 AgentRunner.startOnThread(sendAgentRunner);
 AgentRunner.startOnThread(receiveAgentRunner);

 //wait for the final item to be received before closing
 barrier.await();

 //close the resources
 receiveAgentRunner.close();
 sendAgentRunner.close();
 aeron.close();
 mediaDriver.close();
}

 Constructing support objects

final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

这部分代码构建了一些支持对象。(This section of the code constructs a few support objects.)

  • Line 1 holds the channel definition, in this case aeron:ipc
  • Line 2 holds the stream ID to use, in this case 10
  • Line 3 is the number of integers to send over IPC
  • 第 4、5 行构建了代理使用的空闲策略(IdleStrategy)。在这种情况下,只要 doWork 工作周期返回 0,空闲策略就会忙于旋转。(Line 4,5 constructs the IdleStrategy to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.)
  • 第 6 行是一个屏障,用于协调样本的关闭。一旦 ReceiveAgent 总共接收到一个发送计数整数,它就会向屏障发出信号,触发关闭。(Line 6 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the ReceiveAgent has received a total of sendCount integers, it will signal the barrier, triggering the shutdown.)

Constructing the Media Driver 

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
 .dirDeleteOnStart(true)
 .threadingMode(ThreadingMode.SHARED)
 .sharedIdleStrategy(new BusySpinIdleStrategy())
 .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

本节代码使用定义的上下文构建Media Driver。上下文是一个对象,其中包含Media Driver的所有可选配置参数。在本例中,有两项配置被重写,以确保Media Driver在启动和关闭时整理Media Driver目录。一旦上下文准备就绪,Media Driver就会作为嵌入式代理启动。

See also: Media Driver

 Constructing Aeron, the Publication and the Subscription

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
 .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

这部分代码再次使用 Context 构建 Aeron 对象。有了这个上下文,我们就能让 Aeron 知道Media Driver的 Aeron 目录在哪里。一旦上下文准备就绪,Aeron 对象就会连接到Media Driver。接下来,我们将使用之前定义的通道和流 id 创建 IPC 发布和订阅(IPC publication and subscription)。

//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);

Constructing and scheduling the agents

//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);

//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
     Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
     Throwable::printStackTrace, null, receiveAgent);

//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);

 

这部分代码构建发送代理(SendAgent)和接收代理(ReceiveAgent),创建代理运行程序来管理它们,然后在特定线程上启动它们。关键行如下:

  • 第 6-7 行和第 8-9 行:这两行分别构建了发送和接收的代理运行程序。请注意,每行都给出了空闲策略,用于控制线程在 doWork 工作周期后如何使用资源。
  • 第 12 和 13 行:这两行为每个代理创建新线程,并开始工作周期。

Shutting down cleanly

//wait for the final item to be received before closing
barrier.await();

//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();

代码的最后部分负责等待 ReceiveAgent 触发屏障,然后正确清理资源。首先关闭代理,然后关闭 aeron 对象,最后关闭Media Driver。如果不注意关闭过程中的执行顺序,可能会出现核心转储或其他看似严重的故障。 

三、SendAgent.java

public class SendAgent implements Agent
{
 private final Publication publication;
 private final int sendCount;
 private final UnsafeBuffer unsafeBuffer;
 private int currentCountItem = 1;
 private final Logger logger = LoggerFactory.getLogger(SendAgent.class);

 public SendAgent(final Publication publication, int sendCount)
 {
     this.publication = publication;
     this.sendCount = sendCount;
     this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));
     unsafeBuffer.putInt(0, currentCountItem);
 }

 @Override
 public int doWork()
 {
     if (currentCountItem > sendCount)
     {
         return 0;
     }

     if (publication.isConnected())
     {
         if (publication.offer(unsafeBuffer) > 0)
         {
             currentCountItem += 1;
             unsafeBuffer.putInt(0, currentCountItem);
         }
     }
     return 0;
 }

 @Override
 public String roleName()
 {
     return "sender";
 }
}

 

send 对象负责通过提供的 Aeron Publication 发送 sendCount 整数。doWork 方法用于保持代理的工作周期,该方法会被持续调用,直至代理关闭。一旦达到 sendCount 限制,它就会停止向publication发送更多信息,并开始闲置。

这段代码中最有趣的部分是:

  • Line 18 to 34: the doWork method holding the duty cycle for this agent
  • 第 22 行和第 34 行:这两条返回语句都返回 0,这将导致选定的空闲策略 BusySpinIdleStrategy 调用 ThreadHints.onSpinWait()
  • 第 25 行:只有当publication已连接时,才会返回 true。一旦连接,就可以安全地向publication提供信息。
  • 第 27 行:这将为publication提供缓冲数据。
  • Line 30: this logs the last sent integer, for example 15:13:42.818 [sender] sent: 123
  • Line 41: this sets the thread name to sender, as is visible in the log output.

四、ReceiveAgent.java

public class ReceiveAgent implements Agent
{
 private final Subscription subscription;
 private final ShutdownSignalBarrier barrier;
 private final int sendCount;
 private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);

 public ReceiveAgent(final Subscription subscription,
                     ShutdownSignalBarrier barrier, int sendCount)
 {
     this.subscription = subscription;
     this.barrier = barrier;
     this.sendCount = sendCount;
 }

 @Override
 public int doWork() throws Exception
 {
     subscription.poll(this::handler, 1000);
     return 0;
 }

 private void handler(DirectBuffer buffer, int offset, int length,
                     Header header)
 {
     final int lastValue = buffer.getInt(offset);

     if (lastValue >= sendCount)
     {
         logger.info("received: {}", lastValue);
         barrier.signal();
     }
 }

 @Override
 public String roleName()
 {
     return "receiver";
 }
}

接收代理负责轮询所提供的订阅并记录接收到的值。一旦达到 sendCount 值,接收代理就会发出屏障信号。该对象中最有趣的部分是:

  • 第 17-21 行 - doWork 方法保持着该代理的duty cycle 。duty cycle由两部分组成,一部分是轮询订阅,将事件传递给提供的处理程序,另一部分是返回 0。通过配置的 IdleStrategy,返回 0 将导致线程停顿一微秒。
  • Line 26 - this logs the integer value received, for example: 15:13:42.814 [receiver] received: 5
  • Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
  • Line 38 - this sets the role name to receiver, as visible in log output.

 五、Performance

在英特尔笔记本电脑上,本示例每秒可传输约 1 千万条 4 字节信息。如果使用的是 Linux 系统,且有可用的 /dev/shm,代码会自动使用。通过交换 NoOpIdleStrategy,并将media driver线程移至 DEDICATED,每秒可传输超过 2000 万条信息。主要更改见下文。请注意,您需要确保硬件上至少有 8 个物理内核。

final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
     .dirDeleteOnStart(true)
     .threadingMode(ThreadingMode.DEDICATED)
     .conductorIdleStrategy(new BusySpinIdleStrategy())
     .senderIdleStrategy(new NoOpIdleStrategy())
     .receiverIdleStrategy(new NoOpIdleStrategy())
     .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
     .idleStrategy(new NoOpIdleStrategy())
     .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

 

有一个相关的 Two Agent example of OneToOneRingBuffer 非常相似,只不过它使用了 Agrona 的 OneToOneRingBuffer,并通过 BusySpinIdleStrategy 每秒发送大约 1800 万条 4 字节信息,或通过 NoOpIdleStrategy 每秒发送超过 5000 万条信息。

六、Using the C Media Driver

要使用 Aeron 的 C Media Driver测试此示例,您需要执行以下操作:

首先,从源代码中构建 C Media Driver(说明因操作系统而异,此部分参考博客即可,建议翻墙进行操作,贴出cookbook只是帮助借阅):

  • Building the C Media Driver on macOS
  • Building the C Media Driver on CentOS Linux 8
  • Building the C Media Driver on Ubuntu 20.04
  • Building the C Media Driver on Windows 10

Next, start the C Media Driver with default settings

  • ./aeronmd (Linux/macOS)
  • aeronmd (Windows)

Then, remove the Media Driver from StartHere.java, and reduce the Aeron context to defaults:

//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
//        .dirDeleteOnStart(true)
//        .threadingMode(ThreadingMode.SHARED)
//        .sharedIdleStrategy(new BusySpinIdleStrategy())
//        .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);

Aeron 和Media Driver将默认使用同一目录。

最后,正常运行 StartHere.java。进程应正常运行,输出应包括类似内容:

14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1833396.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Maya 2024 mac/win版:创意无界,设计新生

Maya 2024是一款由Autodesk推出的业界领先的三维计算机图形软件,广泛应用于电影、游戏、广告等创意产业。这款软件以其强大的功能和卓越的性能,为艺术家们提供了一个实现创意梦想的平台。 Maya 2024 mac/win版获取 在建模方面,Maya 2024提供…

Linux_理解程序地址空间和页表

目录 1、进程地址空间示意图 2、验证进程地址空间的结构 3、验证进程地址空间是虚拟地址 4、页表-虚拟地址与物理地址 5、什么是进程地址空间 6、进程地址空间和页表的存在意义 6.1 原因一(效率性) 6.2 原因二(安全性) …

Langchain中使用Ollama提供的Qwen大模型进行Function Call实现天气查询、网络搜索

Function Call,或者叫函数调用、工具调用,是大语言模型中比较重要的一项能力,对于扩展大语言模型的能力,或者构建AI Agent,至关重要。 Function Call的简单原理如下: 按照特定规范(这个一般是L…

TensorRT的循环样例代码

官方文档地址 https://docs.nvidia.com/deeplearning/tensorrt/developer-guide/index.html#define-loops 非顺序结构,其内容确实有点乱,而且没有完整可运行的样例。 可以有多个IIteratorLayer, IRecurrenceLayer, and ILoopOutputLayer 层,…

wondershaper 一款限制 linux 服务器网卡级别的带宽工具

文章目录 一、关于奇迹整形器二、文档链接三、源码下载四、限流测试五、常见报错1. /usr/local/sbin/wondershaper: line 145: tc: command not found2. Failed to download metadata for repo ‘appstream‘: Cannot prepare internal mirrorlist: No URLs.. 一、关于奇迹整形…

抖音矩阵系统搭建,AI剪辑短视频,一键管理矩阵账号

目录 前言: 一、抖音矩阵系统有哪些功能? 1.AI智能文案 2.多平台账号授权 3.多种剪辑模式 4. 矩阵一键发布,智能发布 5.抖音爆店码功能 6.私信实时互动 7.去水印及外链 二、抖音矩阵系统可以解决哪些问题? 总结&#xff…

【MySQL基础随缘更系列】AB复制

文章目录 mysql AB复制实战一、mysql AB复制二、AB复制原理三、master服务器设置3.1、安装mysql并启动3.2、关闭防火墙,selinux3.3、设置时间服务器3.4、修改配置文件 设置server-idN3.5、创建slave连接master的账号,用于取SQL语句 四、slave设置4.3、修改配置文件 …

C#调用OpenCvSharp和SkiaSharp绘制图像直方图

最近在B站上学习OpenCv教程,学到图像直方图,后者描述的是不同色彩在整幅图像中所占的比例(统计不同色彩在图像中的出现次数),可以对灰度图、彩色图等计算并绘制图像直方图。本文学习OpenCvSharp中与计算直方图相关的函…

MySQL-DDL(Data Definition Language)

078-对表结构进行增删改操作 增删改表结构DDL(Data Definition Language) 创建一个学生表 create table t_student( no bigint, name varchar(255), age int comment 年龄 );查看建表语句 show create table t_student;修改表名 alter table 表名 r…

手写操作系统

对喜欢操作系统的伙伴强推一门课程 从0开始实现了支持文件系统、任务切换和网络协议栈的操作系统。 具体见 :http://www.ziyuanwang.online/977.html

Part 4.2 背包动态规划

->背包模型模板(0/1,分组&#xff0c;完全&#xff0c;多重)<- [NOIP2018 提高组] 货币系统 题目背景 NOIP2018 提高组 D1T2 题目描述 在网友的国度中共有 n n n 种不同面额的货币&#xff0c;第 i i i 种货币的面额为 a [ i ] a[i] a[i]&#xff0c;你可以假设每…

RocketMQ源码学习笔记:NameServer启动流程

这是本人学习的总结&#xff0c;主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、Overview2、NameServer启动流程2.1、总结2.2、NamesrvController2.2.1、主要职责2.2.2、关键的成员变量2.2.3、核心代码2.2.4、值得注意的点 1、Overview NameServer主要就做三件事 Nam…

DoIP——step2:车辆发现

文章目录 前言一、IP地址配置1.1 AutoIP1.2 DHCP1.3 DoIP实体的IP地址配置流程二、车辆发现车辆声明报文内容如下:前言 完成诊断设备到车辆的物理连接并通过激活线使能诊断连接后边缘节点将会将连接状态传递至应用层,在开始车辆发现过程之前,需要先进行各自的IP地址配置,获…

CTFshow之RCE代码命令远程执行第49关详细讲解。可私信!

棺材里伸手&#xff0c;死要钱&#xff01; --古吉拉特邦 莫迪大仙 引言&#xff1a;由于有些题目实在是让人抓挠&#xff0c;我看完题解后难以接受知识机械的执行获取flag&#xff0c;所以我想着尽可能用我的语言去进行解释&#xff01; 由于是验证猜想实验&#xff0c;所以…

【数据结构与算法 刷题系列】判断链表是否有环(图文详解)

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;《数据结构与算法 经典例题》C语言 期待您的关注 ​ ​ 目录 一、问题描述 二、解题思路 1.解题思路: 2.快慢指针的移动分三个…

区间预测 | Matlab实现CNN-ABKDE卷积神经网络自适应带宽核密度估计多变量回归区间预测

区间预测 | Matlab实现CNN-ABKDE卷积神经网络自适应带宽核密度估计多变量回归区间预测 目录 区间预测 | Matlab实现CNN-ABKDE卷积神经网络自适应带宽核密度估计多变量回归区间预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现CNN-ABKDE卷积神经网络自适应…

什么是隐马尔可夫模型?

文章目录 一、说明二、玩具HMM&#xff1a;5′拼接位点识别三、那么&#xff0c;隐藏了什么&#xff1f;四、查找最佳状态路径五、超越最佳得分对齐六、制作更逼真的模型七、收获 关键词&#xff1a;hidden markov model 一、说明 被称为隐马尔可夫模型的统计模型是计算生物学…

同三维TT806-1 USB单路网络视频流/U盘采集卡

同三维TT806-1 USB单路网络视频流/U盘采集卡 (1路网络音视频信号或U盘直播推流器) 支持采集1路网络视频流或U盘音视频信号&#xff0c;USB输出到电脑 同时还可流推2个直播平台&#xff0c;可设置6组定时推流&#xff0c;有线网络 可录像到U盘&#xff0c;支持定时录像 一…

JAVA-线程

先上图&#xff0c;有点长&#xff0c;比较碎&#xff0c;有xmind文件......&#xff0c;详细内容均在图片里介绍了&#xff0c;提供了PDF文件 1.线程简介 进程是操作系统中正在执行的不同的应用程序&#xff0c;例如&#xff1a;我们可以同时打开Word和记事本 线程是一个应用…

创建型模式--抽象工厂模式

产品族创建–抽象工厂模式 工厂方法模式通过引入工厂等级结构,解决了简单工厂模式中工厂类职责太重的问题。 但由于工厂方法模式中的每个工厂只生产一类产品,可能会导致系统中存在大量的工厂类,势必会增加系统的开销。此时,可以考虑将一些相关的产品组成一个“产品族”,…