文章目录
- 发现宝藏
- 一、 目标
- 二、 浅析
- 三、获取所有模块
- 四、请求处理模块、版面、文章
- 1. 分析切换页面的参数传递
- 2. 获取共有多少页标签并遍历版面
- 3.解析版面并保存版面信息
- 4. 解析文章列表和文章
- 5. 清洗文章
- 6. 保存文章图片
- 五、完整代码
- 六、效果展示
发现宝藏
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。
一、 目标
爬取 news.mit.edu 上每篇文章的以下字段:标题、内容、作者、发布时间、链接地址、文章快照(该网站可能需要代理才能访问)。
二、 浅析
1.全部新闻大致分为4个模块
2.每个模块的标签列表大致如下
3.每个标签对应的文章列表大致如下
4.具体每篇文章对应的结构如下
三、获取所有模块
其实一共只有四个模块,直接把它们的地址列举出来即可,然后依次对每个模块进行解析和爬取。
class MitnewsScraper:
    """Scraper bound to one browse module of news.mit.edu."""

    def __init__(self, root_url, model_url, img_output_dir):
        """Remember the target URLs, the image directory and request headers.

        Args:
            root_url: Base URL of the site.
            model_url: URL of the module this instance scrapes.
            img_output_dir: Directory under which article images are saved.
        """
        user_agent = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/122.0.0.0 Safari/537.36')
        self.root_url, self.model_url = root_url, model_url
        self.img_output_dir = img_output_dir
        # The cookie value is a placeholder that the user is expected to
        # replace with their own browser cookie.
        self.headers = {
            'Referer': 'https://news.mit.edu/',
            'User-Agent': user_agent,
            'Cookie': '替换成你自己的',
        }
...
def run():
    """Scrape each of the four news.mit.edu browse modules in turn."""
    root_url = 'https://news.mit.edu/'
    model_urls = ['https://news.mit.edu/topic', 'https://news.mit.edu/clp',
                  'https://news.mit.edu/department', 'https://news.mit.edu/']
    # Raw string: the plain literal only worked because "\i" and "\m" are
    # invalid escapes that Python leaves untouched (and now warns about).
    output_dir = r'D:\imgs\mit-news'
    for model_url in model_urls:
        scraper = MitnewsScraper(root_url, model_url, output_dir)
        scraper.catalogue_all_pages()
四、请求处理模块、版面、文章
先处理一个模块(TOPICS)
1. 分析切换页面的参数传递
如图可知是get请求,需要传一个参数page
2. 获取共有多少页标签并遍历版面
实际上是先根据页面上的总数算出 page 参数的全部取值(即总页数),再逐页遍历以获取所有的标签。
def catalogue_all_pages(self):
    """Scrape every listing page of the module at ``self.model_url``.

    The module's first page is fetched and searched for an "of N topics"
    counter. When the counter is present, N is split into pages of 20
    entries and ``parse_catalogues`` is called once per page index. When it
    is absent, only page 0 is parsed.

    Errors raised while parsing a page propagate to the caller instead of
    being swallowed; previously any failure silently re-scraped page 0.
    """
    response = requests.get(self.model_url, headers=self.headers)
    soup = BeautifulSoup(response.text, 'html.parser')
    match = re.search(r'of (\d+) topics', soup.text)
    if match is None:
        # No counter on the page: treat the module as one listing page.
        self.parse_catalogues(0)
        return

    total_catalogues = int(match.group(1))
    total_pages = math.ceil(total_catalogues / 20)
    print('topics模块一共有' + match.group(1) + '个版面,' + str(total_pages) + '页数据')
    for page in range(total_pages):
        self.parse_catalogues(page)
        print(f"========Finished catalogues page {page + 1}========")
