并发
什么是并发?并发,在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。
嗯,字认识,但是连在一起就有点够呛,哈哈,开个玩笑。
我们通过几个例子来较为深刻的理解一下:
第一个例子
我们用requests 成功请求一个网页,实际上requests做了三件事:
- 根据链接、参数登组合成一个请求
- 把这个请求发往要爬取的网站,等待网站响应
- 网站响应后,把结果包装成一个响应对象方便我们使用
从上面的图中我们可以看出,步骤2花费的时间是最长的,取决于被爬虫网站的性能,这个时间可能达到几十到几百毫秒。相比之下,步骤1、3可能只需要1毫秒左右的时间,比上面这张图的对比还要夸张。
针对这个程序:步骤2也可以代表程序是空闲的,因为在等待网站的响应,因此代码真正运行的时间很短。
第二个例子
我们连续用requests请求三个网页A、B、C,执行效果如下
这个相当于把三个请求串行起来执行,他们之间是互相依赖的,A执行完B执行然后C执行,时间上肯定是叠加的。
第三个例子
第一个例子中,顺序必须是1-2-3,因为他们之间是强依赖,但是在第二个例子中,步骤为什么必须是A1-A2-A3-B1-B2-B3-C1-C2-C3呢?B1和A3之间是没有依赖关系的。这个时候我们的并发出现了,步骤如下:
这张图是什么意思呢?其实就是:在「爬取网页 A」这个过程进行到步骤 2 的时候,程序空闲下来了,这时我们让「爬取网页 B」的步骤 1 开始执行;同样的,「爬取网页 B」的步骤 1 执行完,程序又空闲下来,于是我们安排「爬取网页 C」开始执行。
可以看到,仅仅是利用爬虫等待的时间,爬虫的效率就提升了数倍,当爬取的数据更大的时候,爬虫的效率是不是更加的显著。
并发和多线程
并发的结果看起来确实很好,但是前面例子三的步骤看起来很复杂,我们实际编写代码的时候难道要考虑计算机将事情 A 做得怎么样了,有没有空闲?如果空闲就去做事情 B?想想都头大。
这个时候多线程就派上用场了,这个可是操作系统赋予的最强能力之一。
操作系统提供了两个东西:进程和线程,利用他们两个我们可以轻易的实现并发,而不用去考虑上面头大的问题。
我们来看两个代码
import time
import requests
# 假设我们要爬取 30 个网页
urls = ["https://wpblog.x0y1.com/?p=34"] * 30
session = requests.Session()
start = time.time()
results = []
for url in urls:
r = session.get(url)
results.append(r.text)
end = time.time()
print("花费", end-start, "秒")
然后我们把这个代码用多线程形式改写一下
import time
import requests
from concurrent import futures
# 假设我们要爬取 30 个网页
urls = ["https://wpblog.x0y1.com/?p=34"] * 30
executor = futures.ThreadPoolExecutor(max_workers=5)
session = requests.Session()
start = time.time()
fs = []
for url in urls:
f = executor.submit(session.get, url)
fs.append(f)
futures.wait(fs)
result = [f.result().text for f in fs]
end = time.time()
print("花费", end-start, "秒")
大家可以在自己电脑上运行下,下面的运行时间比上面的运行时间缩短好几倍。
代码详解
针对上面改写的代码,我们做个详细的分析解读:
初始化一个线程池
# 导入 concurrent.futures 这个包
from concurrent import futures
# 初始化一个线程池,最大的同时任务数是 5
executor = futures.ThreadPoolExecutor(max_workers=5)
concurrent是python自带的库,这库具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程、线程同步等功能。
线程 池限制了最多同时运行的线程数。比如说我们初始化一个最大任务数为5的线程池,这样使我们提交了100个任务到这个池子里,同时运行的也只有5个,因此代码中max_workers=5的作用就是这个。
提交任务到线程池
fs = []
for url in urls:
# 提交任务到线程池
f = executor.submit(session.get, url)
fs.append(f)
executor是我们刚刚初始化的线程池,调用了executor的submit()方法往里面提交任务。第一个参数session.get是提交要运行的函数,第二个url是提交的函数运行时的参数。
executor.submit()方法会返回一个返回值,其是一个future对象,我们把他赋值给变量f。
future对象是什么
future 这个单词的原意是 未来。在并发编程的领域,future 对象这个东西通常保存着函数调用完成时的结果。
我们结合实例再试着理解一遍。
比如在上面我们告诉线程池,要调用 session.get
方法,参数为 url
。如果线程池还没满,程序就启动一个线程开始执行它;如果线程池满了,就等待有任务完成被挪出线程池,再把这个任务放到那个线程上运行。
但是我们不知道 session.get(url)
在多久之后被完成,那我们要的结果保存在什么地方呢?答案就是 future 对象。如果某一个任务已经完成,那么通过这个任务被提交时返回给我们的 future 对象,就可以拿到这个任务的结果。
等待代码全部完成
# 等待这些任务全部完成
futures.wait(fs)
fs 是保存了上面所有任务的 future 对象的列表,futures.wait()
方法可以等待直到 fs 里面所有的 future 对象都有结果为止。
获取所有任务的结果
# 获取任务的结果
result = [f.result().text for f in fs]
fs是保存了上面所有任务的future对象的列表,我们遍历所有任务的future对象,调用future对象的result()方法,就能得到任务的结果。