VSOA数据流
概念
实际业务中常常存在既有实时命令通信,又有非实时的大数据通信,如文件、音视频传输服务等,如果使用常规的 RPC 或订阅/发布功能来实现,将实时命令和大数据传输混在一起,则会影响 RPC 通道响应的实时性,尤其在需要 RPC 实时控制的应用中就显得极不合理;另一种方式是针对此类大数据的每一个通信,均额外实现对应的服务,但这会给开发者带来额外的编程复杂度,同时增加整个系统的资源开销。
VSOA 中提供了全双工高速并行数据流,称为 “VSOA Stream”。它允许程序进行长时间、大量的数据传输,同时可保证 RPC 通道的实时响应。
VSOA Stream 是一个在已有服务链路上建立的传输隧道,具有以下功能特点:
- 独立性:VSOA Stream 不会影响命令通道的数据交互,即不影响 RPC 的实时性。
- 临时性:VSOA Stream 按需创建,在数据传输完成后可随时销毁,不持续占用连接资源。
- 用户无感:VSOA Stream 的传输隧道在 VSOA 内部自动协商建立,用户无需关心创建隧道的细节(例如无需提前约定好一个确定的通信参数),从而避免复杂的设计实现。
- 全双工:客户端和服务端均可进行读写操作,读写可并行进行。
要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建的流程如下:
- 客户端向服务端请求 RPC 服务。
- 服务端收到 RPC 请求后,创建 VSOA Server Stream,并获得通道 ID (
tunid
)。 - 服务端将通道 ID 返回客户端。
- 服务端等待客户端连接Server Stream。
- 客户端收到 RPC 应答,应答中包含 stream 通道 ID ,更具 ID 创建 VSOA Client Stream。
- 服务端收到客户端 TCP 连接请求,然后通过 TCP 连接发送数据,关闭服务端Stream。
- 客户端接收 TCP 发送内容,删除客户端Stream。
服务端开发
基于原 echo RPC 服务,增加 read RPC 服务。read RPC 服务收到请求后创建server stream,并将 ID号给客户端,然后在新建线程中等到客户端连接server stream 的 TCP 服务端。TCP 服务端收到连接后,发送数据,然后关闭server stream。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_platform.h"
#include "vsoa_server.h"
#define MY_SERVER_ADDR "0.0.0.0"
#define MY_SERVER_PORT (4001)
#define MY_SERVER_NAME "{\"name\":\"echo_server\"}"
#define MY_SERVER_PASSWD "123456"
static void *thread_write_stream(void *arg)
{
vsoa_server_stream_t *stream = (vsoa_server_stream_t *)arg;
int client;
uint8_t data[] = "hello vsoa stream";
/*
*等待tcp客户端连接
*/
client = vsoa_server_stream_accept(stream, NULL, NULL, 0);
/*
* tcp 发送
*/
send(client, data, sizeof(data), 0);
/*
* 关闭stream
*/
vsoa_server_stream_close(stream);
return NULL;
}
static void command_read (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
struct sockaddr_in addr_in;
uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
socklen_t namelen = sizeof(addr_in);
pthread_t thread;
vsoa_server_stream_t *stream;
stream = vsoa_server_stream_create(server);
pthread_create(&thread, NULL, thread_write_stream, stream);
vsoa_server_cli_address(server, cid, (struct sockaddr *)&addr_in, &namelen);
printf("read client %d addr is %s:%d, url:%.*s\n", cid, inet_ntoa(addr_in.sin_addr),
ntohs(addr_in.sin_port),(int)url->url_len, url->url);
vsoa_server_cli_reply(server, cid, 0, seqno, stream->tunid, NULL);
}
static void command_echo (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
struct sockaddr_in addr_in;
socklen_t namelen = sizeof(addr_in);
vsoa_server_cli_address(server, cid, (struct sockaddr *)&addr_in, &namelen);
printf("echo client %d addr is %s:%d\n", cid, inet_ntoa(addr_in.sin_addr), ntohs(addr_in.sin_port));
printf("echo message, url:%.*s, param:%.*s, data:%.*s\n",
(int)url->url_len, url->url,
(int)payload->param_len, payload->param,
(int)payload->data_len, (char *)payload->data);
vsoa_server_cli_reply(server, cid, 0, vsoa_parser_get_seqno(vsoa_hdr), 0, payload);
}
int main (int argc, char **argv)
{
vsoa_server_t *server;
/*
* 创建服务端
*/
server = vsoa_server_create(MY_SERVER_NAME);
if (!server) {
fprintf(stderr, "Can not create VSOA server!\n");
return (-1);
}
/*
* 设置密码,设置为NULL,表示密码为空,客户端可以不输入密码
*/
vsoa_server_passwd(server, MY_SERVER_PASSWD);
/*
* 创建RPC服务
*/
vsoa_url_t url;
url.url = "/echo";
url.url_len = strlen(url.url);
vsoa_server_add_listener(server, &url, command_echo, NULL);
url.url = "/read";
url.url_len = strlen(url.url);
vsoa_server_add_listener(server, &url, command_read, NULL);
/*
* 启动微服务
*/
struct sockaddr_in addr;
bzero(&addr, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(MY_SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(MY_SERVER_ADDR);
addr.sin_len = sizeof(struct sockaddr_in);
if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
vsoa_server_close(server);
fprintf(stderr, "Can not start VSOA server!\n");
return (-1);
}
/*
* 进入监听事件循环
*/
while (1) {
int cnt;
int max_fd;
fd_set fds;
struct timespec timeout = {1, 0 };
FD_ZERO(&fds);
max_fd = vsoa_server_fds(server, &fds);
cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
if (cnt > 0) {
vsoa_server_input_fds(server, &fds);
}
}
return (0);
}
客户端开发
基于原 echo RPC 客户端,将 echo 服务改为 read 服务。每秒发送一次 RPC 请求,请求应答中带有server stream 的 ID号,创建client stream 会自动连接服务端建立 TCP 连接,然后通过 TCP 接收数据,最后关闭client stream 。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#define MY_SERVER_PASSWD "123456"
static void on_command_read (void *arg, struct vsoa_client *client, vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
int tunid;
struct timespec timeout = {1,0};
int stream;
int len;
uint8_t buffer[32];
tunid = vsoa_parser_get_tunid(vsoa_hdr);
stream = vsoa_client_stream_create(client, tunid, &timeout, 0);
len = recv(stream, buffer, sizeof(buffer), 0);
if (len > 0) {
printf("Received %d bytes from VSOA stream: %s\n", len, buffer);
}
vsoa_client_stream_close(stream);
}
int main (int argc, char **argv)
{
vsoa_client_t *client;
vsoa_client_auto_t *cliauto;
/*
* 创建客户端机器人
*/
cliauto = vsoa_client_auto_create(NULL, NULL);
/*
* 由客户端机器人获取客户端对象
*/
client = vsoa_client_auto_handle(cliauto);
/*
* 启动客户端机器人
*/
vsoa_client_auto_start(cliauto, "vsoa://echo_server", MY_SERVER_PASSWD, NULL, 0, 1000, 1000, 1000);
while (true) {
/*
* 检查客户端是否正常链接到服务端
*/
if (vsoa_client_is_connect(client) == false) {
continue;
}
/*
* 注册异步RPC请求
*/
vsoa_url_t url;
url.url = "/read";
url.url_len = strlen(url.url);
vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET, &url, NULL, on_command_read, NULL, NULL);
sleep(1);
}
}
运行效果
这里同时开启 4 个 telnet 命令窗口,分别执行 4 个程序。执行命令及执行效果如下:
命令行 1,启动 server_stream 服务端:
命令行 2,启动位置服务:
命令行 3,启动之前的 client_auto 程序,每秒请求一次 echo RPC 服务:
命令行 4,启动 client_stream 程序,每秒请求一次 read RPC 服务: