【从零开始一步步学习VSOA开发】VSOA数据流

news2025/1/12 9:04:58

VSOA数据流

概念

实际业务中常常存在既有实时命令通信,又有非实时的大数据通信,如文件、音视频传输服务等,如果使用常规的 RPC 或订阅/发布功能来实现,将实时命令和大数据传输混在一起,则会影响 RPC 通道响应的实时性,尤其在需要 RPC 实时控制的应用中就显得极不合理;另一种方式是针对此类大数据的每一个通信,均额外实现对应的服务,但这会给开发者带来额外的编程复杂度,同时增加整个系统的资源开销。

VSOA 中提供了全双工高速并行数据流,称为 “VSOA Stream”。它允许程序进行长时间、大量的数据传输,同时可保证 RPC 通道的实时响应。

VSOA Stream 是一个在已有服务链路上建立的传输隧道,具有以下功能特点:

  • 独立性:VSOA Stream 不会影响命令通道的数据交互,即不影响 RPC 的实时性。
  • 临时性:VSOA Stream 按需创建,在数据传输完成后可随时销毁,不持续占用连接资源。
  • 用户无感:VSOA Stream 的传输隧道在 VSOA 内部自动协商建立,用户无需关心创建隧道的细节(例如无需提前约定好一个确定的通信参数),从而避免复杂的设计实现。
  • 全双工:客户端和服务端均可进行读写操作,读写可并行进行。

要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建的流程如下:

  1. 客户端向服务端请求 RPC 服务。
  2. 服务端收到 RPC 请求后,创建 VSOA Server Stream,并获得通道 ID (tunid)。
  3. 服务端将通道 ID 返回客户端。
  4. 服务端等待客户端连接Server Stream。
  5. 客户端收到 RPC 应答,应答中包含 stream 通道 ID ,更具 ID 创建 VSOA Client Stream。
  6. 服务端收到客户端 TCP 连接请求,然后通过 TCP 连接发送数据,关闭服务端Stream。
  7. 客户端接收 TCP 发送内容,删除客户端Stream。

服务端开发

基于原 echo RPC 服务,增加 read RPC 服务。read RPC 服务收到请求后创建server stream,并将 ID号给客户端,然后在新建线程中等到客户端连接server stream 的 TCP 服务端。TCP 服务端收到连接后,发送数据,然后关闭server stream。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_platform.h"
#include "vsoa_server.h"

#define MY_SERVER_ADDR                      "0.0.0.0"
#define MY_SERVER_PORT                      (4001)
#define MY_SERVER_NAME                      "{\"name\":\"echo_server\"}"
#define MY_SERVER_PASSWD                    "123456"


static void *thread_write_stream(void *arg)
{
    vsoa_server_stream_t   *stream = (vsoa_server_stream_t *)arg;
    int                     client;
    uint8_t data[] = "hello vsoa stream";

    /*
     *等待tcp客户端连接
     */
    client = vsoa_server_stream_accept(stream, NULL, NULL, 0);

    /*
     * tcp 发送
     */
    send(client, data, sizeof(data), 0);

    /*
     * 关闭stream
     */
    vsoa_server_stream_close(stream);

    return NULL;
}

static void command_read (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                          vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    struct sockaddr_in addr_in;
    uint32_t           seqno = vsoa_parser_get_seqno(vsoa_hdr);
    socklen_t          namelen = sizeof(addr_in);

    pthread_t               thread;
    vsoa_server_stream_t   *stream;

    stream = vsoa_server_stream_create(server);
    pthread_create(&thread, NULL, thread_write_stream, stream);

    vsoa_server_cli_address(server, cid, (struct sockaddr *)&addr_in, &namelen);
    printf("read client %d addr is %s:%d, url:%.*s\n", cid, inet_ntoa(addr_in.sin_addr),
           ntohs(addr_in.sin_port),(int)url->url_len, url->url);

    vsoa_server_cli_reply(server, cid, 0, seqno, stream->tunid, NULL);
}

