srs集群下行edge处理逻辑

news2025/1/10 2:56:27

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{
    srs_error_t err = srs_success;
    
    consumer = new SrsLiveConsumer(this, conn);
    consumers.push_back(consumer);
    if (conn != NULL) {
        conn->srsConsumer = consumer;
    }

    // There should be one consumer, so reset the timeout.
    stream_die_at_ = 0;
    publisher_idle_at_ = 0;
    //通过配置文件中的参数,判断是否是边缘服务器
    //如果是边缘服务器,则调用 play_edge进行拉流播放
    //SrsPlayEdge* play_edge;
    // for edge, when play edge stream, check the state
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        // notice edge to start for the first client.
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }
    
    return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{
    srs_error_t err = srs_success;
    //SrsEdgeIngester ingester 启动一个新的协程去源站拉流
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        err = ingester->start();
    } else if (state == SrsEdgeStateIngestStopping) {
        return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");
    }

    
    return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{
    srs_error_t err = srs_success;
    
    if ((err = source->on_publish()) != srs_success) {
        return srs_error_wrap(err, "notify source");
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-igs", this);
    
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }
    
    return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{
     while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "do cycle pull");
        }

        // Use protocol in config.
        string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);

        // If follow client protocol, change to protocol of client.
        bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);
        if (follow_client && !req->protocol.empty()) {
            edge_protocol = req->protocol;
        }

        // Create object by protocol.
        srs_freep(upstream);
        //根据边缘协议创建对应的拉流类
        if (edge_protocol == "flv" || edge_protocol == "flvs") {
            upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");
        } else {
            upstream = new SrsEdgeRtmpUpstream(redirect);
        }
        
        if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
            return srs_error_wrap(err, "on source id changed");
        }
        //边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取
        //其中一个节点进行拉流
        //这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?
        //其实如果发现连接的源站没有流,会触发302 redirect重连逻辑
        if ((err = upstream->connect(req, lb)) != srs_success) {
            return srs_error_wrap(err, "connect upstream");
        }
        
        if ((err = edge->on_ingest_play()) != srs_success) {
            return srs_error_wrap(err, "notify edge play");
        }

        // set to larger timeout to read av data from origin.
        upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
        //拉流处理函数
        err = ingest(redirect);
        
        if (srs_is_client_gracefully_close(err)) {
            srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
            srs_error_reset(err);
        }
        break;
    }
    
}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{
    //第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台
    //如果连接的源站没有流,触发302,再连接另一台
     if (redirect_depth == 0) {
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);

        // @see https://github.com/ossrs/srs/issues/79
        // when origin is error, for instance, server is shutdown,
        // then user remove the vhost then reload, the conf is empty.
        if (!conf) {
            return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
        }

        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_DEFAULT_HTTP_PORT;
        if (schema_ == "https") {
            port = SRS_DEFAULT_HTTPS_PORT;
        }
        srs_parse_hostport(server, server, port);

        // Remember the current selected server.
        selected_ip = server;
        selected_port = port;
    } else {
        // If HTTP redirect, use the server in location.
        schema_ = req->schema;
        selected_ip = req->host;
        selected_port = req->port;
    }
    
    sdk_ = new SrsHttpClient();
    if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {
        return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));
    }
    if ((err = sdk_->get(path, "", &hr_)) != srs_success) {
        return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());
    }

    if (hr_->status_code() == 404) {
        return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());
    }
    if ((err = sdk_->get(path, "", &hr_)) != srs_success) {
        return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());
    }

    if (hr_->status_code() == 404) {
        return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());
    }
  
    //如果状态码为302,开启重连另一台逻辑
    string location;
    if (hr_->status_code() == 302) {
        //获取302返回的地址
        location = hr_->header()->get("Location");
    }
    srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());

    if (hr_->status_code() == 302) {
        //最多重试三次
        if (redirect_depth >= 3) {
            return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);
        }

        string app;
        string stream_name;
        if (true) {
            string tcUrl;
            srs_parse_rtmp_url(location, tcUrl, stream_name);

            int port;
            string schema, host, vhost, param;
            srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);

            r->schema = schema; r->host = host; r->port = port;
            r->app = app; r->stream = stream_name; r->param = param;
        }
        //重连
        return do_connect(r, lb, redirect_depth + 1);
    }
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "thread quit");
        }
        
        pprint->elapse();
        
        // pithy print
        if (pprint->can_print()) {
            upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
        }
        
        // read from client.
        SrsCommonMessage* msg = NULL;
        //upstream拉流
        if ((err = upstream->recv_message(&msg)) != srs_success) {
            return srs_error_wrap(err, "recv message");
        }
        
        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);
        //处理拉到的流
        if ((err = process_publish_message(msg, redirect)) != srs_success) {
            return srs_error_wrap(err, "process message");
        }
    }
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
    srs_error_t err = srs_success;
    
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume audio");
        }
    }
    
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume video");
        }
    }
   }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1461348.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2.1_1 进程的概念、组成、特征

