(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

news2024/11/25 10:33:20

1. 背景介绍

在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动或停止摄像头的转码过程。

Docker部署 SRS rtmp/flv流媒体服务器-CSDN博客文章浏览阅读360次,点赞7次,收藏5次。SRS(Simple Realtime Server)是一款开源的流媒体服务器,具有高性能、高可靠性、高灵活性的特点,能够支持直播、点播、转码等多种流媒体应用场景。SRS 不仅提供了流媒体服务器,还提供了适用于多种平台的客户端 SDK 和在线转码等辅助服务,是一款十分强大的流媒体解决方案。https://blog.csdn.net/m0_56659620/article/details/135400510?spm=1001.2014.3001.5501

2. 技术选择

在选择技术方案时,考虑到构建视频流转码服务的需求,我们将采用Python编程语言,并结合asyncio和aiohttp库。这一选择基于异步框架的优势,以下是对异步框架和同步框架在视频流转码场景中的优缺点的明确总结:

异步框架的优势:

  • 高并发处理: 异步框架通过非阻塞方式处理请求,能够高效处理大量并发请求,确保系统在高负载下保持稳定性。
  • 异步I/O: 支持异步I/O操作,允许在等待I/O操作完成的同时继续处理其他请求,提高整体效率。
  • 资源利用率高: 能够更有效地利用系统资源,同时处理多个请求,提高视频转码效率。
  • 事件驱动: 采用事件驱动模型,适应实时性要求高的视频流处理,能够立即响应新的转码请求。

同步框架的缺点:

  • 阻塞: 阻塞调用可能导致整个程序停滞,尤其在处理大文件或网络请求时可能引发性能问题,特别是在高并发场景下。
  • 低并发: 每个请求需要独立的线程或进程,可能导致系统资源耗尽,降低并发处理能力,对于需要同时处理多个视频流的情况可能不够高效。

考虑到处理大量并发请求、提高系统性能和响应性的需求,采用异步框架是更为合适的选择。异步框架的高并发处理能力、异步I/O支持、高资源利用率以及事件驱动的特性使其更适用于实时性要求较高的视频流转码服务。

3. 代码实现(必须在linux系统运行,4步骤为部署攻略)

3.1 导入必要的库

首先,我们导入所需的库,包括asyncio、aiohttp、aiohttp_cors和logging。

import asyncio
from aiohttp import web
import aiohttp_cors
import logging

3.2 设置日志

logging.basicConfig(level=logging.INFO)

3.3 配置并发控制和任务跟踪

设置最大同时运行的ffmpeg子进程数量,并使用Semaphore限制并发进程数量。同时,使用字典跟踪正在进行的转码任务。

MAX_CONCURRENT_PROCESSES = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)
transcoding_tasks = {}

3.4 定义启动和停止转码任务的方法

定义启动和停止 RTSP 转码任务的方法

# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):
    # 检查相同RTSP是否已有子进程在处理
    if camera_id in transcoding_tasks:
        return transcoding_tasks[camera_id]

    # 使用Semaphore限制并发进程数量
    async with semaphore:
        # 实际的转码操作,这里需要调用ffmpeg或其他工具
        ffmpeg_command = [
            'ffmpeg',
            '-rtsp_transport', 'tcp',
            '-i', ip,
            '-c:v', 'libx264',
            '-c:a', 'aac',
            '-f', 'flv',
            f'{rtmp_server}/live/livestream{camera_id}'
        ]

        # 创建异步子进程
        process = await asyncio.create_subprocess_exec(*ffmpeg_command)

        # 将任务添加到字典中
        transcoding_tasks[camera_id] = process

        # 等待子进程完成
        await process.communicate()

        # 从字典中移除已完成的任务
        transcoding_tasks.pop(camera_id, None)

