PyQt下载M3U8文件

news2025/1/6 5:33:03

下载模型

from pathlib import Path
from typing import Any, Union
from pydantic import Field, BaseModel

class DownloadUrlModel(BaseModel):
    title: str = Field(..., description="文件名")
    save_path: Union[Path, str] = Field(..., description="文件路径")
    url: str = Field(..., description="m3u8文件路径")
    isM3u8: bool = Field(True, description="是否为m3u8文件")

M3U8解密

pip install pycryptodome

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad


class DecodeByte:
    # 解密
    @staticmethod
    def do_decode(key, iv, data, method="AES-128") -> bytes:
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(iv, str):
            iv = iv.encode('utf-8')
        if "AES-128" == method:
            aes = AES.new(key, AES.MODE_CBC, iv)
            if data and (len(data) % 16) != 0:
                data = pad(data, 16)
            return aes.decrypt(data)
        else:
            return None

下载线程

FFMPEG_EXE_PATHffmpeg.exe的绝对路径,若没有在官网上下载
ffmpeg下载M3U8没有进度显示,并且它是同步下载,会比较慢,使用httpx异步比较快

# coding = utf-8
import asyncio
import os
import re
import shutil
from pathlib import Path
from typing import Union

import httpx
from PySide6.QtCore import QThread, Signal

from ..models import DownloadUrlModel
from ..utils import HTTPRequest, DecodeByte
from ..config import FFMPEG_EXE_PATH


