multiprocessing的三种模式
- fork,【拷贝几乎所有资源】【支持文件对象/线程锁等传参】【unix】【任意位置开始】【快】
- spawn,【run参数传参必备资源】【不支持文件对象/线程锁等传参】【unix、win】【main代码块开始】【慢】
- forkserver,【run参数传必备资源】【不支持文件对象/线程锁传参】【部分unix】【main代码块开始】
import multiprocessing
multiprocessing.set_start_method('spawn')
官方文档https://docs.python.org/zh-cn/3/library/multiprocessing.html
进程锁
-
Lock类与RLock类相同:由于进程之间随机调度:某进程可能执行n条后,CPU接着执行其他进程。为了多个进程同时操作一个内存中的资源时不产生混乱,我们使用锁。
-
Lock类与RLock类的区别:无论是Lock还是RLock,提供的方法都非常简单,acquire和release。但是Lock和RLock的区别是什么呢?RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
def work(filename, max_count):
for n in range(max_count):
f = open(filename, "r")
try:
nbr = int(f.read())
except ValueError as err:
print ("File is empty, starting to count from 0, error: " + str(err))
nbr = 0
f = open(filename, "w")
f.write(str(nbr + 1) + '\n')
f.close()
if __name__ == '__main__':
work('demo.txt', 5)
# 输出
File is empty, starting to count from 0, error: invalid literal for int() with base 10: ''
Process finished with exit code 0
工作函数被调用了 5 次,正如所期望的那样,它计数正确,没有损失任何数据。在第一次读取时,见到了一个空文件。这会为 int()抛出 invalid literal for int()的
错误(因为在一个空字符串上调用了 int())。这个错误只发生了一次,之后,我们总是会有一个合法的值用于读取并把它转变成一个整数。
def run_one_work(filename, lock):
lock.acquire()
f = open(filename, "r")
try:
nbr = int(f.read())
except ValueError as err:
print("File is empty, starting to count from 0, error: " + str(err))
nbr = 0
f = open(filename, "w")
f.write(str(nbr + 1) + '\n')
f.close()
lock.release()
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
lock = multiprocessing.Lock()
file_name = 'demo.txt'
for i in range(5):
p = multiprocessing.Process(target=run_one_work, args=(file_name,lock,))
p.start()
p.join()
time.sleep(7)
# spawn模式需要特殊处理
process_list = []
for i in range(5):
multiprocessing.Process(target=run_one_work, args=(file_name,lock,))
p.start()
process_list.append(p)
for p in process_list:
p.join()
锁实例程序:
from multiprocessing import Process
from multiprocessing import Lock
import time
import json
def show_ticket(i):
time.sleep(0.1)
with open('ticket') as f:
dic = json.load(f)
print('余票: %s' %dic.get('ticket'))
def buy_ticket(i, lock):
lock.acquire() #加锁
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)
if dic.get('ticket') >0:
dic['ticket'] -= 1
print('\033[32m%s买到票了\033[0m' %i)
else:
print('\033[31m%s没买到票\033[0m' %i)
time.sleep(0.1)
with open('ticket', 'w')as f:
json.dump(dic, f)
lock.release() #释放锁
if __name__ == '__main__':
for i in range(10):
p = Process(target=show_ticket, args=(i,) )
p.start()
lock = Lock()
for i in range(10):
p1 = Process(target=buy_ticket, args=(i,lock))
p1.start()