3.解析版面并保存版面信息
前三个模块的版面列表
第四个模块的版面列表
def parse_catalogues(self, page):
    """Scrape one listing page of the module and every catalogue on it.

    For each catalogue link found on the page, the catalogue's article list
    is scraped page by page through ``parse_cards_list`` and one summary
    document is then inserted into the ``catalogues`` collection of the
    local ``mit-news`` MongoDB database.

    Args:
        page: Page index sent as the ``page`` query parameter.

    Returns:
        True once the page has been processed.

    Raises:
        Exception: If the listing request does not return HTTP 200.
    """
    response = requests.get(self.model_url, params={'page': page}, headers=self.headers)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")

    soup = BeautifulSoup(response.text, 'html.parser')
    if self.root_url == self.model_url:
        # The root page lists its catalogues in a different container.
        container = soup.find('div',
                              'site-browse--recommended-section site-browse--recommended-section--schools')
    else:
        container = soup.find('ul', 'page-vocabulary--views--list')
    if container is None:
        print(f'No catalogue list found on page {page} of {self.model_url}')
        return True

    # One client for the whole page, closed on exit, instead of a new and
    # never-closed connection per catalogue.
    with MongoClient('mongodb://localhost:27017/') as client:
        catalogues_collection = client['mit-news']['catalogues']
        for index, catalogue in enumerate(container.find_all('li')):
            link = catalogue.find('a')
            if link is None or not link.get('href'):
                continue
            # Time of this scraping operation.
            date = datetime.now()
            catalogue_title = link.get_text(strip=True)
            print('第' + str(index + 1) + '个版面标题为:' + catalogue_title)
            catalogue_href = link.get('href')
            # The id is the href without its leading slash.
            catalogue_id = catalogue_href[1:]
            # urljoin avoids the "//" produced by concatenating a root URL
            # that ends in "/" with an href that starts with "/".
            catalogue_url = urljoin(self.root_url, catalogue_href)
            print('第' + str(index + 1) + '个版面地址为:' + catalogue_url)

            listing = requests.get(catalogue_url, headers=self.headers)
            listing_soup = BeautifulSoup(listing.text, 'html.parser')
            counter = re.search(r'of (\d+)', listing_soup.text)
            if counter is None:
                # NOTE(review): assumes a page without an "of N" counter is
                # a single unpaginated list - confirm against the site.
                total_cards = None
                total_pages = 1
                print(f'{catalogue_title}: no article counter found, scraping the first page only')
            else:
                total_cards = int(counter.group(1))
                total_pages = math.ceil(total_cards / 15)
                print(f'{catalogue_title}版面一共有{total_cards}篇文章,' + f'{total_pages}页数据')

            for card_page in range(total_pages):
                self.parse_cards_list(card_page, catalogue_url, catalogue_id)
                print(f"========Finished {catalogue_title} 版面 page {card_page + 1}========")

            catalogues_collection.insert_one({
                'id': catalogue_id,
                'date': date,
                'title': catalogue_title,
                'url': catalogue_url,
                'cardSize': total_cards,
            })
    return True