class DownloadM3U8Thread(QThread):
    loggerSignal = Signal(str)
    progressSignal = Signal(int)
    stopSignal = Signal(bool)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.__is_stop = False
        self.__model: DownloadUrlModel = None
        self.finished_file: Path = None
        self.failed_file: Path = None
        self.temp_path: Path = None

        self.num = 0
        self.list_length = 0
        self.retry_max = 7
        # 解密信息
        self.cry = {
            "key": "",
            "iv": "",
            "method": "",
        }

        self.started.connect(self.startedSlot)

    def sendRequest(self, url: str, *, method='GET', **kwargs):
        response = httpx.request(method, url, verify=False, headers=HTTPRequest.headers, timeout=HTTPRequest.timeout,
                                 **kwargs)
        response.raise_for_status()
        return response

    async def aiohttp_send(self, client: httpx.AsyncClient, url: str, index: int, *, retry_count=0):
        ts_name = f'{str(index).zfill(6)}.ts'
        try:
            response = await client.get(url)
            self.save_data_file(response.content, ts_name)
            progress = round(self.num / self.list_length * 100, 3)
            self.loggerSignal.emit(f'下载进度: {progress} %')
            self.progressSignal.emit(int(progress))
        except httpx.ConnectError as e:
            await asyncio.sleep(4)
            if retry_count < self.retry_max:
                retry_count += 1
                await self.aiohttp_send(client, url, index, retry_count=retry_count)
            else:
                self.save_failed_file(f'{url}_{ts_name}')

    async def aiohttp_download(self, urls):
        async with httpx.AsyncClient(headers=HTTPRequest.headers, timeout=HTTPRequest.timeout) as client:
            tasks = []
            self.list_length = len(urls)
            for index, url in enumerate(urls, 1):
                if self.__is_stop:
                    self.wait()
                    self.quit()
                    break
                ts_name = f'{str(index).zfill(6)}.ts'
                if ts_name in self.open_finished_file():
                    continue
                task = asyncio.ensure_future(self.aiohttp_send(client, url, index))
                tasks.append(task)
            await asyncio.gather(*tasks)

    def get_full_ts_url(self, url: str, ts_name: str) -> str:
        """
        获取完整的ts文件url
        :param url: 原始url
        :param ts_name: ts文件名
        :return: str
        """
        if ts_name.startswith('http'):
            return ts_name
        tl = ts_name.split('/')
        new_url = []
        # 循环url,去掉ts name中重复的部分
        for s in url.split('/')[:-1]:
            if s in tl:
                tl.remove(s)
            new_url.append(s)
        # 拼接ts name
        new_url.extend(tl)
        result = '/'.join(new_url)
        return result

    def setCryInfo(self, text, url):
        # 获取加密参数
        x_key = re.findall('#EXT-X-KEY:(.*?)\n', text)
        cry_obj = dict()
        if len(x_key) > 0:
            # 提取
            for item in x_key[0].split(','):
                key = item.split('=')[0]
                value = item.replace(key, '')[1:].replace('"', '')
                cry_obj[key] = value
            # format
            if cry_obj.get('URI') and not cry_obj['URI'].startswith('http'):
                cry_obj['URI'] = self.get_full_ts_url(url, cry_obj['URI'])
            elif not cry_obj.get('URI'):
                cry_obj['URI'] = ''
            # 获取key
            res = self.sendRequest(cry_obj['URI'])
            self.cry['key'] = res.content
            # 加密方式
            self.cry['method'] = cry_obj.get('METHOD')
            # iv值
            if cry_obj.get('IV'):
                self.cry['iv'] = cry_obj['IV'][2:18]
        else:
            pass

    def save_data_file(self, data: bytes, ts_name: str):
        # 如果有加密,需要data解密后再存储
        if self.cry.get('key'):
            # 如果源文件有iv就读取,如果没有就用文件名
            iv = self.cry["iv"] if self.cry.get("iv") else ts_name.split('.')[0].zfill(16)
            data = DecodeByte.do_decode(self.cry["key"], iv, data, self.cry["method"])
            if not data:
                raise Exception('解密失败')
        # 保存
        with open(self.temp_path / ts_name, 'wb') as f:
            f.write(data)
            self.save_finished_file(ts_name)
            self.num += 1

    def open_failed_file(self) -> list:
        return self.failed_file.read_text(encoding='utf-8').split('\n')

    def save_failed_file(self, ts_name: str):
        with self.failed_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def open_finished_file(self) -> list:
        return self.finished_file.read_text(encoding='utf-8').split('\n')

    def save_finished_file(self, ts_name: str):
        with self.finished_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def save_ffmpeg_file(self, file_list: list):
        # 保存ffmpeg合并文件
        ffmpeg_file = self.temp_path / 'ffmpeg.txt'
        ffmpeg_file.touch(exist_ok=True)
        with ffmpeg_file.open('a+', encoding='utf-8') as f:
            for file in file_list:
                f.write(f"file '{file}'\n")
        return ffmpeg_file

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # 获取所有缓存文件
        file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
        if not file_list:
            return
        # 名称排序
        file_list.sort(key=lambda s: s.split('.')[0])
        ffmpeg_txt = self.save_ffmpeg_file(file_list)
        # 文件总数
        length = len(file_list)
        # 开始合并文件
        cmd = f'{FFMPEG_EXE_PATH} -f concat -safe 0 -i  {ffmpeg_txt}  -c  copy {dest_file}'
        os.system(cmd)
        # with open(dest_file, 'ab') as f:
        #     # 循环文件列表
        #     for i, file in enumerate(file_list, 1):
        #         # 读取每个文件
        #         with open(os.path.join(source_path, file), 'rb') as rf:
        #             # 把每个文件的内容 追加到同一个文件
        #             data = rf.read()
        #             f.write(data)
        #         # 打印进度
        #         self.loggerSignal.emit('合并中: {:3.2f}%'.format(i / length * 100))
        # # 移除缓存文件夹
        self.sleep(2)
        try:
            shutil.rmtree(source_path.parent)
        except Exception as e:
            self.loggerSignal.emit(f'删除文件夹错误:{e}')

    def run(self):
        url = self.__model.url
        save_path = Path(self.__model.save_path)
        video_path = save_path / self.__model.title
        self.temp_path = video_path / 'temp'
        self.finished_file = self.temp_path / 'finished.txt'
        self.failed_file = self.temp_path / 'failed.txt'

        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.finished_file.touch(exist_ok=True)
        self.failed_file.touch(exist_ok=True)

        try:
            self.loggerSignal.emit(f"开始加载m3u8文件")
            m3u8_text = self.sendRequest(url).text
            self.setCryInfo(m3u8_text, url)
            new_urls = []
            self.loggerSignal.emit("解析m3u8文件")
            for line in m3u8_text.split('\n'):
                if not line.startswith('#'):
                    new_urls.append(self.get_full_ts_url(url, line))
            self.loggerSignal.emit('开始下载视频...')
            asyncio.run(self.aiohttp_download(new_urls))
            self.loggerSignal.emit('开始合并文件...')
            self.combine_ts(self.temp_path, str(save_path / f'{self.__model.title}.mp4'))
            self.loggerSignal.emit(f"视频下载完成")
        except Exception as e:
            self.loggerSignal.emit(f"视频下载失败")
            return

    def stop(self):
        self.__is_stop = True
        self.stopSignal.emit(True)
        self.loggerSignal.emit('下载已停止')

    def startedSlot(self):
        self.__is_stop = False
        self.stopSignal.emit(False)

    def setDownloadTask(self, model: DownloadUrlModel):
        self.__model = model

