准备工作:
1. 安装好python3 最低为3.6以上, 并成功运行pyhthon3 程序
2. 了解python 多进程原理
3. 了解 python HTTP 请求库 requests 的基本使用
4. 了解正则表达式的用法和python 中 re 库的基本使用
爬取目标
目标网站: https://ssr1.scrape.center/
一个静态网站
目标:
利用 requests 爬取这个站点每一页的电影列表, 顺着列表再爬取每个电影的详情页
利用正则表达式提取每部电影的名称, 封面, 类别, 上映时间, 评分, 剧情简介等内容
把以上爬取的内容存储为JSON文本文件
使用多进程实现爬取加速
爬取列表页
引入必要的库:
import requests
import logging
import re
from urllib.parse import urljoin
requests 库用来抓取页面, logging 库用来输出信息, re 库用来实现正则表达式解析,urljoin用来做URL拼接
设置一些基础变量:
logging.basicConfig(level=logging.INFO, format='%(asctime)s- %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE =10
接着定义了日志输出级别和输出格式, 以及BASE_URL 为当前站点的根URL
TOTAL_PAGE为需要爬取的总页码量
def scrape_page(url):
logging.info('scraping %s....', url)
try:
response = requests.get(url)
if response.status_code == 200:
return response.text
logging.error('get invalid staus code %s while scraping %s', response.status_code, url)
except requests.RequestException:
logging.error('error occurred while scraping %s', url, exc_info=True)
考虑到不仅要爬取列表页面,也要爬取详情页, 所以这里我们定义了一个比较通用的爬取页面的方法, 叫做 scrape_page, 它接收一个参数 url , 返回页面的 HTML 代码,。 首先判断状态码是否是200, 如果是就直接返回页面的HTML 代码, 如果不是,则输出错误日志。 另外这里实现了 requests 的异常处理, 如果出现爬取异常, 就输出对应的错误信息。 我们将logging 中的 error 方法里的 exc_info 设置为 True 可以打印出 Traceback 错误堆栈信息
在 scrape_page 的基础上, 我们来定义列表页的爬取方法:
def scrape_index(page):
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
方法名: scrape_index 这个方法接收一个 page 参数, 即列表页的页码, 我们在方法里面实现页面URL 的拼接, 然后调用 scrape_page 的方法即可, 这样就就可以获取HTML代码了
下一步就是获取列表页, 并得到每部电影详情的URL
def parse_index(html):
pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
items = re.findall(pattern, html)
if not items:
return []
for item in items:
detail_url = urljoin(BASE_URL, item)
logging.info('get detail url %s', detail_url)
yield detail_url
在这里定义了 parse_index 方法, 它接收一个参数 html , 即列表页的代码。
在这个方法里 , 我们首先 定义了一个提取标题超链接href 属性的正则表达式,
<a.*?href="(.*?)".*?class="name">
我们使用非贪婪通用匹配符 .*? 来匹配任意字符, 同时在 href 属性的引号之间使用了分组匹配 (.*?) 正则表达式, 这样我们就能在匹配结果中获取到 href 里面的值了。 正则表达式后紧跟着
class="name" 用来表示这个 <a> 节点 式代表电影名称的节点
然后使用 re 库中的 findall 来提取所有 href 中的值, 第一个参数是 pattern 对象, 第二个参数传入html , 这样findall 就能搜素所有 html中与该正则表达式相匹配的内容, 之后把匹配结果返回。 并赋值为 items 如果items 为空就直接返回空列表, 如果不为空 那么直接遍历处理即可,遍历items 得到的 item 就是类似 /detail/1 这样的结果, 由于这并不是一个完整的URL, 所以需要借助 urljoin 把 BASE_URL 和 href 拼接到一起, 获得详情页的完整 URL , 得到的结果就是类似 https://ssr1.scrape.conter/detail/1 这样完整URL, 最后 yield 返回即可
现在我们调用 parse_index 方法 往其中传入列表页的 HTML 代码, 就可以获取所有电影的详情页的 URL 了
def main():
for page in range(1, TOTAL_PAGE + 1):
index_html = scrape_index(page)
detail_urls = parse_index(index_html)
logging.info('detail urls %s', list(detail_urls))
if __name__ == '__main__':
main()
这里定义了 main 方法, 以完成所有上面方法的调用, main 方法中首先调用 range 方法遍历 了所有页码, 得到的 page 就是 1-10 接着把page 变量传给scrape_index 方法, 得到列表页 HTML 把得到的 HTML 赋值给 index_html , 接着将 index_html 传给 parse_index 方法, 得到列表页所有电影详情页的URL , 并赋值 给 detail_urls , 结果是一个生成器, 我们调用 list 方法就可将其输出
部分结果、
爬取详情页
封面: 是一个 img 节点 其class 属性为 cover
名称: 是一个 h2 节点, 其内容为电影名称
类别: 是 span 节点, 其内容是电影类别。 span节点外侧是 button 节点,再外侧是class为 categories的div 节点
上映时间: 是span节点, 其内容包含上映时间, 外侧是 class 为 info 的div 节点。 另外提取结果中还多了 "上映" 二字, 我们可以用正则表达式把 日期提取出来
评分: 是一个p 节点, 其内容为 电影评分, p 节点的class属性为 score
剧情简介: 是一个 p 节点, 其内容便是剧情简介。 其外侧式class 为 drama 的div 节点
我们前面已经获取了详情页的 URL , 下面定义一个详情页的爬取方法,
def scrape_detail(url):
return scrape_page(url)
这里定义了一个 scrape_detail 的方法, 接收一个 url 参数, 并通过 scape_page 方法获取详情页的HTML代码, 前面已经实现过了, 这里另外调用一次,一是这样会显得逻辑清晰,二是为了方便日后维护, 例如如果要输出错误日志, 增加预处理等, 都可以再这里实现
获取到详情页的代码后就是对详情页的解析了
def parse_detail(html):
"""
parse detail page
:param html: html of detail page
:return: data
"""# 这里是对每个内容的提取,做了正则化
cover_pattern = re.compile(
'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
name_pattern = re.compile('<h2.*?>(.*?)</h2>')
categories_pattern = re.compile(
'<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s?上映')
drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)# 这里将对应的正则带入页面代码, 提取出相应的内容
cover = re.search(cover_pattern, html).group(
1).strip() if re.search(cover_pattern, html) else None
name = re.search(name_pattern, html).group(
1).strip() if re.search(name_pattern, html) else None
categories = re.findall(categories_pattern, html) if re.findall(
categories_pattern, html) else []
published_at = re.search(published_at_pattern, html).group(
1) if re.search(published_at_pattern, html) else None
drama = re.search(drama_pattern, html).group(
1).strip() if re.search(drama_pattern, html) else None
score = float(re.search(score_pattern, html).group(1).strip()
) if re.search(score_pattern, html) else None# 将提取出的内容以字典形式返回
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
这里定义了一个 parsse-detail 的方法, 接收一个参数 html 解析其中的内容,并以字典形式返回结果
保存数据
这里将提取到的数据以 JSON的格式保存到文本
import json
from os import makedirs
from os.path import existsRESULTS_DIR = 'results'
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)
def save_data(data):
"""
save to json file
:param data:
:return:
"""
name = data.get('name')
data_path = f'{RESULTS_DIR}/{name}.json'
json.dump(data, open(data_path, 'w', encoding='utf-8'),
ensure_ascii=False, indent=2)
首先定义了保存数据的文件夹 RESULTS_DIR , 判断这个文件夹是否存在, 如果不存在则新建一个。
接着定义了保存数据的方法 save_data 其中首先是获取数据的name 字段,即电影的名称。将其作为JSON文件的名称, 然后构造JSON文件的路径 接着用JSON 的dump 方法将数据保存成文本格式,dump由两个参数,一个是 ensure_ascii 值为False, 可以保证中文字符在文件中能以正常的中文文本呈现, 而不是 unicode 字符, 另一个是 indent, 值为2, 设置了JSON数据的结果由两行缩进。
接下来改变一下 main 方法
def main():
for page in range(1, TOTAL_PAGE + 1):
index_html = scrape_index(page)
detail_urls = parse_index(index_html)
for detail_url in detail_urls:
detail_html = scrape_detail(detail_url)
data = parse_detail(detail_html)
logging.info('get detail dat %s', data)
logging.info('saving data to json file')
save_data(data)
logging.info('data saved successfully')
if __name__ == '__main__':
main()
这里只是将前面输出的数据传入了保存数据的函数并没有多少改变
多进程加速
import multiprocessing
def main(page):
"""
main process
:return:
"""
index_html = scrape_index(page)
detail_urls = parse_index(index_html)
for detail_url in detail_urls:
detail_html = scrape_detail(detail_url)
data = parse_detail(detail_html)
logging.info('get detail data %s', data)
logging.info('saving data to json file')
save_data(data)
logging.info('data saved successfully')
if __name__ == '__main__':
pool = multiprocessing.Pool()
pages = range(1, TOTAL_PAGE + 1)
pool.map(main, pages)
pool.close()
这里首先改变了一下 main 方法,在里面添加了一个page 参数, 用以表示页的页码。接着声明了一个进程池, 并声明pages 为所需要遍历的页码, 即 1-10 最后调用map 方法, 其第一个参数就是需要被调用的参数, 第二个参数就是 pages 即需要遍历的页码
这样就会一次遍历 pages 中的内容, 把1-10 这10个页码分别传递给main方法,并把每次的调用分别变成一个进程,加入进程池中,进程池根据当前的运行环境来决定运行多少个进程。 例如 8 核的, 那么进程池的大小就会默认为 8 这样就有 8 个进程并行运作