之前的下载 M3U8程序,有很多问题, 为此做了一些升级,分享给大家。
- 增加了存在播放列表的情况处理
- 播放列表路径和ts路径错误问题
- 多线程问题
- 对于电视剧多文件下载的处理
这里从网上找了一部的链接,可以参考这个网站https://www.zuida001.com/
import os
import urllib3
import requests
import subprocess
import m3u8
from urllib import parse
from tenacity import retry, wait_random, stop_after_attempt
import gevent
from gevent.threadpool import ThreadPool
urllib3.disable_warnings()
class M3u8Downloader:
def __init__(self, pool_size=10):
self.pool = ThreadPool(pool_size)
@retry(stop=stop_after_attempt(3), wait=wait_random(2, 5))
def request(self, url):
"""发送请求"""
try:
res = requests.get(url, verify=False, timeout=5)
return res
except Exception as e:
print(url, e)
raise e
def download_segment(self, url, file):
"""下载ts文件"""
if os.path.exists(file):
return
res = self.request(url)
with open(file, "wb")as f:
f.write(res.content)
def convert_mp4(self, path, output, key):
if not os.path.exists(output):
# 使用FFmpeg将所有.ts文件合并为一个MP4文件 ffmpeg -allowed_extensions ALL -i index.m3u8 -c copy xxx.mp4
if key:
subprocess.call(['ffmpeg', '-allowed_extensions', 'ALL', '-i', 'local.m3u8', '-c', 'copy', output], cwd=path)
else:
subprocess.call(['ffmpeg', '-i', 'local.m3u8', '-c', 'copy', output], cwd=path)
def download_m3u8(self, url, path):
"""下载M3U8文件,有些存在播放列表,默认选择第一个"""
m3u8_file_name = os.path.join(path, "index.m3u8")
res = self.request(url)
with open(m3u8_file_name, "w", encoding="utf-8")as f:
f.write(res.text)
# 解析M3U8文件
m3u8_obj = m3u8.loads(res.text)
# 如果存在清晰度列表,请求解析清晰度列表
m3u8_playlist = []
for playlist in m3u8_obj.playlists:
uri = parse.urljoin(url, playlist.uri)
bandwidth = playlist.stream_info.bandwidth
resolution= playlist.stream_info.resolution
m3u8_file_name = os.path.join(path, f"{'x'.join([str(i) for i in resolution])}_{bandwidth}.m3u8")
res = self.request(uri)
with open(m3u8_file_name, "w", encoding="utf-8")as f:
f.write(res.text)
cur_m3u8 = m3u8.loads(res.text)
cur_m3u8.uri = uri
m3u8_playlist.append(cur_m3u8)
if m3u8_playlist:
# 播放列表默认选择第一个
return m3u8_playlist[0]
else:
return m3u8_obj
def download(self, url, path, output):
"""
下载单个m3u8主程序
url: m3u8链接url
path: 单个m3u8目录
output: 转换输出文件路径名
"""
# 创建目录
segment_dir = os.path.join(path, "index")
if not os.path.exists(segment_dir):
os.makedirs(segment_dir)
output_dir = os.path.dirname(output)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 解析M3U8文件
m3u8_obj = self.download_m3u8(url, path)
# 下载key文件
for item in m3u8_obj.keys:
if item:
key_url = parse.urljoin(url, item.uri)
key_file_name = os.path.join(path, key_url.split("/")[-1])
res = self.request(key_url)
with open(key_file_name, "w", encoding="utf-8")as f:
f.write(res.text)
segments = []
# 生成新的本地M3U8文件内容
for index, segment in enumerate(m3u8_obj.segments):
uri = parse.urljoin(m3u8_obj.uri, segment.uri)
segments.append(uri)
# 有些ts文件名过长,对其以序号重新命名
segment.uri = f"index/{index}.{uri.split('.')[-1]}"
# 保存M3U8文件
local_file_name = os.path.join(path, "local.m3u8")
with open(local_file_name, 'w') as f:
f.write(m3u8_obj.dumps())
# 下载ts文件
for index, url in enumerate(segments):
file = os.path.join(segment_dir, f"{index}.{url.split('.')[-1]}")
self.pool.spawn(self.download_segment, url, file)
gevent.wait()
# ts文件下载完成, 转换成mp4文件
if len(segments) == len(os.listdir(segment_dir)):
self.convert_mp4(path, output, key=[item.uri for item in m3u8_obj.keys if item])
if __name__ == "__main__":
cur_path = os.path.abspath(os.path.dirname(__file__))
data = [
{"name": "凶劫601航班第01集", "url": "https://v4.mstopq.com/202404/11/4cSTt8dMgB7/video/index.m3u8"},
{"name": "凶劫601航班第02集", "url": "https://v4.mstopq.com/202404/11/JtNttP8HfS7/video/index.m3u8"},
{"name": "凶劫601航班第03集", "url": "https://v4.mstopq.com/202404/11/b10TUD8C4T7/video/index.m3u8"},
{"name": "凶劫601航班第04集", "url": "https://v4.mstopq.com/202404/11/eWLuKWqFBL7/video/index.m3u8"},
{"name": "凶劫601航班第05集", "url": "https://v4.mstopq.com/202404/11/Sj3pwtX0hN7/video/index.m3u8"},
{"name": "凶劫601航班第06集", "url": "https://v4.mstopq.com/202404/11/i62BN8wMfc7/video/index.m3u8"},
]
for item in data:
url = item["url"]
name = item["name"]
path = os.path.join(cur_path, "凶劫601航班_tmp", f"{name}")
output = os.path.join(cur_path, "凶劫601航班", f"{name}.mp4")
# 因为是多线程下载,可能存在某个线程下载失败的情况, 如果发现下载不完整,没有输出文件,可以尝试重新运行,已经下载过的不会再次下载。
download = M3u8Downloader(pool_size=20)
download.download(url, path, output)
查看原文:一键下载 M3U8 并转换为 MP4升级版
关注公众号 "字节航海家" 及时获取最新内容