combine_ts

使用open方法写的合并函数,会出现格式错误的问题,ffmpeg没有此问题

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # 获取所有缓存文件
        pass

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2271246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git的使用流程(详细教程)

目录 01.Git是什么&#xff1f; 1.1 Git简介 1.2 SVN与Git的最主要的区别 1.3 GIt主要特点 02.Git是干什么的&#xff1f; 2.1.Git概念汇总 2.2 工作区/暂存区/仓库 2.3 Git使用流程 03.Git的安装配置 3.1 Git的配置文件 3.2 配置-初始化用户 3.3 Git可视化…

ImageNet 2.0?自动驾驶数据集迎来自动标注新时代

引言&#xff1a; 3DGS因其渲染速度快和高质量的新视角合成而备受关注。一些研究人员尝试将3DGS应用于驾驶场景的重建。然而&#xff0c;这些方法通常依赖于多种数据类型&#xff0c;如深度图、3D框和移动物体的轨迹。此外&#xff0c;合成图像缺乏标注也限制了其在下游任务中的…

npm install --global windows-build-tools --save 失败

注意以下点 为啥下载windows-build-tools&#xff0c;是因为node-sass4.14.1 一直下载不成功&#xff0c;提示python2 没有安装&#xff0c;最终要安装这个&#xff0c;但是安装这个又失败&#xff0c;主要有以下几个要注意的 1、node 版本 14.21.3 不能太高 2、管理员运行 …

Beamer-LaTeX学习(教程批注版)【1】

该文档总体由beamer-latex的教程而来&#xff0c;由耳东小白以自身学习路径整理。因其中要点基本按照教程的顺序和结构整理&#xff0c;故而不能称之为完全原创&#xff0c;但也不是翻译&#xff0c;更不是抄袭&#xff0c;是个人自学笔记和批注&#xff0c;其中添加了小白个人…

wx005基于springboot+vue+uniapp的大学生心理健康测评管理系统小程序

开发语言&#xff1a;Java框架&#xff1a;springbootuniappJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#…

SpringBoot整合springmvc、扩展springmvc

目录 一、 SpringMVC三大组件二、 Spring MVC 组件的自动管理2.1 中央转发器&#xff08;DispatcherServlet&#xff09;2.2 控制器2.3 视图解析器自动管理2.4 静态资源访问2.5 消息转换和格式化2.6 欢迎页面的自动配置 三、Springboot扩展springmvc3.1 视图控制器注册&#xf…

STM32使用UART发送字符串与printf输出重定向

首先我们先看STM32F103C8T6的电路图 由图可知&#xff0c;其PA9和PA10引脚分别为UART的TX和RX(注意&#xff1a;这个电路图是错误的&#xff0c;应该是PA9是X而PA9是RX&#xff0c;我们看下图的官方文件可以看出)&#xff0c;那么接下来我们应该找到该引脚的定义是什么&#xf…

力扣28找出字符串中第一个匹配项的下标

class Solution:def strStr(self, haystack: str, needle: str) -> int:# 特殊情况处理if not needle:return 0# 获取 haystack 和 needle 的长度a len(needle)b len(haystack)# 遍历 haystack&#xff0c;检查每个子字符串是否与 needle 匹配for i in range(b - a 1):if…

8、RAG论文笔记(Retrieval-Augmented Generation检索增强生成)

RAG论文笔记 1、 **研究背景与动机**2、方法概述3、RAG 模型架构3.1总体架构3.2 Generator&#xff08;生成器&#xff09;3.3 检索器&#xff08;Retriever&#xff09;3.4训练&#xff08;Training&#xff09;3.5**解码方法**&#xff08;求近似 &#xff09;3.6微调的参数 …