# 停止转码方法
async def stop_transcoding(camera_id):
    # 停止转码任务
    if camera_id in transcoding_tasks:
        process = transcoding_tasks[camera_id]
        process.terminate()  # 发送终止信号
        await process.wait()  # 等待进程结束

        # 从字典中移除已停止的任务
        transcoding_tasks.pop(camera_id, None)

3.5 定义Web接口路由

定义Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

# 开始转码任务
async def play_camera(request):
    data = await request.post()

    # 从表单数据中获取摄像头的ID和rtsp流
    camera_id = data.get('id')
    rtsp = data.get('ip')

    # 这里设置你的 RTMP 服务器地址
    rtmp_server = 'rtmp://192.168.14.93:1935'

    # 执行实际的转码操作
    task = await perform_transcoding(rtsp, camera_id, rtmp_server)

    # 返回包含转码后的RTMP URL的JSON响应
    rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'
    return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})


# 停止转码任务
async def stop_camera(request):
    data = await request.post()
    camera_id = data.get('id')

    # 停止指定摄像头的转码任务
    await stop_transcoding(camera_id)

    return web.json_response({'code':200,'message': '转码停止成功'})


# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):

    # 获取所有正在运行的任务的列表
    tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]

    # 并发停止所有任务
    await asyncio.gather(*tasks)

    # 清空字典,表示所有任务都已停止
    transcoding_tasks.clear()

    return web.json_response({'code':200,'message': '转码停止成功'})

3.6 创建Web应用和配置CORS

创建Web应用,配置CORS(跨域资源共享)中间件,以确保接口可以被跨域访问。

app = web.Application()

# CORS配置
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
    )
})

3.7 添加Web接口路由

添加Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由

3.8 添加CORS中间件

添加CORS中间件,确保接口可以被跨域访问。

# 添加 CORS 中间件
for route in list(app.router.routes()):
    cors.add(route)

3.9 运行Web应用

运行Web应用,监听指定的主机和端口。

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

 3.10 完整代码

import asyncio
from aiohttp import web
import aiohttp_cors
import logging

# 设置日志级别
logging.basicConfig(level=logging.INFO)

# 最大同时运行的ffmpeg子进程数量
MAX_CONCURRENT_PROCESSES = 5

# 使用Semaphore限制并发进程数量
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)

# 字典用于跟踪正在进行的转码任务
transcoding_tasks = {}

# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):
    # 检查相同RTSP是否已有子进程在处理
    if camera_id in transcoding_tasks:
        return transcoding_tasks[camera_id]

    # 使用Semaphore限制并发进程数量
    async with semaphore:
        # 实际的转码操作,这里需要调用ffmpeg或其他工具
        ffmpeg_command = [
            'ffmpeg',
            '-rtsp_transport', 'tcp',
            '-i', ip,
            '-c:v', 'libx264',
            '-c:a', 'aac',
            '-f', 'flv',
            f'{rtmp_server}/live/livestream{camera_id}'
        ]

        # 创建异步子进程
        process = await asyncio.create_subprocess_exec(*ffmpeg_command)

        # 将任务添加到字典中
        transcoding_tasks[camera_id] = process

        # 等待子进程完成
        await process.communicate()

        # 从字典中移除已完成的任务
        transcoding_tasks.pop(camera_id, None)

# 停止转码方法
async def stop_transcoding(camera_id):
    # 停止转码任务
    if camera_id in transcoding_tasks:
        process = transcoding_tasks[camera_id]
        process.terminate()  # 发送终止信号
        await process.wait()  # 等待进程结束

        # 从字典中移除已停止的任务
        transcoding_tasks.pop(camera_id, None)


# 开始转码任务
async def play_camera(request):
    data = await request.post()

    # 从表单数据中获取摄像头的ID和rtsp流
    camera_id = data.get('id')
    rtsp = data.get('ip')

    # 这里设置你的 RTMP 服务器地址
    rtmp_server = 'rtmp://192.168.14.93:1935'

    # 执行实际的转码操作
    task = await perform_transcoding(rtsp, camera_id, rtmp_server)

    # 返回包含转码后的RTMP URL的JSON响应
    rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'
    return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})


