说明
平时偶尔需要进行重复性的对文件进行重命名、格式转化等。假设以文件复制功能作为目标,设计一个小程序使用多线程对文件进行批量复制。(其实以后主要目标是针对Realsense的raw文件进行批量的转化,并借助多线程加速)
代码
import threading
import os
import time
# 多线程类
# 更改__init__和run供自己使用
class FileProcess(threading.Thread):
process_nums: int = 0 #外部查看进度使用
all_nums: int = 0 #记录总数量
source_path = ""
target_path = ""
files = []
def __init__(self, source_path: str, files: list, target_path: str = None):
super().__init__()
self.all_nums = len(files)
self.files = files
self.source_path = source_path
self.target_path = target_path
def run(self) -> None:
import shutil
while self.process_nums < self.get_all_nums():
if self.target_path is None:
pass
time.sleep(0.1)
else:
shutil.copyfile(
os.path.join(self.source_path, self.files[self.process_nums]),
os.path.join(self.target_path, self.files[self.process_nums]),
)
self.process_nums += 1
def get_process_nums(self):
return self.process_nums
def get_all_nums(self):
return self.all_nums
if __name__ == "__main__":
source_path = "D:\\SharedFolders\\DataSets\\office_room_traj0_loop\\rgb"
target_path = ""
# 获取需要批量处理的目录中的文件
source_file = os.listdir(source_path)
# 根据线程数量,计算每个线程负责处理文件数量
thread_nums = 3
nums_step = int(len(source_file) / 3)
# 初始化线程
thread_list = []
process_list_nums = []
for i in range(thread_nums):
if i == 0:
thread_list.append(
FileProcess(source_path=source_path, files=source_file[:nums_step])
)
process_list_nums.append(len(source_file[:nums_step]))
elif i == thread_nums - 1:
thread_list.append(
FileProcess(source_path=source_path, files=source_file[nums_step * i :])
)
process_list_nums.append(len(source_file[nums_step * i :]))
else:
thread_list.append(
FileProcess(
source_path=source_path,
files=source_file[nums_step * i : nums_step * i + nums_step],
)
)
process_list_nums.append(
len(source_file[nums_step * i : nums_step * i + nums_step])
)
# 启动线程
for i in range(thread_nums):
thread_list[i].start()
print("[status]:start")
print("\n" * thread_nums, end="")
while True:
all_end = 0
process_nums = []
time.sleep(0.5)
for i in range(thread_nums):
if thread_list[i].is_alive():
process_nums.append(thread_list[i].get_process_nums())
all_end += 1
else:
process_nums.append(process_list_nums[i])
# 进度显示部分
print("\033[{}F".format(thread_nums), end="") # 光标移动到上面n行开头部位
print("\033[0J", end="") # 从光标位置开始删除内容,一直到结尾
for i in range(thread_nums):
print(
"[status]:thread {}:{}/{} {:.2f}%".format(
i + 1,
process_nums[i],
process_list_nums[i],
process_nums[i] * 100 / process_list_nums[i],
)
)
# 所有线程结束则退出
if all_end == 0:
break
print("[status]:success")