4. 解析文章列表和文章
文章页面中包含一些冗余部分(例如话题标签、往期归档、侧边栏、媒体联系方式、隐藏元素和图集导航),需要先找到并删除:
def parse_cards_list(self, page, url, catalogue_id):
    """Scrape one page of a catalogue's article list and store each article.

    For every article card on the page this fetches the article itself,
    removes navigation and sidebar markup, extracts the author and the
    cleaned text, downloads the article's images and inserts one document
    into the ``cards`` collection of the local ``mit-news`` MongoDB database.

    Args:
        page: Page index sent as the ``page`` query parameter.
        url: URL of the catalogue whose article list is requested.
        catalogue_id: Identifier stored on each article as ``catalogueId``.

    Returns:
        True once the page has been processed.

    Raises:
        Exception: If the listing request does not return HTTP 200.
    """
    response = requests.get(url, params={'page': page}, headers=self.headers)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")

    listing_soup = BeautifulSoup(response.text, 'html.parser')
    card_list = listing_soup.find('div', 'page-term--views--list')
    if card_list is None:
        print(f'No article list found on page {page} of {url}')
        return True

    # One client for the whole page, closed on exit, instead of a new and
    # never-closed connection per article.
    with MongoClient('mongodb://localhost:27017/') as client:
        cards_collection = client['mit-news']['cards']
        for card in card_list.find_all('div', 'page-term--views--list-item'):
            # Time of this scraping operation.
            date = datetime.now()
            card_title = card.find('a', 'term-page--news-article--item--title--link').find('span').get_text(
                strip=True)
            # The introduction is optional: fall back to an empty string.
            intro_tag = card.find('p', 'term-page--news-article--item--dek')
            intro_span = intro_tag.find('span') if intro_tag else None
            card_introduction = intro_span.get_text(strip=True) if intro_span else ''
            publish_time = card.find('p', 'term-page--news-article--item--publication-date').find('time').get(
                'datetime')
            update_time = datetime.strptime(publish_time, '%Y-%m-%dT%H:%M:%SZ')
            # Article address, taken from the cover-image link.
            temp_url = card.find('div', 'term-page--news-article--item--cover-image').find('a').get('href')
            card_url = urljoin(self.root_url, temp_url)
            # Article id: the hyphenated slug ending in digits; when the
            # href has no such slug, use its last path segment.
            id_match = re.search(r'(\w+(-\w+)*)-(\d+)', temp_url)
            card_id = id_match.group(0) if id_match else temp_url.rstrip('/').rsplit('/', 1)[-1]

            card_response = requests.get(card_url, headers=self.headers)
            article_soup = BeautifulSoup(card_response.text, 'html.parser')
            html_title = article_soup.find('div', id='block-mit-page-title')
            html_body = article_soup.find('div', id='block-mit-content')
            if html_title is None or html_body is None:
                print(f'Skipping {card_url}: article title/content block not found')
                continue
            # Merge title and content into a single element.
            html_title.append(html_body)

            # Collect every unwanted node first, then remove them, so that
            # removing one node cannot change what the later lookups find.
            unwanted = [
                article_soup.find('div', 'news-article--topics'),
                article_soup.find('div', 'news-article--archives'),
                article_soup.find('div', 'news-article--content--side-column'),
                article_soup.find('div', 'news-article--press-inquiries'),
                article_soup.find('p', 'news-article--images-gallery--nav--inner'),
            ]
            unwanted.extend(article_soup.find_all('div', 'visually-hidden'))
            for node in unwanted:
                if node is not None:
                    node.extract()

            html_content = html_title
            # Author: concatenated text of the spans in the byline block.
            author_block = html_content.find('div', 'news-article--authored-by')
            author_spans = author_block.find_all('span') if author_block else []
            author = ''.join(span.get_text() for span in author_spans)

            # Protect <img> tags with markers, flatten the HTML to text,
            # then restore the tags so images survive in the plain text.
            origin_html = html_content.prettify()
            marked_html = self.transcoding_tags(origin_html)
            flattened = BeautifulSoup(marked_html, 'html.parser').text
            content = self.clean_content(self.translate_tags(flattened))

            imgs = []
            for item in article_soup.find_all('div', 'news-article--image-item'):
                img = item.find('img')
                src = img.get('data-src') if img else None
                if src:
                    imgs.append(urljoin(self.root_url, src))
            # Always bound: previously this name was undefined (or stale
            # from the previous article) when an article had no images.
            illustrations = self.download_images(imgs, card_id) if imgs else []

            cards_collection.insert_one({
                'id': card_id,
                'catalogueId': catalogue_id,
                'type': 'mit-news',
                'date': date,
                'title': card_title,
                'author': author,
                'card_introduction': card_introduction,
                'updatetime': update_time,
                'url': card_url,
                'html_content': str(html_content),
                'content': content,
                'illustrations': illustrations,
            })
    return True
5. 清洗文章
def transcoding_tags(self, htmlstr):
    """Replace each ``<img ...>`` tag with an ``@@##img ...##@@`` marker.

    The marker is placed on its own line and any whitespace around the
    original tag is consumed. ``translate_tags`` reverses the marking.
    """
    marker = r'\n @@##\1##@@ \n'
    return re.sub(r'\s*<(img.*?)>\s*', marker, htmlstr, flags=re.M)
def translate_tags(self, htmlstr):
    """Turn ``@@##img ...##@@`` markers back into ``<img ...>`` tags."""
    restored = re.sub(r'@@##(img.*?)##@@', r'<\1>', htmlstr, flags=re.M)
    return restored