# 停止转码任务
async def stop_camera(request):
    data = await request.post()
    camera_id = data.get('id')

    # 停止指定摄像头的转码任务
    await stop_transcoding(camera_id)

    return web.json_response({'code':200,'message': '转码停止成功'})


# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):

    # 获取所有正在运行的任务的列表
    tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]

    # 并发停止所有任务
    await asyncio.gather(*tasks)

    # 清空字典,表示所有任务都已停止
    transcoding_tasks.clear()

    return web.json_response({'code':200,'message': '转码停止成功'})


app = web.Application()

# CORS配置
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
    )
})

app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由

# 添加 CORS 中间件
for route in list(app.router.routes()):
    cors.add(route)


if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

4. 部署(Docker环境)

部署所需Dockerfile文件代码如下

FROM python:3.7-slim

WORKDIR /app

COPY requirements.txt .

RUN apt-get update \
    && apt-get install -y ffmpeg \
    && rm -rf /var/lib/apt/lists/* \
    && pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "async_io_io.py"]

部署所需requirements.txt如下

aiohttp
aiohttp-cors
ffmpeg

根目录进行打包及启动

请求接口实现转码

5. 总结

通过以上的步骤,我们成功构建了一个流媒体服务器控制接口,可以通过Web接口实现对摄像头的 RTSP 转码任务的动态管理。这个接口可以集成到现有的流媒体服务器中,提供更多控制和管理的可能性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1366519.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OPPO Find X7 Ultra 发布,搭载双潜望四主摄摄影技术

2024年1月8日,深圳——OPPO发布旗舰Find X7 Ultra,定义移动影像的终极形态。Find X7 Ultra 首创的双潜望四主摄构成哈苏大师镜头群,以六个光学品质焦段提供目前手机最强大、品质最高的多摄变焦能力。首次搭载专为超光影图像引擎定制的一英寸传…

基于黑猩猩算法优化的Elman神经网络数据预测 - 附代码

基于黑猩猩算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于黑猩猩算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于黑猩猩优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要&#x…

Halcon灰度的平均值和偏差intensity

Halcon灰度的平均值和偏差 intensity 算子用于计算单张图像上多个区域的灰度值的平均值和偏差。该算子的原型如下: intensity (Regions, Image ::: Mean, Deviation )其各参数的含义如下。 参数1:Regions(输入参数),…

Golang : Bson\Json互转

代码 package bson_jsonimport ("encoding/json""errors""fmt""gopkg.in/mgo.v2/bson""os""testing" )type User struct {Name string json:"name,omitempty" bson:"name,omitempty"CSD…

探讨一下WebINFO 下的一些思考

在平时的开发中,我们经常看到一个/WEB-INF 这个目录,这个是web 容器初始化加载的一个标准路径。官方解释:WEB-INF 是 Java 的 web 应用的安全目录。所谓安全就是客户端无法访问,只有服务端可以访问的目录。也就是说,这…

虾皮上传产品软件:如何使用虾皮平台上传产品

在虾皮(Shopee)平台上,卖家可以通过多种方法来上传产品,以简化商品上架过程。本文将介绍一些常用的产品上传方法,帮助卖家选择最适合自己的方式。 先给大家推荐一款shopee知虾数据运营工具 知虾免费体验地址&#xff…

【MATLAB】ICEEMDAN_LSTM神经网络时序预测算法

有意向获取代码,请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 ICEEMDAN-LSTM神经网络时序预测算法是一种结合了改进的完全扩展经验模态分解(ICEEMDAN)和长短期记忆神经网络(LSTM)的时间序列预测方法。 …

xss-labs(1-5)

环境准备: 靶场下载:下载仓库 zhangmanhui/xss-labs - Gitee.com 启动phpStudy 搭建将文件解压拉到phpStudy的www目录下就行 最后直接访问:127.0.0.1/xss-labs-master/ 最后再准备一个浏览器的插件用来发送请求:HackBar 插件都配置好了,直接加载到你的浏览器的扩展…

了解激光打标机:技术原理、应用领域与优势

激光打标机是一种利用激光技术进行打标的高科技设备。其技术原理是,通过将高能量密度的激光照射在工件表面,使表面的材料发生物理或化学变化,从而形成永久性的标记。下面将分别介绍激光打标机的技术原理、应用领域和优势。 一、技术原理 激光…

【Verilog】组合电路的设计和时序电路的设计

系列文章 数值(整数,实数,字符串)与数据类型(wire、reg、mem、parameter) 运算符 数据流建模 行为级建模 结构化建模 系列文章组合电路的设计时序电路的设计 组合电路的设计 组合电路的特点是&#xff0c…

python(17)--文件的输入/输出

前言 在Python中,文件文本操作是非常重要的,主要有以下几个原因: 数据持久性:当你需要长期存储数据,如用户的个人信息、交易记录或数据库元数据等,将数据保存在文件中是一种常见的方法。文件系统提供了持…

STL容器之vector基本操作

目录 vector基本操作 vector构造函数 vector的遍历操作 1.重载[ ]进行遍历。 2.使用迭代器进行遍历。 3.使用范围for循环进行遍历。 4.使用at成员函数进行遍历 。 vector空间增长 1.size:获取当前元素的个数。 2.capacity:获取能存储的元素的个…

ceres在优化过程中保持指定参数块不变

ceres在优化过程中保持指定参数块不变 在solve前利用SetParameterBlockConstant()设置想固定不变的参数块 example: //添加误差方程 ceres::CostFunction* cost_function nullptr;cost_function BundleAdjustmentGCPsCostFunction::Create(px, py, ptGCP.second.x_c, ptGC…

kettle分页抽取数据

背景 kettle抽取数据大家还是比较熟悉的,kettle在抽取数据的时候会开启很多通道,同时抽取,但是我现在遇到一个场景: 从一个mysql数据库里获取“已办”状态的数据id,然后拿这些id去一个oracle数据库里查询&#xff0c…

CCNP课程实验-06-EIGRP-Trouble-Shooting

目录 实验条件网络拓朴 环境配置开始排错错误1:没有配置IP地址,IP地址宣告有误错误2:R3配置了与R1不同的K值报错了。错误3:R4上的AS号配置错,不是1234错误4:R2上配置的Key-chain的R4上配置的Key-chain不一致…

3.7 THREAD SCHEDULING AND LATENCY TOLERANCE

线程调度严格来说是一个实现概念。因此,它必须在特定硬件实现的背景下进行讨论。在迄今为止的大多数实现中,分配给SM的块被进一步分为32个称为warps的线程单元。warps的大小是特定于实现的。warps不是CUDA规范的一部分;然而,了解w…

python+playwright 学习-1.环境准备与快速开始

前言 说到 web 自动化,大家最熟悉的就是 selenium 了,selenium 之后又出现了三个强势的框架Puppeteer、CyPress、TestCafe, 但这3个都需要掌握 JavaScript 语言,所以只是少部分人在用。 2020年微软开源一个 UI 自动化测试工具 P…

基于供需算法优化的Elman神经网络数据预测 - 附代码

基于供需算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于供需算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于供需优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要:针…

scala 安装和创建项目

Scala,一种可随您扩展的编程语言:从小型脚本到大型多平台应用程序。Scala不是Java的扩展,但它完全可以与Java互操作。在编译时,Scala文件将转换为Java字节码并在JVM(Java虚拟机)上运行。Scala被设计成面向对…

[De1ctf 2019]SSRF Me

目录 具体做题分析: 字符串拼接: 哈希拓展攻击: 点开是一段flask代码,经过还原后格式如下: #!/usr/bin/env python# encodingutf-8from flask import Flask, requestimport socketimport hashlibimport urllibimpo…