static void command_echo (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                          vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    struct sockaddr_in addr_in;
    socklen_t          namelen = sizeof(addr_in);

    vsoa_server_cli_address(server, cid, (struct sockaddr *)&addr_in, &namelen);
    printf("echo client %d addr is %s:%d\n", cid, inet_ntoa(addr_in.sin_addr), ntohs(addr_in.sin_port));
    printf("echo message, url:%.*s, param:%.*s, data:%.*s\n",
           (int)url->url_len, url->url,
           (int)payload->param_len, payload->param,
           (int)payload->data_len, (char *)payload->data);

    vsoa_server_cli_reply(server, cid, 0, vsoa_parser_get_seqno(vsoa_hdr), 0, payload);
}

int main (int argc, char **argv)
{
    vsoa_server_t *server;

    /*
    * 创建服务端
    */
    server = vsoa_server_create(MY_SERVER_NAME);
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }

    /*
    * 设置密码,设置为NULL,表示密码为空,客户端可以不输入密码
    */
    vsoa_server_passwd(server, MY_SERVER_PASSWD);

    /*
    * 创建RPC服务
    */
    vsoa_url_t url;
    url.url     = "/echo";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_echo, NULL);
    url.url     = "/read";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_read, NULL);
    /*
    * 启动微服务
    */
    struct sockaddr_in addr;
    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(MY_SERVER_PORT);
    addr.sin_addr.s_addr = inet_addr(MY_SERVER_ADDR);
    addr.sin_len         = sizeof(struct sockaddr_in);

    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

    /*
    * 进入监听事件循环
    */
    while (1) {
        int     cnt;
        int     max_fd;
        fd_set  fds;
        struct timespec timeout = {1, 0 };

        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}

客户端开发

基于原 echo RPC 客户端,将 echo 服务改为 read 服务。每秒发送一次 RPC 请求,请求应答中带有server stream 的 ID号,创建client stream 会自动连接服务端建立 TCP 连接,然后通过 TCP 接收数据,最后关闭client stream 。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"

#define MY_SERVER_PASSWD                    "123456"

static void on_command_read (void *arg, struct vsoa_client *client, vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    int             tunid;
    struct timespec timeout = {1,0};
    int             stream;
    int             len;
    uint8_t         buffer[32];


    tunid = vsoa_parser_get_tunid(vsoa_hdr);

    stream = vsoa_client_stream_create(client, tunid, &timeout, 0);

    len = recv(stream, buffer, sizeof(buffer), 0);
    if (len > 0) {
        printf("Received %d bytes from VSOA stream: %s\n", len, buffer);
    }

    vsoa_client_stream_close(stream);
}

int main (int argc, char **argv)
{
    vsoa_client_t *client;
    vsoa_client_auto_t *cliauto;

    /*
     * 创建客户端机器人
     */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    /*
     * 由客户端机器人获取客户端对象
     */
    client  = vsoa_client_auto_handle(cliauto);

    /*
     * 启动客户端机器人
     */
    vsoa_client_auto_start(cliauto, "vsoa://echo_server", MY_SERVER_PASSWD, NULL, 0, 1000, 1000, 1000);

    while (true) {
        /*
         * 检查客户端是否正常链接到服务端
         */
        if (vsoa_client_is_connect(client) == false) {
            continue;
        }
        /*
         * 注册异步RPC请求
         */
        vsoa_url_t url;
        url.url     = "/read";
        url.url_len = strlen(url.url);
        vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET, &url, NULL, on_command_read, NULL, NULL);

        sleep(1);
    }
}

运行效果

这里同时开启 4 个 telnet 命令窗口,分别执行 4 个程序。执行命令及执行效果如下:
命令行 1,启动 server_stream 服务端:
image.png
命令行 2,启动位置服务:
image.png
命令行 3,启动之前的 client_auto 程序,每秒请求一次 echo RPC 服务:
image.png
命令行 4,启动 client_stream 程序,每秒请求一次 read RPC 服务:
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1996946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言程序设计-[10] for语句循环结构