def clean_content(self, content):
    """Normalise whitespace and strip known junk markup from article text.

    Args:
        content: Text to clean, or ``None``.

    Returns:
        The cleaned text, or ``None`` when *content* is ``None``.
    """
    if content is None:
        return None

    content = content.replace('\r', '\n')
    content = re.sub(r'\n{2,}', '', content)
    content = re.sub(r' {6,}', '', content)
    content = re.sub(r' {3,}\n', '', content)

    # Removed as literal text. The first entry used to be a regex whose
    # unescaped dots matched any character.
    junk_fragments = (
        '<img src="../../../image/zxbl.gif"/>',
        '<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ',
        ' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">',
        ' <!--enpcontent',
        '<TABLE>',
        '<P>',
        # Kept for compatibility: the original literal '<\P>' (an invalid
        # escape) stood for exactly these characters.
        r'<\P>',
        # The closing tag the line above evidently meant to remove.
        '</P>',
    )
    for fragment in junk_fragments:
        content = content.replace(fragment, '')

    # Non-breaking spaces become ordinary spaces; the original replace of a
    # space by a space had no effect.
    return content.replace('\xa0', ' ')
6. 保存文章图片
def download_images(self, img_urls, card_id):
    """Download an article's images into a directory named after its id.

    Nothing is downloaded when the directory ``<img_output_dir>/<card_id>``
    already exists. Individual download or save failures are reported and
    skipped so that one bad image does not abort the others.

    Args:
        img_urls: Image URLs to fetch.
        card_id: Article id, used as the sub-directory name.

    Returns:
        A list of ``[img_url, local_path]`` pairs for the images saved, or
        an empty list when the directory already existed.
    """
    images_dir = os.path.join(self.img_output_dir, card_id)
    if os.path.exists(images_dir):
        print(f'文章id为{card_id}的图片文件夹已经存在')
        return []

    os.makedirs(images_dir)
    downloaded_images = []
    for img_url in img_urls:
        try:
            # The whole body is read at once below, so streaming is not used.
            response = requests.get(img_url, headers=self.headers)
            if response.status_code != 200:
                print(f'Skipping image {img_url}: HTTP {response.status_code}')
                continue
            # File name: last path segment with any query string removed.
            img_name = img_url.split('/')[-1].split('?', 1)[0]
            img_path = os.path.join(images_dir, img_name)
            with open(img_path, 'wb') as f:
                f.write(response.content)
            downloaded_images.append([img_url, img_path])
        except requests.exceptions.RequestException as e:
            print(f'请求图片时发生错误:{e}')
        except OSError as e:
            print(f'保存图片时发生错误:{e}')
    return downloaded_images
