Qnx IPC通信 Message-passing
Message-passing介绍
QNX提供了多种IPC(Interprocess Communication )通信方式,包括Message-passing、Plus(脉冲)、Event、Signal、共享内存、Pipe,当然还有socket。
Message-passing是Qnx IPC的主要形式。
Synchronous messaging is the main form of IPC in the QNX Neutrino RTOS.
Message-passing提供了主从直接同步的双向消息传输,类似于Android 同步Binder。
客户端向服务端请求(Request),服务端向客户端(Reply)。
官网链接:
https://www.qnx.com/developers/docs/7.1/#com.qnx.doc.neutrino.sys_arch/topic/ipc_Sync_messaging.html
通信状态迁移图
Qnx官网给出的客户端状态迁移图。
- 客户端调用MsgSend(Qnx提供的函数),客户执行函数的所在线程,进入SednBlocked的阻塞状态(等待服务端回复)
- 服务端执行MsgRecevice,接收客户端的请求后。Qnx kernel会将Client端的Thread调度为Repley blocked状态。
- 服务端处理完客户端的请求后,调用MsgReply或者MsgError,返回结果给Client端。Client从blocked阻塞状态,变为Ready非阻塞状态。
Qnx官网给出的服务端状态迁移图
- 服务端调用MsgReceive接收客户端请求(阻塞)
- 客户端端调用MessageSend后,服务端收到调用,从阻塞状态中退出。处理请求后,通过MsgReply()/MsgError回复客户端。
- 服务端重新调用MsgReceive,监听客户端请求。
Channels and connections
Qnx中消息传递(Message-passing)是基于通道/连接 这种方式,进行传输的。
- 一个thread(看做任务的基本调度单位)想要接收消息,必须创建一个通道(Channels)
- 一个thread 想要发送消息,必须连接(connections)到这个通道。
Qnx官网关于Channels and connections的说明图片。
关于性能
根据QNX官网的介绍,Message-passing时,Qnx Kernel基于地址空间,直接把消息从当前线程的地址,复制到接收线程的地址,没有额外的中间拷贝。所以接近于硬件上内存的带宽(很快)
Since our messaging services copy a message directly from the address space of one thread to another without intermediate buffering, the message-delivery performance approaches the memory bandwidth of the underlying hardware.
Message-passing例子
Qnx官网给出了例子,这里借用分析一下例子。
- 服务端
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/dispatch.h>
#define ATTACH_POINT "myname"
// 定义消息数据的类型
typedef struct _my_data
{
uint16_t type;
int value;
} my_data_t;
// 在特殊情况下,比如客户端断开,会收到来自Qnx Kernel的消息。
// 这个时候通过 qnx提供的_pulse 来接收
typedef union recv_buf
{
uint16_t type;
struct _pulse pulse;
my_data_t data;
} my_recv_buf_t;
int server() {
name_attach_t *attach;
my_recv_buf_t msg;
int rcvid;
// 创建一个通道
if ((attach = name_attach(NULL, ATTACH_POINT, 0)) == NULL) {
return EXIT_FAILURE;
}
// 开启循环,监听客户端请求
while (1) {
rcvid = MsgReceive(attach->chid, &msg, sizeof(msg), NULL);
if (rcvid == -1) {
// Error了
break;
}
// 表示接口到Plus消息(来自Qnx系统发送的消息)
if (rcvid == 0) {
switch (msg.pulse.code) {
case _PULSE_CODE_DISCONNECT:
/*
* A client disconnected all its connections (called
* name_close() for each name_open() of our name) or
* terminated
*/
// 客户端断开
ConnectDetach(msg.pulse.scoid);
break;
case _PULSE_CODE_UNBLOCK:
/*
* REPLY blocked client wants to unblock (was hit by
* a signal or timed out). It's up to you if you
* reply now or later.
*/
// 客户端长期阻塞在REPLY blocked状态。请求解除阻塞
break;
default:
/*
* A pulse sent by one of your processes or a
* _PULSE_CODE_COIDDEATH or _PULSE_CODE_THREADDEATH
* from the kernel?
*/
// 这段看qnx官方注释即可
break;
}
continue;
}
/* A system message was received, reject it. */
if (msg.type <= _IO_MAX ) {
MsgError( rcvid, ENOSYS );
continue;
}
/* A message (presumable ours) received, handle */
printf("Server receive %d \n", msg.data.value);
// 回复客户端消息
//
// int MsgReply( int rcvid,
// long status,
// const void* msg,
// size_t bytes );
// 这里只回复了客户端状态。第三的参数和第四个参数,是回复客户端的数据,及Size
MsgReply(rcvid, EOK, 0, 0);
}
/* Remove the name from the space */
name_detach(attach, 0);
return EXIT_SUCCESS;
}
int main(int argc, char **argv) {
int ret = server();
return ret;
}
- 客户端
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/dispatch.h>
#define ATTACH_POINT "myname"
// 定义消息数据的类型
typedef struct _my_data
{
uint16_t type;
int value;
} my_data_t;
int client() {
// 发送的Msg
my_data_t msg;
int server_coid;
// 连接通道
if ((server_coid = name_open(ATTACH_POINT, 0)) == -1) {
return EXIT_FAILURE;
}
/* We would have pre-defined data to stuff here */
msg.type = (_IO_MAX+5);
// 给Server发送消息
for (msg.value=0; msg.value < 5; msg.value++) {
printf("Client sending %d \n", msg.value);
// 这里只接受了服务端回复的状态(MsgSend的返回值)
// long MsgSend( int coid,
// const void* smsg,
// size_t sbytes,
// void* rmsg,
// size_t rbytes );
// 第三个参数和第四个参数,表示从服务端收到的数据。
if (MsgSend(server_coid, &msg, sizeof(msg), NULL, 0) == -1) {
break;
}
}
/* Close the connection */
name_close(server_coid);
return EXIT_SUCCESS;
}
int main(int argc, char **argv) {
int ret = client();
return ret;
}
如果有Qnx环境的上,编译上述程序后。在qnx环境下先运行Server,然后运行Client即可。