1、for语句循环结构定义 for语句循环结构的一般形式、流程图和执行过程如下&#xff1a; ​ 注1&#xff1a;计算表达式2是循环的判定表达式。与前面一样&#xff0c;这个表达式可以是任意的&#xff0c;只要有值就行&#xff0c;遵循非0即真的原则。 注2&#xff1a;一个循…

(源码)Springboot项目集成Activiti工作流,前端Vue,Bpmn.js

前言 activiti工作流引擎项目&#xff0c;企业erp、oa、hr、crm等企事业办公系统轻松落地&#xff0c;一套完整并且实际运用在多套项目中的案例&#xff0c;满足日常业务流程审批需求。 一、项目形式 springbootvueactiviti集成了activiti在线编辑器&#xff0c;流行的前后端…

【Python】nn.nn.CircularPad1、2、3d函数和nn.ConstantPad1、2、3d函数详解和示例

前言 在深度学习中&#xff0c;尤其是在处理图像、音频或其他多维数据时&#xff0c;数据填充&#xff08;Padding&#xff09;是一个常见的操作。填充不仅可以保持数据的空间维度&#xff0c;还能在卷积操作中避免信息丢失。PyTorch提供了多种填充方式&#xff0c;其中nn.Cir…

unity 本地使用Json(全套)

提示&#xff1a;文章有错误的地方&#xff0c;还望诸位大神不吝指教&#xff01; 文章目录 前言一、Json是什么&#xff1f;二、创建Json文件1.在线编辑并转实体类&#xff08;C#&#xff09;2.Json文件 三、解析Json并使用四、报错&#xff1a;JsonError&#xff1a;JsonExce…

使用frp内网穿透将个人主机上的MySQL发布到公网上,再通过python管理MySQL

目录 1.frp内网穿透部署 1.frp服务器 1.开放端口 2.上传软件包 3.解压 4.配置文件 2.frp客户端 1.上传软件包 2.配置文件 3.启动测试 1.浏览器查看服务器上连接的客户端数量 2.启动测试 2.MySQL安装 3.python3的安装使用 4.python管理MySQL 1.pip 2.pandas 3.p…

Axure 变量魔法:揭秘局部与全局的动态协同

前言 在 Axure 的世界中&#xff0c;变量是连接设计者意图与用户行为的桥梁。 局部变量&#xff0c;以其独特的灵活性和针对性&#xff0c;允许我们在特定情境下快速响应用户的操作。 而全局变量&#xff0c;则以其广泛的覆盖范围&#xff0c;为跨页面的一致性和连贯性提供了…

003集——C#数据类型 及大小端序转换——C#学习笔记

如需得到一个类型或一个变量在特定平台上的准确尺寸&#xff0c;可以使用 sizeof 方法。表达式 sizeof(type) 产生以字节为单位存储对象或类型的存储尺寸。下面举例获取任何机器上 int 类型的存储尺寸&#xff1a; using System;namespace DataTypeApplication {class Program{…

第26课 Scratch入门篇:乘坐公交车

乘坐公交车 故事背景&#xff1a; 又是一天结束了&#xff0c;在繁忙的城市里&#xff0c;深夜加班的上班族们挤上最后一班公交车&#xff0c;回到自己温馨的家 程序原理&#xff1a; 这节课最大的难度就是角色的设计以及角色的切换&#xff0c;背景的不停移动其实跟“猫咪跑…

论MATLAB强大的容错性

如何看待“低代码”开发平台的兴起&#xff1f; “低代码”让非专业人士也能快速构建应用程序。这种新兴技术正在挑战传统软件开发模式&#xff0c;引发了IT行业的广泛讨论。低代码平台是提高效率的利器&#xff0c;还是降低了编程门槛导致质量下降&#xff1f;它会改变开发者…

【报错解决】MySQL报错:sql_mode=only_full_group_by