五、完整代码
import math
import os
import re
from datetime import datetime
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient
class MitnewsScraper:
    """Scrape one browse module of news.mit.edu into a local MongoDB.

    An instance is bound to a single module URL. ``catalogue_all_pages`` is
    the entry point: it walks the module's listing pages, then each
    catalogue's article list, then each article, saving catalogue and
    article documents to the ``mit-news`` database and article images to
    ``img_output_dir``.
    """

    def __init__(self, root_url, model_url, img_output_dir):
        """Remember the target URLs, the image directory and request headers.

        Args:
            root_url: Base URL of the site.
            model_url: URL of the module this instance scrapes.
            img_output_dir: Directory under which article images are saved.
        """
        self.root_url = root_url
        self.model_url = model_url
        self.img_output_dir = img_output_dir
        # NOTE(review): the cookie is a hard-coded browser session value;
        # it presumably needs replacing with the user's own - confirm.
        self.headers = {
            'Referer': 'https://news.mit.edu/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/122.0.0.0 Safari/537.36',
            'Cookie': '_fbp=fb.1.1708485200443.423329752; _ga_HWFLFTST95=GS1.2.1708485227.1.0.1708485227.0.0.0; '
                      '_hp2_id.2065608176=%7B%22userId%22%3A%228766180140632296%22%2C%22pageviewId%22%3A'
                      '%223284419326231258%22%2C%22sessionId%22%3A%228340313071591018%22%2C%22identity%22%3Anull%2C'
                      '%22trackerVersion%22%3A%224.0%22%7D; _ga_RP0185XJY9=GS1.1.1708485227.1.0.1708485301.0.0.0; '
                      '_ga_PW4Z02MCFS=GS1.1.1709002375.3.0.1709002380.0.0.0; '
                      '_ga_03E2REYYWV=GS1.1.1709002375.3.0.1709002380.0.0.0; _gid=GA1.2.2012514268.1709124148; '
                      '_gat_UA-1592615-17=1; _gat_UA-1592615-30=1; '
                      '_ga_342NG5FVLH=GS1.1.1709256315.12.1.1709259230.0.0.0; _ga=GA1.1.1063081174.1708479841; '
                      '_ga_R8TSBG6RMB=GS1.2.1709256316.12.1.1709259230.0.0.0; '
                      '_ga_5BGKP7GP4G=GS1.2.1709256316.12.1.1709259230.0.0.0',
        }

    def catalogue_all_pages(self):
        """Scrape every listing page of the module at ``self.model_url``.

        The module's first page is fetched and searched for an "of N topics"
        counter. When the counter is present, N is split into pages of 20
        entries and ``parse_catalogues`` is called once per page index. When
        it is absent, only page 0 is parsed.

        Errors raised while parsing a page propagate to the caller instead
        of being swallowed; previously any failure silently re-scraped
        page 0.
        """
        response = requests.get(self.model_url, headers=self.headers)
        soup = BeautifulSoup(response.text, 'html.parser')
        match = re.search(r'of (\d+) topics', soup.text)
        if match is None:
            # No counter on the page: treat the module as one listing page.
            self.parse_catalogues(0)
            return

        total_catalogues = int(match.group(1))
        total_pages = math.ceil(total_catalogues / 20)
        print('topics模块一共有' + match.group(1) + '个版面,' + str(total_pages) + '页数据')
        for page in range(total_pages):
            self.parse_catalogues(page)
            print(f"========Finished catalogues page {page + 1}========")

    def parse_catalogues(self, page):
        """Scrape one listing page of the module and every catalogue on it.

        For each catalogue link found on the page, the catalogue's article
        list is scraped page by page through ``parse_cards_list`` and one
        summary document is then inserted into the ``catalogues`` collection.

        Args:
            page: Page index sent as the ``page`` query parameter.

        Returns:
            True once the page has been processed.

        Raises:
            Exception: If the listing request does not return HTTP 200.
        """
        response = requests.get(self.model_url, params={'page': page}, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")

        soup = BeautifulSoup(response.text, 'html.parser')
        if self.root_url == self.model_url:
            # The root page lists its catalogues in a different container.
            container = soup.find('div',
                                  'site-browse--recommended-section site-browse--recommended-section--schools')
        else:
            container = soup.find('ul', 'page-vocabulary--views--list')
        if container is None:
            print(f'No catalogue list found on page {page} of {self.model_url}')
            return True

        # One client for the whole page, closed on exit, instead of a new
        # and never-closed connection per catalogue.
        with MongoClient('mongodb://localhost:27017/') as client:
            catalogues_collection = client['mit-news']['catalogues']
            for index, catalogue in enumerate(container.find_all('li')):
                link = catalogue.find('a')
                if link is None or not link.get('href'):
                    continue
                # Time of this scraping operation.
                date = datetime.now()
                catalogue_title = link.get_text(strip=True)
                print('第' + str(index + 1) + '个版面标题为:' + catalogue_title)
                catalogue_href = link.get('href')
                # The id is the href without its leading slash.
                catalogue_id = catalogue_href[1:]
                # urljoin avoids the "//" produced by concatenating a root
                # URL ending in "/" with an href starting with "/".
                catalogue_url = urljoin(self.root_url, catalogue_href)
                print('第' + str(index + 1) + '个版面地址为:' + catalogue_url)

                listing = requests.get(catalogue_url, headers=self.headers)
                listing_soup = BeautifulSoup(listing.text, 'html.parser')
                counter = re.search(r'of (\d+)', listing_soup.text)
                if counter is None:
                    # NOTE(review): assumes a page without an "of N" counter
                    # is a single unpaginated list - confirm against the site.
                    total_cards = None
                    total_pages = 1
                    print(f'{catalogue_title}: no article counter found, scraping the first page only')
                else:
                    total_cards = int(counter.group(1))
                    total_pages = math.ceil(total_cards / 15)
                    print(f'{catalogue_title}版面一共有{total_cards}篇文章,' + f'{total_pages}页数据')

                for card_page in range(total_pages):
                    self.parse_cards_list(card_page, catalogue_url, catalogue_id)
                    print(f"========Finished {catalogue_title} 版面 page {card_page + 1}========")

                catalogues_collection.insert_one({
                    'id': catalogue_id,
                    'date': date,
                    'title': catalogue_title,
                    'url': catalogue_url,
                    'cardSize': total_cards,
                })
        return True

    def parse_cards_list(self, page, url, catalogue_id):
        """Scrape one page of a catalogue's article list and store each article.

        For every article card on the page this fetches the article itself,
        removes navigation and sidebar markup, extracts the author and the
        cleaned text, downloads the article's images and inserts one
        document into the ``cards`` collection.

        Args:
            page: Page index sent as the ``page`` query parameter.
            url: URL of the catalogue whose article list is requested.
            catalogue_id: Identifier stored on each article as ``catalogueId``.

        Returns:
            True once the page has been processed.

        Raises:
            Exception: If the listing request does not return HTTP 200.
        """
        response = requests.get(url, params={'page': page}, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")

        listing_soup = BeautifulSoup(response.text, 'html.parser')
        card_list = listing_soup.find('div', 'page-term--views--list')
        if card_list is None:
            print(f'No article list found on page {page} of {url}')
            return True

        # One client for the whole page, closed on exit, instead of a new
        # and never-closed connection per article.
        with MongoClient('mongodb://localhost:27017/') as client:
            cards_collection = client['mit-news']['cards']
            for card in card_list.find_all('div', 'page-term--views--list-item'):
                # Time of this scraping operation.
                date = datetime.now()
                card_title = card.find('a', 'term-page--news-article--item--title--link').find('span').get_text(
                    strip=True)
                # The introduction is optional: fall back to an empty string.
                intro_tag = card.find('p', 'term-page--news-article--item--dek')
                intro_span = intro_tag.find('span') if intro_tag else None
                card_introduction = intro_span.get_text(strip=True) if intro_span else ''
                publish_time = card.find('p', 'term-page--news-article--item--publication-date').find('time').get(
                    'datetime')
                update_time = datetime.strptime(publish_time, '%Y-%m-%dT%H:%M:%SZ')
                # Article address, taken from the cover-image link.
                temp_url = card.find('div', 'term-page--news-article--item--cover-image').find('a').get('href')
                card_url = urljoin(self.root_url, temp_url)
                # Article id: the hyphenated slug ending in digits; when the
                # href has no such slug, use its last path segment.
                id_match = re.search(r'(\w+(-\w+)*)-(\d+)', temp_url)
                card_id = id_match.group(0) if id_match else temp_url.rstrip('/').rsplit('/', 1)[-1]

                card_response = requests.get(card_url, headers=self.headers)
                article_soup = BeautifulSoup(card_response.text, 'html.parser')
                html_title = article_soup.find('div', id='block-mit-page-title')
                html_body = article_soup.find('div', id='block-mit-content')
                if html_title is None or html_body is None:
                    print(f'Skipping {card_url}: article title/content block not found')
                    continue
                # Merge title and content into a single element.
                html_title.append(html_body)

                # Collect every unwanted node first, then remove them, so
                # removing one node cannot change what later lookups find.
                unwanted = [
                    article_soup.find('div', 'news-article--topics'),
                    article_soup.find('div', 'news-article--archives'),
                    article_soup.find('div', 'news-article--content--side-column'),
                    article_soup.find('div', 'news-article--press-inquiries'),
                    article_soup.find('p', 'news-article--images-gallery--nav--inner'),
                ]
                unwanted.extend(article_soup.find_all('div', 'visually-hidden'))
                for node in unwanted:
                    if node is not None:
                        node.extract()

                html_content = html_title
                # Author: concatenated text of the spans in the byline block.
                author_block = html_content.find('div', 'news-article--authored-by')
                author_spans = author_block.find_all('span') if author_block else []
                author = ''.join(span.get_text() for span in author_spans)

                # Protect <img> tags with markers, flatten the HTML to text,
                # then restore the tags so images survive in the plain text.
                origin_html = html_content.prettify()
                marked_html = self.transcoding_tags(origin_html)
                flattened = BeautifulSoup(marked_html, 'html.parser').text
                content = self.clean_content(self.translate_tags(flattened))

                imgs = []
                for item in article_soup.find_all('div', 'news-article--image-item'):
                    img = item.find('img')
                    src = img.get('data-src') if img else None
                    if src:
                        imgs.append(urljoin(self.root_url, src))
                # Always bound: previously this name was undefined (or stale
                # from the previous article) when an article had no images.
                illustrations = self.download_images(imgs, card_id) if imgs else []

                cards_collection.insert_one({
                    'id': card_id,
                    'catalogueId': catalogue_id,
                    'type': 'mit-news',
                    'date': date,
                    'title': card_title,
                    'author': author,
                    'card_introduction': card_introduction,
                    'updatetime': update_time,
                    'url': card_url,
                    'html_content': str(html_content),
                    'content': content,
                    'illustrations': illustrations,
                })
        return True

    def download_images(self, img_urls, card_id):
        """Download an article's images into a directory named after its id.

        Nothing is downloaded when ``<img_output_dir>/<card_id>`` already
        exists. Individual download or save failures are reported and
        skipped so that one bad image does not abort the others.

        Args:
            img_urls: Image URLs to fetch.
            card_id: Article id, used as the sub-directory name.

        Returns:
            A list of ``[img_url, local_path]`` pairs for the images saved,
            or an empty list when the directory already existed.
        """
        images_dir = os.path.join(self.img_output_dir, card_id)
        if os.path.exists(images_dir):
            print(f'文章id为{card_id}的图片文件夹已经存在')
            return []

        os.makedirs(images_dir)
        downloaded_images = []
        for img_url in img_urls:
            try:
                # The whole body is read at once below, so streaming is
                # not used.
                response = requests.get(img_url, headers=self.headers)
                if response.status_code != 200:
                    print(f'Skipping image {img_url}: HTTP {response.status_code}')
                    continue
                # File name: last path segment with any query string removed.
                img_name = img_url.split('/')[-1].split('?', 1)[0]
                img_path = os.path.join(images_dir, img_name)
                with open(img_path, 'wb') as f:
                    f.write(response.content)
                downloaded_images.append([img_url, img_path])
            except requests.exceptions.RequestException as e:
                print(f'请求图片时发生错误:{e}')
            except OSError as e:
                print(f'保存图片时发生错误:{e}')
        return downloaded_images

    def transcoding_tags(self, htmlstr):
        """Replace each ``<img ...>`` tag with an ``@@##img ...##@@`` marker.

        The marker is placed on its own line and any whitespace around the
        original tag is consumed. ``translate_tags`` reverses the marking.
        """
        re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
        return re_img.sub(r'\n @@##\1##@@ \n', htmlstr)

    def translate_tags(self, htmlstr):
        """Turn ``@@##img ...##@@`` markers back into ``<img ...>`` tags."""
        re_img = re.compile(r'@@##(img.*?)##@@', re.M)
        return re_img.sub(r'<\1>', htmlstr)

    def clean_content(self, content):
        """Normalise whitespace and strip known junk markup from article text.

        Args:
            content: Text to clean, or ``None``.

        Returns:
            The cleaned text, or ``None`` when *content* is ``None``.
        """
        if content is None:
            return None

        content = content.replace('\r', '\n')
        content = re.sub(r'\n{2,}', '', content)
        content = re.sub(r' {6,}', '', content)
        content = re.sub(r' {3,}\n', '', content)

        # Removed as literal text. The first entry used to be a regex whose
        # unescaped dots matched any character.
        junk_fragments = (
            '<img src="../../../image/zxbl.gif"/>',
            '<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ',
            ' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">',
            ' <!--enpcontent',
            '<TABLE>',
            '<P>',
            # Kept for compatibility: the original literal '<\P>' (an
            # invalid escape) stood for exactly these characters.
            r'<\P>',
            # The closing tag the line above evidently meant to remove.
            '</P>',
        )
        for fragment in junk_fragments:
            content = content.replace(fragment, '')

        # Non-breaking spaces become ordinary spaces; the original replace
        # of a space by a space had no effect.
        return content.replace('\xa0', ' ')
def run():
    """Scrape each of the four news.mit.edu browse modules in turn."""
    root_url = 'https://news.mit.edu/'
    model_urls = ['https://news.mit.edu/topic', 'https://news.mit.edu/clp',
                  'https://news.mit.edu/department', 'https://news.mit.edu/']
    # Raw string: the plain literal only worked because "\i" and "\m" are
    # invalid escapes that Python leaves untouched (and now warns about).
    output_dir = r'D:\imgs\mit-news'
    for model_url in model_urls:
        scraper = MitnewsScraper(root_url, model_url, output_dir)
        scraper.catalogue_all_pages()


if __name__ == "__main__":
    run()
六、效果展示