观察翻页时详情页面url地址变化规律
import json
from os import makedirs
from os.path import exists
import requests
import logging
import re
from urllib.parse import urljoin
import multiprocessing
# 定义了下日志输出级别和输出格式
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)
# 爬取列表页
def scrape_page(url):
"""
scrape page by url and return its html
:param url: page url
:return: html of page
"""
logging.info('scraping %s...', url)
try:
response = requests.get(url)
if response.status_code == 200:
return response.text
logging.error('get invalid status code %s while scraping %s',
response.status_code, url)
except requests.RequestException:
logging.error('error occurred while scraping %s', url, exc_info=True)
# 爬取详情列表页
def scrape_index(page):
"""
scrape index page and return its html
:param page: page of index page
:return: html of index page
"""
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
# 解析详情列表页
def parse_index(html):
"""
parse index page and return detail url
:param html: html of index page
"""
pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
items = re.findall(pattern, html)
if not items:
return []
for item in items:
detail_url = urljoin(BASE_URL, item)
logging.info('get detail url %s', detail_url)
yield detail_url
# 爬取详情页
def scrape_detail(url):
"""
scrape detail page and return its html
:param page: page of detail page
:return: html of detail page
"""
return scrape_page(url)
# 解析详情页正则匹配方法
def parse_detail(html):
"""
parse detail page
:param html: html of detail page
:return: data
"""
cover_pattern = re.compile(
'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
name_pattern = re.compile('<h2.*?>(.*?)</h2>')
categories_pattern = re.compile(
'<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s?上映')
drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
# 结果只有一个,用 search 方法提取即可
# 结果有多个,使用 findall 方法提取,结果是一个列表
cover = re.search(cover_pattern, html).group(
1).strip() if re.search(cover_pattern, html) else None
name = re.search(name_pattern, html).group(
1).strip() if re.search(name_pattern, html) else None
categories = re.findall(categories_pattern, html) if re.findall(
categories_pattern, html) else []
published_at = re.search(published_at_pattern, html).group(
1) if re.search(published_at_pattern, html) else None
drama = re.search(drama_pattern, html).group(
1).strip() if re.search(drama_pattern, html) else None
score = float(re.search(score_pattern, html).group(1).strip()
) if re.search(score_pattern, html) else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
# 解析详情页pq方法
from pyquery import PyQuery as pq
def parse_detail(html):
"""
parse detail page
:param html: html of detail page
:return: data
"""
doc = pq(html)
cover = doc('img.cover').attr('src')
name = doc('a > h2').text()
categories = [item.text() for item in doc('.categories button span').items()]
published_at = doc('.info:contains(上映)').text()
published_at = re.search('(\d{4}-\d{2}-\d{2})', published_at).group(1) \
if published_at and re.search('\d{4}-\d{2}-\d{2}', published_at) else None
drama = doc('.drama p').text()
score = doc('p.score').text()
score = float(score) if score else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
# 保存数据
def save_data(data):
"""
save to json file
:param data:
:return:
"""
name = data.get('name')
data_path = f'{RESULTS_DIR}/{name}.json'
json.dump(data, open(data_path, 'w', encoding='utf-8'),
ensure_ascii=False, indent=2)
# main 方法
def main(page):
"""
main process
:return:
"""
index_html = scrape_index(page)
detail_urls = parse_index(index_html)
for detail_url in detail_urls:
detail_html = scrape_detail(detail_url)
data = parse_detail(detail_html)
logging.info('get detail data %s', data)
logging.info('saving data to json file')
save_data(data)
logging.info('data saved successfully')
# 多进程加速
if __name__ == '__main__':
pool = multiprocessing.Pool()
pages = range(1, TOTAL_PAGE + 1)
pool.map(main, pages)
pool.close()