一、【写在前面】
最近csdn每天写两篇文章有推广券,趁这个机会写一个python相关的文章吧。
一般我们的任务都可以分为计算密集型任务和IO密集型任务。
python因为全局GIL锁的存在,任何时候只有一个python线程在运行,所以说不能利用多核CPU的优势,这意味着python做计算密集型任务不是很合适。如果提升多核能力,使用multiprocess库或者其他语言写的库
但是在IO密集型任务中,在对速度和效率要求比较高的场景中,我们可以将python的多线程功能用起来。但是多线程必定bug多,这个是不可避免的,所以笔者也只敢在操作影响不大的地方写多线程
这篇文章写一个尽可能通用的python多线程非阻塞的代码示例
二、【名词解释】
1. 多线程
一个进程中可以有多个线程,好比饭店如果只有一个人,炒菜端菜都是一个人干,这就是单线程;
一个饭店有人切菜有人炒菜有人端菜,这就是三线程。
2. 线程之间的通信
而切菜的人把菜丢给炒菜的人,炒菜的人做好丢给端菜的人,这个过程叫做线程之间的通信
3. 线程阻塞
如果切菜的人是个浑浊懵愣的货,切完一个菜之后,厨师没有将菜收走,案板满了。这个切菜的人不愿意干活了,这就是阻塞。
4. 线程池
饭店老板非常有钱,每次有新的工作都会重新招一个人来干,这就叫创建新的线程。
饭店老板如果比较精明,有了新活都抓饭店里闲着的人干,而不招新的人,这就叫线程池。
5. 锁
饭店里只有一个卫生间,如果大家一起上卫生间会有人害羞而尿不出来。所以在员工上卫生间的时候需要加锁。程序中一般是在访问公共变量的时候需要加锁。
6. 线程导致的主程序退出
切菜的师傅切了个炸弹,然后厨师没有确认直接开炒,整个饭店都炸掉了。一般来说线程崩溃就崩溃了,但是如果主程序的某个流程是依赖线程结果的,那么线程崩溃就会把主程序带崩。
三、【代码编写】
这段代码是针对多台机器,每台机器的任务类似的一个多线程的主函数。照着这个往里塞东西就行
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def func1():
pass
def func2():
pass
def func3():
pass
if __name__ == "__main__":
# init global vars
vars_dict = {}
futures = {}
# init locker 线程共用的变量要加锁否则会有奇奇怪怪的问题
vars_dict_lock = threading.Lock()
futures_lock = threading.Lock()
# machine list 考虑有机器无法连接的情况
Machine_List = ['machine1','machine2','machine3','cant_connected_machine']
# prometheus metrix init 针对每个机器都有的变量,避免冗余代码可以这样初始化
for host in Machine_List:
## trading flag monitor below
vars_dict[f'var1_{host}'] = 'var1'
vars_dict[f'var2_{host}']= 'var2'
## end trading flag mon
# 定义任务函数映射
task_functions = {
'describe for func1': func1,
'describe for func2': func2,
'describe for func3':func3
}
# multi thread task 线程池 10线程
with ThreadPoolExecutor(max_workers=10) as executor:
while True:
with futures_lock: # 每次操作公共变量需要操作锁
for host in Machine_List:
# 分别为每个任务创建一个唯一的键
for task_name, task_func in task_functions.items():
task_key = f"{task_name}_{host}"
# 如果线程没有创建或没有完成,防止没跑完就起新的
if task_key not in futures or futures[task_key].done():
futures[task_key] = executor.submit(task_func, host)
time.sleep(30)