手把手写深度学习(0):专栏文章导航
前言:当我们训练自己的视频生成模型时,现在大部分基于扩散模型架构都差不多,关键点在数据上!视频数据的预处理远远比图像数据复杂,其中有一点是如果静态数据、运动程度低的数据加入到数据集中,会对模型的效果产生极大的破坏!这篇博客手把手教读者如何清洗掉这些不合格的数据。
目录
理论基础
什么是光流?
稠密光流
稀疏光流
转换成灰度图
缩小分辨率
计算光流平均得分
完整Demo
理论基础
主流的方案是用光流法衡量运动的程度,光流又分成稠密光流和稀疏光流。
什么是光流?
光流是图像序列中像素点随时间变化的运动矢量场。它是由于观察者、场景或照明条件的移动而引起的图像对象的表观运动的模式。在计算机视觉中,光流技术用于估计两个连续视频帧之间的像素运动。
稠密光流
稠密光流是指为图像中的每个像素点计算光流矢量。这意味着与稀疏光流(只计算图像中某些特征点的光流)相比,稠密光流提供了一幅完整的运动场景,其中包含了图像中所有像素的运动信息。
基于这些假设,可以使用多种算法来计算稠密光流,如:
- Lucas-Kanade方法:适用于小窗口内的稠密光流估计。
- Horn-Schunck算法:通过全局平滑性约束来解决运动估计问题。
- Farneback算法:使用多尺度方法来计算光流。
尽管稠密光流技术非常有用,但在实际应用中也面临着一些挑战:
- 计算复杂度:为每个像素点计算光流是计算密集型的。
- 遮挡处理:当一个物体被另一个物体遮挡时,光流的计算会变得复杂。
- 大运动:当运动超出了算法的小运动假设时,计算准确的光流变得困难。
- 光照变化:光照的变化可能违反亮度恒定性假设,影响光流的准确性。
稀疏光流
与稠密光流不同,稀疏光流仅关注图像中的某些特征点或兴趣点的运动,而不是图像中的每个像素点。这些特征点通常是图像中容易识别和跟踪的点,例如角点、边缘或明显的纹理。
稀疏光流的计算方法通常涉及以下步骤:
- 特征检测:首先在第一帧图像中检测出特征点。
- 特征匹配:然后在下一帧图像中找到这些特征点的对应位置。
- 运动估计:最后,通过比较两帧之间特征点的位置变化来计算光流矢量。
常见的稀疏光流算法包括:
- Lucas-Kanade方法:一种基于局部窗口的迭代最小二乘法。
- Shi-Tomasi角点检测:与Lucas-Kanade方法结合使用,用于特征点的选取。
- KLT跟踪器:基于Lucas-Kanade方法的特征跟踪算法。
转换成灰度图
在计算光流得分之前转换成灰度图,当然也可以不转换:
cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)
缩小分辨率
直接计算的话非常耗时!所以建议缩小分辨率后再计算!
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 确定最短边为16像素的新分辨率
if width < height:
new_width = 16
new_height = int((new_width / width) * height)
else:
new_height = 16
new_width = int((new_height / height) * width)
desired_fps = 2
skip_frames = int(round(fps / desired_fps))
frame_idx = 0
frame_list = []
while True:
ret, frame = cap.read()
if not ret:
break
# 检查当前帧是否是需要处理的帧
if frame_idx % skip_frames == 0:
# cv2.imwrite(f'./results/frame_{frame_idx}.jpg', frame)
# 缩小分辨率并转换成灰度图
resized_frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
frame_list.append(cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY))
frame_idx += 1
cap.release()
cv2.destroyAllWindows()
if frame_list is []:
return 0.0
计算光流平均得分
# 计算稠密光流
flow_scores = [] # 存放每一对连续帧的光流得分
# 初始化前一帧的图像(灰度)
prev_gray =frame_list[0]
# 遍历frame_list计算光流
for i in range(1, len(frame_list)):
# 获取当前帧并转换为灰度
# curr_gray = cv2.cvtColor(frame_list[i], cv2.COLOR_BGR2GRAY)
curr_gray = frame_list[i]
# 计算当前帧和前一帧的光流
flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
# 计算光流的大小(速度)和角度(方向)
magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
# 可以根据需要对光流进行处理,例如计算平均光流大小作为得分
flow_score = np.mean(magnitude)
flow_scores.append(flow_score)
# 更新前一帧的图像
prev_gray = curr_gray
# flow_scores现在包含了每一对连续帧的光流得分
print("flow_scores", flow_scores, "np.mean(flow_scores): ", np.mean(flow_scores))
完整Demo
import cv2
import os
import numpy as np
# 获取文件夹下的所有video
def get_all_videos(path):
video_files = [] # 用于存储所有的视频文件
for filename in os.listdir(path):
base, ext = os.path.splitext(filename)
if ext in ['.mp4', '.avi', '.flv', '.mkv', '.mov']: # 根据你需要的视频格式来修改
video_files.append(os.path.join(path, filename))
return video_files
def optical_flow(video_path) -> float:
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 确定最短边为16像素的新分辨率
if width < height:
new_width = 16
new_height = int((new_width / width) * height)
else:
new_height = 16
new_width = int((new_height / height) * width)
desired_fps = 2
skip_frames = int(round(fps / desired_fps))
frame_idx = 0
frame_list = []
while True:
ret, frame = cap.read()
if not ret:
break
# 检查当前帧是否是需要处理的帧
if frame_idx % skip_frames == 0:
# cv2.imwrite(f'./results/frame_{frame_idx}.jpg', frame)
# 缩小分辨率并转换成灰度图
resized_frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
frame_list.append(cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY))
frame_idx += 1
cap.release()
cv2.destroyAllWindows()
if len(frame_list) == 0:
return 0.0
# 计算稠密光流
flow_scores = [] # 存放每一对连续帧的光流得分
# 初始化前一帧的图像(灰度)
prev_gray =frame_list[0]
# 遍历frame_list计算光流
for i in range(1, len(frame_list)):
# 获取当前帧并转换为灰度
# curr_gray = cv2.cvtColor(frame_list[i], cv2.COLOR_BGR2GRAY)
curr_gray = frame_list[i]
# 计算当前帧和前一帧的光流
flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
# 计算光流的大小(速度)和角度(方向)
magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
# 可以根据需要对光流进行处理,例如计算平均光流大小作为得分
flow_score = np.mean(magnitude)
flow_scores.append(flow_score)
# 更新前一帧的图像
prev_gray = curr_gray
# flow_scores现在包含了每一对连续帧的光流得分
print("flow_scores", flow_scores, "np.mean(flow_scores): ", np.mean(flow_scores))
if __name__ == "__main__":
video_files = get_all_videos("/mnt/dolphinfs/ssd_pool/docker/user/hadoop-mtcv/wangqiang121/base_model/PaddleOCR/webvid-1000/")
for video_path in video_files:
optical_flow(video_path)