文章目录 报错信息DataGrip 报错还原Navicat 报错还原 报错原因解决方案查看当前 sql mode方案一&#xff1a;临时解决方案二&#xff1a;永久解决方案三&#xff1a;使用 any_value() 或 group_concat()方案四&#xff1a;调整实现思路&#xff0c;避开 GROUP BY 使用 我是一名…

赛博朋克未来的第一个创想,低空飞行走近现实

英特尔创始人Andy Grove曾在《Only the Paranoid Survive》&#xff08;只有偏执狂才能生存&#xff09;这本书中提到一个观点&#xff1a;战略拐点往往发生在一个竞争因素&#xff0c;或者多个竞争因素突然变成原来10倍的时候&#xff0c;这时候往往预示着生意本质已经发生改变…

基于大数据的气象数据分析与可视化系统设计与实现【爬虫海量数据,LSTM预测】

文章目录 有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主项目介绍研究目的研究意义研究思路可视化展示每文一语 有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主 项目介绍 本课题主要针对气象数据进行分析以及可视化…

【CPP】slt-list由认识到简化模拟实现深度理解~

关于我&#xff1a; 睡觉待开机&#xff1a;个人主页 个人专栏: 《优选算法》《C语言》《CPP》 生活的理想&#xff0c;就是为了理想的生活! 作者留言 PDF版免费提供&#xff1a;倘若有需要&#xff0c;想拿我写的博客进行学习和交流&#xff0c;可以私信我将免费提供PDF版。…

大数据信用报告查询哪家平台的比较好?

相信在搜索大数据信用的你&#xff0c;已经因为大数据信用不好受到了挫折&#xff0c;想详细了解一下自己的大数据信用&#xff0c;但是找遍了网络上的平台之后才发现&#xff0c;很多平台都只提供查询服务&#xff0c;想要找一个专业的平台查询和讲解很困难。下面本文就为大家…

【MongoDB】2.MongoDB导入文件

目录 一、MongoDB Compass 二、mongoimport 1、安装 2、语法&#xff1a; 3、可能出现的错误 三、MongoDB的GridFS 1、介绍 2、语法 一、MongoDB Compass 这个简单&#xff0c;不做赘述 二、mongoimport Mongoimport是一个用于导入数据到MongoDB的工具&#xff0c;默…

IWDG 溢出时间计算

iwdg看门狗溢出时间&#xff0c;就是之前算过的&#xff0c;但是再记录一次 计算过程如下&#xff1a;因为iwdg是独立看门狗&#xff0c;是用的LSI, 所以在f1系列lsi的时钟频率是40khz&#xff0c;也就是Fiwdg的频率是40khz&#xff0c;频率除以psc&#xff08;分频系数&#…

计算机毕业设计 校园志愿者管理系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

C++ | Leetcode C++题解之第331题验证二叉树的前序序列化

题目&#xff1a; 题解&#xff1a; class Solution { public:bool isValidSerialization(string preorder) {int n preorder.length();int i 0;int slots 1;while (i < n) {if (slots 0) {return false;}if (preorder[i] ,) {i;} else if (preorder[i] #){slots--;i…

全面介绍 Apache Doris 数据灾备恢复机制及使用示例

引言 Apache Doris 作为一款 OLAP 实时数据仓库&#xff0c;在越来越多的中大型企业中逐步占据着主数仓这样的重要位置&#xff0c;主数仓不同于 OLAP 查询引擎的场景定位&#xff0c;对于数据的灾备恢复机制有比较高的要求&#xff0c;本篇就让我们全面的介绍和示范如何利用这…

红黑树的插入与删除

文章目录 红黑树概念红黑树的性质&#xff1a; 红黑树的插入操作情况一情况二情况三 小总结红黑树的验证红黑树的删除一.删除单孩子节点1. 删除节点颜色为黑色2. 删除颜色为红色 二. 删除叶子节点1. 删除节点为红色2.删除节点为黑色2.1兄弟节点为黑色&#xff0c;有孩子节点&am…