2.1_1 进程的概念、组成、特征 (一)进程的概念 程序:是静态的,就是一个存放在磁盘里的可执行文件,就是一系列的指令集合。 进程(Process):是动态的,是程序的一次执行过程…

Java学习--黑马SpringBoot3课程个人总结-2024-02-16

1.添加文章 1.1 富文本编辑器 文章内容需要使用到富文本编辑器,这里咱们使用一个开源的富文本编辑器 Quill 官网地址: https://vueup.github.io/vue-quill/ 安装: npm install vueup/vue-quilllatest --save导入组件和样式: …

基于springboot+vue的B2B平台的医疗病历交互系统(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

【git 使用】git 中head、工作树、和索引分别是什么,有什么关系和区别

HEAD 定义:HEAD 是指向当前所在分支(或者是某个特定的提交)的指针,它表示当前工作目录正在处于哪个提交或分支上。作用:HEAD 指示了当前工作目录的状态,可以通过 HEAD 来确定当前处于哪个分支上&#xff0…

【已解决】PPT无法复制内容怎么办?

想要复制PPT文件里的内容,却发现复制不了,怎么办? 这种情况,一般是PPT文件被设置了以“只读方式”打开,“只读方式”下的PPT无法进行编辑更改,也无法进行复制粘贴的操作。 想要解决这个问题,我…

百度地图接口 | 实现校验收货地址是否超出配送范围

目录 1. 环境准备 2. 代码开发 2.1 application.yml 2.2 OrderServiceImpl 🍃作者介绍:双非本科大三网络工程专业在读,阿里云专家博主,专注于Java领域学习,擅长web应用开发、数据结构和算法,初步涉猎Py…

单片机03--按键--寄存器版

GPIO端口相关寄存器(STM32F40x芯片) 目标: 开关KEY1控制开灯。 分析: KEY1---PA0--->输入---->浮空输入/下拉输入 KEY1不导通时,PA0输入为低电平,KEY1导通时,PA0输入为高电平。 实现…

Elasticsearch 别名(Aliases)的作用

Elasticsearch 8.4.3 别名(Aliases) 一. 介绍二. 别名的优势三. 别名的基本操作3.1 创建别名3.2 查询别名关联的索引3.3 删除别名3.4 更新别名3.5 通过别名查询数据 前言 这是我在这个网站整理的笔记,有错误的地方请指出,关注我,接…

2024.2.21

1、用多线程进行文件拷贝 #include<myhead.h>//参数结构体创建 typedef struct INFO {const char *srcfile;const char *destfile;int length; }Info;//定义获取文件长度的函数 int get_file_len(const char *srcfile,const char *destfile){int srcfd,destfd;//只读形式…

MySQL 查询遇到Illegal mix of collations的错误

业务同学线上业务执行 SQL 时报错&#xff0c; ### Error querying database. Cause: java.sql.SQLException: Illegal mix of collations (utf8_general_ci,IMPLICIT) and (utf8mb4_0900_ai_ci,COERCIBLE) for operation like含义是对like操作非法混合了排序规则(utf8mb4_ge…

qml 电池控件设计(手把手从零开始)

一、说明 做 qt 开发也有好几年了&#xff0c;一直基于QWidget 框架做的开发&#xff0c;使用重写 paint 函数实现各种显示效果&#xff0c;在复杂的 ui 开发中&#xff0c;控件一多或者刷新频率一高&#xff0c;其实也是存在性能限制。 一般来说&#xff0c;qt 的界面对象全部…

MKS T3BI集成蝶阀说明T3B-T3PRS-232Supplement

MKS T3BI集成蝶阀说明T3B-T3PRS-232Supplement

洛谷 P1016 [NOIP1999 提高组] 旅行家的预算【贪心】

原题链接&#xff1a;https://www.luogu.com.cn/problem/P1016 题目描述 一个旅行家想驾驶汽车以最少的费用从一个城市到另一个城市&#xff08;假设出发时油箱是空的&#xff09;。给定两个城市之间的距离 D1​、汽车油箱的容量 C&#xff08;以升为单位&#xff09;、每升汽…

计网day5

六 传输层 6.1 传输层概述 6.2 UDP协议 6.3 TCP协议 TCP连接管理&#xff1a; TCP可靠传输&#xff1a; TCP拥塞控制&#xff1a;

unity学习(32)——跳转到角色选择界面(父子类问题)

新问题 应该是两个脚本之间缺少继承关系 its children 解决起来很简单&#xff0c;把ResceneScript也绑到canvas上就可以了 。 此时&#xff0c;在账号密码正确的情况下&#xff0c;是可以完成场景切换。 对应的代码如下&#xff1a; TMP_Text d GameObject.FindWithTag(&…

【问题解决】删除node节点后如何把node节点重新加入

环境明细 docker版本&#xff1a; 25.0.3kubeadm 版本&#xff1a;v1.25.0 1 在master节点删除node节点 [rootk8s-master ~]# kubectl delete nodes k8s-node-02 node "k8s-node-02" deleted [rootk8s-master ~]# kubectl get nodes -o wide NAME STAT…

基于 GTSAM 的因子图简单实例

Title: 基于 GTSAM 的因子图简单实例 文章目录 I. 引言II. GTSAM 的安装与配置III. 基于 GTSAM 的因子图实例的 C 实现1. C 源码2. CMakeLists.txt 脚本3. 数值结果 IV. 基于 GTSAM 的因子图实例的 Python 实现1. Python 源码2. 数值结果3. 可视化结果 V. 总结 关联博文: 因子图…

day2:信号与槽

思维导图 使用手动连接&#xff0c;将登录框中的取消按钮使用t4版本的连接到自定义的槽函数中&#xff0c;在自定义的槽函数中调用关闭函数 将登录按钮使用qt5版本的连接到自定义的槽函数中&#xff0c;在槽函数中判断u界面上输入的账号是否为"123",密码是否为"…

springboot集成JWT实现token权限认证

vuespringboot登录与注册功能的实现 注&#xff1a;对于JWT的学习&#xff0c;首先要完成注册和登录的功能&#xff0c;本篇博客是基于上述博客的进阶学习&#xff0c;代码页也是在原有的基础上进行扩展 ①在pom.xml添加依赖 <!-- JWT --> <dependency><grou…

【Linux】git操作 - gitee

1.使用 git 命令行 安装 git yum install git 2.使用gitee 注册账户 工作台 - Gitee.com 进入gitee&#xff0c;根据提示注册并登录 新建仓库 仓库名称仓库简介初始换仓库 3.Linux-git操作 进入仓库&#xff0c;选择“克隆/下载” 复制下面的两行命令进行git配置 然后将仓库clo…