Hi everyone, this is 烤鸭 (Kaoya):
I've recently been working on video quality analysis: using ASR on the audio track to decide whether a clip contains speech, and then doing further processing on the recognized text. I looked at several open-source ASR options and first settled on OpenAI's Whisper, but its performance wasn't good enough, so I switched to whisperX. This is a hands-on, code-first article.
Introduction
OpenAI's Whisper is a powerful automatic speech recognition (ASR) model. It supports many languages, including Chinese, and was trained on a large amount of multilingual, multi-task supervised data, which gives it strong robustness and accuracy. Python, with its rich ecosystem and concise syntax, is a natural choice for building speech recognition features. This article shows how to integrate Whisper from Python for efficient speech recognition.
The service currently handles just under a thousand video calls a day, with an average length of 3 minutes. On an RTX 4090 the average recognition time is under 30 s, which is comfortable for the business load.
A quick look at the Whisper model
Whisper is an open-source speech recognition model based on the Transformer architecture. It was trained on 680,000 hours of multilingual data collected from the web and can accurately recognize many languages. The model is also robust to accents, background noise, and technical vocabulary, which makes it broadly useful in practice.
WhisperX repository:
https://github.com/m-bain/whisperX
Environment
Linux
GPU: RTX 4090
CUDA + PyTorch
ffmpeg
Python dependencies (a quick sanity check follows the install commands below):
pip install --no-cache-dir flask -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir ffmpeg-python -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir wheel -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir zhconv -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir numpy -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir openai-whisper -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir kafka-python -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir fastapi -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir uvicorn -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir psutil -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir gputil -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir requests -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir use-nacos -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir pyyaml -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir rocketmq-client-python -i https://mirrors.aliyun.com/pypi/simple
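Note that whisperX itself is not in the list above; it is installed separately (typically something like pip install whisperx, per the GitHub README, though the exact command may vary with your CUDA/PyTorch setup). Before wiring up the service, it also helps to confirm that PyTorch can see the GPU and that ffmpeg is on the PATH. A minimal sanity-check sketch:

# check_env.py -- minimal environment sanity check (a sketch; adjust to your setup)
import shutil
import torch

# PyTorch must be a CUDA build and must see the 4090
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))

# ffmpeg must be on the PATH for both ffmpeg-python and whisperX's audio loading
print("ffmpeg found at:", shutil.which("ffmpeg"))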
Design goals
The goal is to saturate a single machine, run ASR jobs in parallel, let the API accept requests without throttling, and return results asynchronously via a callback.
API layer
The API layer is built with the FastAPI framework.
import concurrent.futures
import os
import time
import ffmpeg
import platform
import uvicorn
import asyncio
import psutil
from fastapi import FastAPI, BackgroundTasks, HTTPException, status, Query
from fastapi.responses import JSONResponse
import GPUtil
import requests
import json
from dict_time import TimedMap
from parse_video_param import VideoRequest
from parse_video_callback_param import VideoCallbackRequest
from api_result import ApiResult
from whisper_processor import video_process
from whisperX_processor20241119 import video_process_whisperX
from logging_config import KAFKA_LOGGER
from nacos_config20241119 import register_nacos
app = FastAPI()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)  # single worker: transcriptions run one at a time on the GPU
# Resource thresholds
threshold_cpu_usage = 95  # e.g. keep CPU usage below 95%
threshold_gpu_usage_MB = 2400  # e.g. required free GPU memory, in MB
timed_map = TimedMap()
@app.post("/xxxx-video/whisperx")
async def parse_video(request: VideoRequest, background_tasks: BackgroundTasks):
    if not request or not request.path:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No video URL provided")
    print(f"parse_video, params:{request}")
    # Run the processing as a background task so the request is not blocked
    background_tasks.add_task(process_video_whisperx, request, background_tasks)
    # Return immediately: the request has been accepted and is being processed
    api_result = ApiResult(1, "success", "", "")
    return JSONResponse(api_result.to_dict(), status_code=status.HTTP_200_OK)
# Async function that downloads and processes the video
async def process_video_whisperx(request: VideoRequest, background_tasks: BackgroundTasks):
    def sync_process_video_whisperx(request):
        text = ''
        try:
            # Time the whole call
            start_time_single = time.time()
            # Video URL and chunk size from the request
            url = request.path
            chunk_size = request.chunk_size
            # If the machine is overloaded, requeue the job instead of processing it now
            # Current CPU usage
            cpu_usage = psutil.cpu_percent(interval=1, percpu=False)
            print(f"current CPU usage: {cpu_usage}")
            KAFKA_LOGGER.info(f"current CPU usage: {cpu_usage}")
            # Information about all GPUs
            gpus = GPUtil.getGPUs()
            # The GPU is considered sufficient if any card has at least threshold_gpu_usage_MB of free memory
            is_gpu_sufficient = any(gpu.memoryFree >= threshold_gpu_usage_MB for gpu in gpus)
            # Process now if the CPU is below the threshold or the GPU still has headroom
            if cpu_usage <= threshold_cpu_usage or is_gpu_sufficient:
                # Path of the extracted audio file
                wavPath = getWav(url)
                print(f"audio path={wavPath}")
                # Only extract the audio if it does not already exist
                # (this step is optional: the video URL could also be passed to whisperX directly)
                if not os.path.exists(wavPath):
                    (
                        ffmpeg
                        .input(url)
                        .output(wavPath, acodec='mp3')
                        .global_args('-loglevel', 'quiet')
                        .run()
                    )
                # Transcribe the audio with whisperX
                text = process_audio_with_whisperx(wavPath, chunk_size)
                end_time_single = time.time()
                # Notify the caller via the callback URL
                asyncio.run(callback_task(request, text))
                print(f"video: {url}, processing took {end_time_single - start_time_single}s")
                KAFKA_LOGGER.info(f"video: {url}, processing took {end_time_single - start_time_single}s")
                # Clean up the temporary file
                os.remove(wavPath)
            else:
                print(f"CPU over the limit, requeueing video: {url}")
                KAFKA_LOGGER.info(f"CPU over the limit, requeueing video: {url}")
                # Wait 5 seconds
                time.sleep(5)
                # Put the job back into the background task queue so it is not lost
                background_tasks.add_task(process_video_whisperx, request, background_tasks)
        except Exception as ex:
            print(f"sync_process_video error: {str(ex)}")
            KAFKA_LOGGER.error(f"sync_process_video error: {ex}")
        return text
    loop = asyncio.get_running_loop()
    # Run the synchronous function in the thread pool so the event loop is not blocked
    return await loop.run_in_executor(executor, sync_process_video_whisperx, request)
# Build the path of the extracted audio file
def getWav(input_video):
    new_filename = ''
    try:
        # Pick a temp directory depending on the OS
        operating_system = platform.system()
        if operating_system == 'Windows':
            print("running on Windows")
            audio_path = "C:\\Users\\xxx\\Downloads\\"
        else:
            audio_path = "/tmp/"
        # File name from the original path
        filename = os.path.basename(input_video)
        filename_without_extension = os.path.splitext(filename)[0]
        # Full path of the new file (same name, .mp3 extension)
        new_filename = os.path.join(audio_path, filename_without_extension) + ".mp3"
    except Exception as ex1:
        print("getWav ex:", str(ex1))
    return new_filename
# Audio transcription
def process_audio_with_whisperx(audio_file_path: str, chunk_size: int) -> str:
    text = video_process_whisperX(audio_file_path, chunk_size)
    return text
# Asynchronous callback wrapper
async def callback_task(request: VideoRequest, text: str):
    # Create the task on the event loop
    task = asyncio.create_task(callback(request, text))
    # Wait for it to finish
    await task
# POST the result back to the caller
async def callback(request: VideoRequest, text: str):
    # Target URL
    url = request.callback_url
    # JSON payload
    data = {
        'id': request.id,
        'text': text,
        # add more key/value pairs here if needed...
    }
    # Remember the result for 30 minutes, keyed by the video path
    timed_map.set(request.path, json.dumps(data), timeout=1800)
    # Tell the server we are sending JSON
    headers = {'Content-Type': 'application/json'}
    # Request timeout in seconds
    timeout = 5.0
    # Send the POST request
    response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
    print(f"url:{url},data: {json.dumps(data)},headers:{headers},response:{response}")
    # Check whether the callback succeeded
    if response.status_code == 200:
        print("callback succeeded")
        print(response.json())  # parse the body if it is JSON
    else:
        print(f"callback failed, status code: {response.status_code}")
        print(response.text)
# Start the application
if __name__ == "__main__":
    register_nacos()
    uvicorn.run(app, host="0.0.0.0", port=5000)
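The service imports several small local helpers that are not shown in this article (parse_video_param, parse_video_callback_param, api_result, dict_time). A minimal sketch of what they might look like, based only on how the endpoint uses them; field names beyond id, path, chunk_size and callback_url, and the chunk_size default, are assumptions:

# parse_video_param.py -- request body for /xxxx-video/whisperx (sketch)
from pydantic import BaseModel

class VideoRequest(BaseModel):
    id: int
    path: str               # video URL to transcribe
    callback_url: str       # where the result is POSTed back
    chunk_size: int = 30    # seconds per chunk passed to whisperX (default assumed)

# parse_video_callback_param.py would define VideoCallbackRequest analogously.

# api_result.py -- uniform response envelope (sketch)
class ApiResult:
    def __init__(self, code, message, data, extra):
        self.code = code
        self.message = message
        self.data = data
        self.extra = extra

    def to_dict(self):
        return {"code": self.code, "message": self.message,
                "data": self.data, "extra": self.extra}

# dict_time.py -- a dict whose entries expire after a timeout (sketch)
import time

class TimedMap:
    def __init__(self):
        self._store = {}

    def set(self, key, value, timeout=1800):
        # remember the value together with its expiry timestamp
        self._store[key] = (value, time.time() + timeout)

    def get(self, key):
        item = self._store.get(key)
        if not item:
            return None
        value, expires_at = item
        if time.time() > expires_at:
            del self._store[key]
            return None
        return value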
WhisperX processing layer
import whisperx
from whisperx.asr import FasterWhisperPipeline
import time
import torch
import gc
import os
ENV = os.environ.get('ENV', 'development')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if ENV == 'production':
    batch_size = 16
    compute_type = "float16"
    model_name = "large-v2"
else:
    # reduce batch size if low on GPU memory
    batch_size = 4
    # "int8" instead of "float16" if low on GPU memory (may reduce accuracy)
    compute_type = "int8"
    model_name = "medium"
class WhisperXProcessor:
    fast_model: FasterWhisperPipeline

    def loadModel(self):
        # Load the faster-whisper pipeline (model and precision depend on ENV, see above)
        self.fast_model = whisperx.load_model(model_name, device.type, compute_type=compute_type)
        print("model loaded")

    def asr(self, filePath: str, chunk_size: int):
        print(f'asr start filePath:{filePath}')
        start = time.time()
        audio = whisperx.load_audio(filePath)
        result = self.fast_model.transcribe(audio, batch_size=batch_size, chunk_size=chunk_size)
        print(result)
        end = time.time()
        print('transcription took', end - start, 's')
        torch.cuda.empty_cache()
        gc.collect()
        return result
def video_process_whisperX(audio_path, chunk_size):
    app = WhisperXProcessor()
    app.loadModel()
    result = app.asr(audio_path, chunk_size)
    # transcribe() returns a dict with "segments"; join the segment texts into one string
    text = "".join(seg["text"] for seg in result.get("segments", []))
    return text
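One thing worth noting: video_process_whisperX builds a new WhisperXProcessor and reloads the model on every request, which adds model-loading time to every call. A possible refinement (not part of the original code) is to load the pipeline once and reuse it; since the thread pool above has max_workers=1, a simple module-level instance is enough. A minimal sketch:

# Load the model once and reuse it across requests (sketch; names below are new, not from the original code)
_processor = None

def get_processor() -> WhisperXProcessor:
    global _processor
    if _processor is None:
        _processor = WhisperXProcessor()
        _processor.loadModel()
    return _processor

def video_process_whisperX_cached(audio_path, chunk_size):
    # same contract as video_process_whisperX, but without reloading the model each time
    result = get_processor().asr(audio_path, chunk_size)
    return "".join(seg["text"] for seg in result.get("segments", []))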
Verifying the result
Send a request:
curl -XPOST 'http://localhost:5000/xxxx-video/whisperx' -H 'Content-Type: application/json' -d '{"id":1,"path":"https://vc16-bd1-pl-agv.autohome.com.cn/video-26/0A33363922E51BDE/2025-02-10/FC68CC971BB8B9A46F15C4841F4F2CE2-200-wm.mp4?key=F77E8D3251C4560FA47E36563A5D5668&time=1739187850","callback_url":"http://localhost:8088/xxx/demo/testParseVideo"}'
From the result log, a 2-minute video took roughly 60 s.
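The callback_url in the example points at a separate business service. For local testing, a throwaway receiver that just echoes the payload is enough; a minimal sketch (the route path mirrors the curl example above, everything else is an assumption):

# callback_receiver.py -- throwaway receiver for local testing of the callback (sketch)
from fastapi import FastAPI
import uvicorn

callback_app = FastAPI()

@callback_app.post("/xxx/demo/testParseVideo")
async def test_parse_video(payload: dict):
    # payload is {"id": ..., "text": ...} as sent by callback()
    print(f"callback received: {payload}")
    return {"ok": True}

if __name__ == "__main__":
    uvicorn.run(callback_app, host="0.0.0.0", port=8088)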
References
The powerful ASR model "Whisper": demystifying Whisper
Speech recognition in Python (whisperX)