一、概述
本例展示了如何通过 IPC 在调度于不同线程的两个代理之间传输缓冲区。在继续学习本示例之前,最好先复习一下Simplest Full Example ,因为该示例展示的是 IPC 通信,没有增加代理的复杂性。读者还应熟悉Media Driver
流程构建如下:
- 以默认模式运行的嵌入式Media Driver(发送器、接收器和指挥器的代理(an agent for Sender, Receiver, Conductor))(an embedded media driver running in default mode (an agent for Sender, Receiver, Conductor))
- 通过publication发送 IPC 数据的代理(SendAgent)(an agent to send the IPC data over a publication (SendAgent))
- 通过subscription接收 IPC 数据的代理(ReceiveAgent)(an agent to receive the IPC data over a subscription (ReceiveAgent))
Code Sample overview
代码示例包含在 ipc-core 项目 com.aeroncookbook.ipc.agents
命名空间中的三个文件中。它们是:
StartHere.java
- the class responsible for setting up Aeron and scheduling the agents;(负责设置 Aeron 和调度代理的类)SendAgent.java
- the class holding the Agent responsible for sending data;(负责发送数据的代理的类)ReceiveAgent.java
- the class holding the Agent responsible for receiving data.(负责接收数据的代理的类)
下文将对每个部分进行细分和讨论。
Execution Output
15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000
二、StartHere.java
public static void main(String[] args)
{
final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1_000_000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
.dirDeleteOnStart(true)
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(new BusySpinIdleStrategy())
.dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
.aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);
//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);
//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,
sendCount);
//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
Throwable::printStackTrace, null, receiveAgent);
LOGGER.info("starting");
//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);
//wait for the final item to be received before closing
barrier.await();
//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();
}
Constructing support objects
final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
这部分代码构建了一些支持对象。(This section of the code constructs a few support objects.)
- Line 1 holds the channel definition, in this case
aeron:ipc
- Line 2 holds the stream ID to use, in this case 10
- Line 3 is the number of integers to send over IPC
- 第 4、5 行构建了代理使用的空闲策略(IdleStrategy)。在这种情况下,只要 doWork 工作周期返回 0,空闲策略就会忙于旋转。(Line 4,5 constructs the
IdleStrategy
to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.)- 第 6 行是一个屏障,用于协调样本的关闭。一旦 ReceiveAgent 总共接收到一个发送计数整数,它就会向屏障发出信号,触发关闭。(Line 6 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the
ReceiveAgent
has received a total ofsendCount
integers, it will signal the barrier, triggering the shutdown.)
Constructing the Media Driver
//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
.dirDeleteOnStart(true)
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(new BusySpinIdleStrategy())
.dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
本节代码使用定义的上下文构建Media Driver。上下文是一个对象,其中包含Media Driver的所有可选配置参数。在本例中,有两项配置被重写,以确保Media Driver在启动和关闭时整理Media Driver目录。一旦上下文准备就绪,Media Driver就会作为嵌入式代理启动。
See also: Media Driver
Constructing Aeron, the Publication and the Subscription
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
.aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);
这部分代码再次使用 Context 构建 Aeron 对象。有了这个上下文,我们就能让 Aeron 知道Media Driver的 Aeron 目录在哪里。一旦上下文准备就绪,Aeron 对象就会连接到Media Driver。接下来,我们将使用之前定义的通道和流 id 创建 IPC 发布和订阅(IPC publication and subscription)。
//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);
Constructing and scheduling the agents
//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);
//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
Throwable::printStackTrace, null, receiveAgent);
//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);
这部分代码构建发送代理(SendAgent)和接收代理(ReceiveAgent),创建代理运行程序来管理它们,然后在特定线程上启动它们。关键行如下:
- 第 6-7 行和第 8-9 行:这两行分别构建了发送和接收的代理运行程序。请注意,每行都给出了空闲策略,用于控制线程在 doWork 工作周期后如何使用资源。
- 第 12 和 13 行:这两行为每个代理创建新线程,并开始工作周期。
Shutting down cleanly
//wait for the final item to be received before closing
barrier.await();
//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();
代码的最后部分负责等待 ReceiveAgent 触发屏障,然后正确清理资源。首先关闭代理,然后关闭 aeron 对象,最后关闭Media Driver。如果不注意关闭过程中的执行顺序,可能会出现核心转储或其他看似严重的故障。
三、SendAgent.java
public class SendAgent implements Agent
{
private final Publication publication;
private final int sendCount;
private final UnsafeBuffer unsafeBuffer;
private int currentCountItem = 1;
private final Logger logger = LoggerFactory.getLogger(SendAgent.class);
public SendAgent(final Publication publication, int sendCount)
{
this.publication = publication;
this.sendCount = sendCount;
this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));
unsafeBuffer.putInt(0, currentCountItem);
}
@Override
public int doWork()
{
if (currentCountItem > sendCount)
{
return 0;
}
if (publication.isConnected())
{
if (publication.offer(unsafeBuffer) > 0)
{
currentCountItem += 1;
unsafeBuffer.putInt(0, currentCountItem);
}
}
return 0;
}
@Override
public String roleName()
{
return "sender";
}
}
send 对象负责通过提供的 Aeron Publication 发送 sendCount 整数。doWork 方法用于保持代理的工作周期,该方法会被持续调用,直至代理关闭。一旦达到 sendCount 限制,它就会停止向publication发送更多信息,并开始闲置。
这段代码中最有趣的部分是:
- Line 18 to 34: the doWork method holding the duty cycle for this agent
- 第 22 行和第 34 行:这两条返回语句都返回 0,这将导致选定的空闲策略
BusySpinIdleStrategy
调用ThreadHints.onSpinWait()
- 第 25 行:只有当publication已连接时,才会返回 true。一旦连接,就可以安全地向publication提供信息。
- 第 27 行:这将为publication提供缓冲数据。
- Line 30: this logs the last sent integer, for example
15:13:42.818 [sender] sent: 123
- Line 41: this sets the thread name to
sender
, as is visible in the log output.
四、ReceiveAgent.java
public class ReceiveAgent implements Agent
{
private final Subscription subscription;
private final ShutdownSignalBarrier barrier;
private final int sendCount;
private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);
public ReceiveAgent(final Subscription subscription,
ShutdownSignalBarrier barrier, int sendCount)
{
this.subscription = subscription;
this.barrier = barrier;
this.sendCount = sendCount;
}
@Override
public int doWork() throws Exception
{
subscription.poll(this::handler, 1000);
return 0;
}
private void handler(DirectBuffer buffer, int offset, int length,
Header header)
{
final int lastValue = buffer.getInt(offset);
if (lastValue >= sendCount)
{
logger.info("received: {}", lastValue);
barrier.signal();
}
}
@Override
public String roleName()
{
return "receiver";
}
}
接收代理负责轮询所提供的订阅并记录接收到的值。一旦达到 sendCount 值,接收代理就会发出屏障信号。该对象中最有趣的部分是:
- 第 17-21 行 - doWork 方法保持着该代理的duty cycle 。duty cycle由两部分组成,一部分是轮询订阅,将事件传递给提供的处理程序,另一部分是返回 0。通过配置的 IdleStrategy,返回 0 将导致线程停顿一微秒。
- Line 26 - this logs the integer value received, for example:
15:13:42.814 [receiver] received: 5
- Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
- Line 38 - this sets the role name to
receiver
, as visible in log output.
五、Performance
在英特尔笔记本电脑上,本示例每秒可传输约 1 千万条 4 字节信息。如果使用的是 Linux 系统,且有可用的 /dev/shm,代码会自动使用。通过交换
NoOpIdleStrategy
,并将media driver线程移至DEDICATED
,每秒可传输超过 2000 万条信息。主要更改见下文。请注意,您需要确保硬件上至少有 8 个物理内核。
final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
.dirDeleteOnStart(true)
.threadingMode(ThreadingMode.DEDICATED)
.conductorIdleStrategy(new BusySpinIdleStrategy())
.senderIdleStrategy(new NoOpIdleStrategy())
.receiverIdleStrategy(new NoOpIdleStrategy())
.dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
.idleStrategy(new NoOpIdleStrategy())
.aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);
有一个相关的 Two Agent example of OneToOneRingBuffer 非常相似,只不过它使用了 Agrona 的 OneToOneRingBuffer,并通过 BusySpinIdleStrategy
每秒发送大约 1800 万条 4 字节信息,或通过 NoOpIdleStrategy
每秒发送超过 5000 万条信息。
六、Using the C Media Driver
要使用 Aeron 的 C Media Driver测试此示例,您需要执行以下操作:
首先,从源代码中构建 C Media Driver(说明因操作系统而异,此部分参考博客即可,建议翻墙进行操作,贴出cookbook只是帮助借阅):
- Building the C Media Driver on macOS
- Building the C Media Driver on CentOS Linux 8
- Building the C Media Driver on Ubuntu 20.04
- Building the C Media Driver on Windows 10
Next, start the C Media Driver with default settings
./aeronmd
(Linux/macOS)aeronmd
(Windows)
Then, remove the Media Driver from StartHere.java
, and reduce the Aeron context to defaults:
//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
// .dirDeleteOnStart(true)
// .threadingMode(ThreadingMode.SHARED)
// .sharedIdleStrategy(new BusySpinIdleStrategy())
// .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);
Aeron 和Media Driver将默认使用同一目录。
最后,正常运行 StartHere.java。进程应正常运行,输出应包括类似内容:
14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000