PCA降维算法详细推导

关于一个小小的PCA的推导 文章目录 关于一个小小的PCA的推导1 谱分解 (spectral decomposition)2 奇异矩阵(singular matrix)3 酉相似(unitary similarity)4 酉矩阵5 共轭变换6 酉等价7 矩阵的迹的计算以及PCA算法推导8 幂等矩阵(idempotent matrix)9 Von Neumanns 迹不等式 [w…

Android studio 旧版本下载,NDK旧版本下载

记录一下旧版的ndk 和 Android studio 官方下载备份。 1.NDK 旧版本下载地址 下载地址&#xff1a;https://github.com/android/ndk/wiki/Unsupported-Downloads 2.Android studio 旧版本下载 下载地址 https://developer.android.com/studio/archive 如果出现以下页面 点击…

开源存储详解-分布式存储与ceph

ceph体系结构 rados&#xff1a;reliable, autonomous, distributed object storage, rados rados采用c开发 对象存储 ceph严格意义讲只提供对象存储能力&#xff0c;ceph的块存储能力实际是基于对象存储库librados的rbd 对象存储特点 对象存储采用put/get/delete&#xf…

Midjourney Imagine API 使用

Midjourney Imagine API 申请及使用 Midjourney 是一款非常强大的 AI 绘图工具&#xff0c;只要输入关键字&#xff0c;就能在短短一两分钟生成十分精美的图像。Midjourney 以其出色的绘图能力在业界独树一帜&#xff0c;如今&#xff0c;Midjourney 早已在各个行业和领域广泛…

docker从下载到Python项目打包到容器中运行(解决下拉超时问题)

docker安装&#xff08;如果第一步或者第二步没有成功&#xff0c;说明是你的镜像源有问题&#xff0c;私聊我获取镜像源&#xff09;镜像位置_/etc/yum.repos.d/CentOS-Base.repo sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/dock…

运算指令(PLC)

加 ADD 减 SUB 乘 MUL 除 DIV 浮点运算 整数运算

Linux高级--3.2.5 “外挂式”死锁监测设计

一、生活中“死锁”的场景 三个人&#xff0c;甲乙丙&#xff0c; 甲借了丙的钱&#xff0c;丙借了乙的钱&#xff0c;乙借了甲的钱。 甲找乙还钱&#xff0c;乙说&#xff1a;“别人还我 我就还你 ”&#xff0c;甲说&#xff1a;“好&#xff0c;那我等你” 乙找丙还钱&am…

图像去雾 | 基于Matlab的图像去雾系统(四种方法)

图像去雾 | 基于Matlab的图像去雾系统&#xff08;四种方法&#xff09; 目录 图像去雾 | 基于Matlab的图像去雾系统&#xff08;四种方法&#xff09;效果一览基本介绍程序设计参考资料 效果一览 基本介绍 基于Matlab的图像去雾系统&#xff08;四种方法&#xff09; 关于图像…

解决Vue中设置el-select的高度不生效问题

el-select是Element UI框架中的一个选择器组件&#xff0c;它允许用户从多个选项中选择一个或多个项目。但这里确存在一个小坑&#xff0c;我们可以看到直接修改el-select的高度是无法生效的 <template><div id"login"><el-select v-model"role…

嵌入式驱动开发详解8(阻塞/非阻塞/异步通信)

文章目录 前言阻塞非阻塞异步通知后续 前言 首先来回顾一下“中断”&#xff0c;中断是处理器提供的一种异步机制&#xff0c;我们配置好中断以后就 可以让处理器去处理其他的事情了&#xff0c;当中断发生以后会触发我们事先设置好的中断服务函数&#xff0c; 在中断服务函数…

人工智能之数学基础:向量内积以及应用

本文重点 向量的点积(Dot Product),又称数量积(Scalar Product)或内积,是线性代数中的一个重要概念。它接受两个向量作为输入,并返回一个实数作为输出。点积不仅在数学和物理学中有广泛应用,而且在人工智能领域也扮演着重要角色。 内积 在数学上,向量x和向量y的内积…