在爬虫项目中,我们需要将目标站点数据进行持久化保存,一般数据保存的方式有两种:
- 文件保存
- 数据库保存
在数据保存的过程中需要对数据完成去重操作,所有需要使用 redis 中的 set 数据类型完成去重。
1.CSV文件存储
1.1 什么是csv
通俗直白的说:就是一个普通文件,里面的内容是每一行中的数据用逗号分隔,然后文件后缀为 csv。
1.2 python 对 csv 文件进行读写操作
写入列表数据到 csv 文件
import csv
headers=['班级','姓名','性别','手机号','qq']
rows= [
["21级Python", '小王', '男', '13146060xx1', '123456xx1'],
["23级Python", '小李', '男', '13146060xx2', '123456xx2'],
["24级Python", '小赵', '女', '13146060xx3', '123456xx3'],
["25级Python", '小红', '女', '13146060xx4', '123456xx4'],
]
with open('test.csv','w')as f:
# 创建一个csv的writer对象,这样才能够将写入csv格式数据到这个文件
f_csv=csv.writer(f)
# 写入一行(我们用第一行当做表头)
f_csv.writerow(headers)
# 写入多行(当做数据)
f_csv.writerows(rows)
写入字典数据到 csv 文件
import csv
rows = [
{
"class_name": "21级Python",
"name": '小王',
"gender": '男',
"phone": '13146060xx1',
"qq": '123456xx1'
},
{
"class_name": "23级Python",
"name": '小李',
"gender": '男',
"phone": '13146060xx2',
"qq": '123456xx2'
},
{
"class_name": "25级Python",
"name": '小赵',
"gender": '女',
"phone": '13146060xx3',
"qq": '123456xx3'
},
{
"class_name": "25级Python",
"name": '小红',
"gender": '女',
"phone": '13146060xx4',
"qq": '123456xx4'
},
]
with open('test2.csv','w')as f:
# 创建一个csv的DictWriter对象,这样才能够写入csv格式数据到这个文件
f_csv=csv.DicWriter(f,['class_name','name','gender','phone','qq'])
# 写入一行(我们用第一行当做表头)
f_csv.writeheader()
# 写入多行(当做数据)
f_csv.writerows(rows)
读取 csv 文件
import csv
with open('test.csv')as f:
# 创建一个reader对象,迭代时能够提取到每一行(包括表头)
f_csv=csv.reader(f)
for row in f_csv:
print(type(row),row)
读取 csv 文件内容并封装为字典
import csv
with open('test1.csv') as f:
# 创建一个reader对象,迭代时能够提取到每一行(包括表头)
f_csv=csv.DictReader(f)
for row in f_csv:
# print(type(row),row)
print(row.get('class_name'),row.get('name'),row.get('phone'),row.get('qq')
1.3 b站数据采集
目标网站地址:https://search.bilibili.com/video?keyword=篮球&from_source=webtop_search&spm_id_from=333.1007&search_source=5
import csv
import requests
class SaveVideoInfo():
def __init__(self):
self.url='https://api.bilibili.com/x/web-interface/wbi/search/type?category_id=&search_type=video&ad_resource=5654&__refresh__=true&_extra=&context=&page={}&page_size=42&pubtime_begin_s=0&pubtime_end_s=0&from_source=&from_spmid=333.337&platform=pc&highlight=1&single_column=0&keyword=%E7%AF%AE%E7%90%83&qv_id=UqmKhHZFJGpXJFFFysVbJhBLO4zYWxY2&source_tag=3&gaia_vtoken=&dynamic_offset=0&web_location=1430654&w_rid=c58c1bee2f07bcb5250d3833e77c22fc&wts=1740882524'
self.headers={'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
'Cookie':"header_theme_version=CLOSE; DedeUserID=1298980584; DedeUserID__ckMd5=5e3b2340397bdf6e; enable_web_push=DISABLE; buvid3=E09C3995-A4E5-1427-8C75-E1AB8023027295670infoc; rpdid=|(k)~YuRY)RR0J'u~u|YYumYu; FEED_LIVE_VERSION=V_WATCHLATER_PIP_WINDOW3; fingerprint=c6a73c0f0dda55fe0746058e948b8654; buvid_fp_plain=undefined; buvid_fp=c6a73c0f0dda55fe0746058e948b8654; hit-dyn-v2=1; home_feed_column=4; buvid4=D4D31387-5D1E-A689-10B9-E17B2599A15351637-024090611-3XXTIiL+zW/2j4Tgy4In/g%3D%3D; enable_feed_channel=DISABLE; b_nut=100; _uuid=A779410D4-3F32-5B8C-872D-22A37674D73C07883infoc; browser_resolution=1280-632; CURRENT_QUALITY=80; bp_t_offset_1298980584=1036726571777392640; b_lsid=7F947BD3_19554A85E19; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NDExNDEyNzUsImlhdCI6MTc0MDg4MjAxNSwicGx0IjotMX0.aIhl_FfT5aPDRq54w2lQD8qA76uyoqdCY6LRlo-xS5M; bili_ticket_expires=1741141215; SESSDATA=d3ec22cb%2C1756434076%2C4b232%2A32CjC1HLgvdM4y8bKAGeIfrMY5cm4LWa-65-GDqQyHmQVBvADeCGns4u2YfIICt5-J9ckSVnVOdXBWQU9xZi1NUFBLNHNkMGk2WExrQk5aYXp6bXNNVWlFNTZIdUM1UjB2M3oyNXE4NzZFMEVSTWpJSm84QXZBekp0RG4tQi1BX3I3MGZmX2tLOUdRIIEC; bili_jct=8c27e9ac7fb3ae6ed9c99ffc500d6c4c; CURRENT_FNVAL=2000; sid=86toff3u"
}
def save(self):
with open('test_b.csv','w',newline='')as f:
header = ['author', 'tag', 'arcurl']
f_csv=csv.DictWriter(f,fieldnames=header)
f_csv.writeheader()
for page in range(1,6):
res=requests.get(self.url.format(page),headers=self.headers).json()
for i in res['data']['result']:
item=dict()
item['author']=i['author']
item['tag']=i['tag']
item['arcurl']=i['arcurl']
# print(item)
f_csv.writerow(item)
s=SaveVideoInfo()
s.save()
2.JSON文件存储
2.1 json数据格式介绍
JSON 全称为 JavaScript Object Notation ,也就是 JavaScript 对象标记,它通过对象和数组的组合来表示数据,构造简洁但是结构化程度非常高,是一种轻量级的数据交换格式。
常见的 json 数据格式如下:
[
{
"name": "Bob",
"gender": "male",
"birthday": "1992-10-18"
},
{
"name": "Selina",
"gender": "female",
"birthday": "1995-10-18"
}
]
由中括号包围的就相当于列表类型,列表中的每一个元素可以是任意类型,这个示例中它是字典类型,由大括号包围。
json 可以由以上两种形式自由组合而成,可以无限次嵌套,结构清晰,是数据交换的极佳方式。
2.2 python 中的 json 模块
方法 | 作用 |
---|---|
json.dumps() | 把 python 对象转换成 json 对象,生成的是字符串 |
json.dump() | 用于将 dict 类型的数据转成 str ,并写入到 json 文件中 |
json.loads() | 将 json 字符串解码成 python 对象 |
json.load() | 用于从 json 文件中读取数据 |
2.3 爬虫案例——4399网站游戏信息采集
目标地址:https://www.4399.com/flash/
import json
import requests
from lxml import etree
class SaveInfo():
def __init__(self):
self.url='https://www.4399.com/flash/'
self.headers={'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
def save(self):
with open('data.json','w',encoding='utf-8') as f:
res=requests.get(self.url,headers=self.headers)
res.encoding='gbk'
# 数据解析
html=etree.HTML(res.text)
data_list=html.xpath('//ul[@class="n-game cf"]/li/a')
all_list=[]
for i in data_list:
item=dict()
item['title'] = i.xpath('b/text()')[0]
item['href']='https://www.4399.com'+i.xpath('@href')[0]
all_list.append(item)
# f.write(json.dumps(all_list))
# # 禁止ascii编码
f.write(json.dumps(all_list, indent=2, ensure_ascii=False))
s=SaveInfo()
s.save()
3.MySQL数据库存储
在处理 python web开发或者其他需要频繁进行数据库操作的项目时,重复的打开和关闭数据库连接既消费时间也浪费资源。为了解决这个问题我们采用数据库连接池的方式复用已经创建好的连接对象,从而无需频繁的开启连接和关闭连接。
3.1 pymysql 的使用
环境准备
- 安装pymysql
pip install pymysql -i https://pypi.douban.com/simple
- 创建数据库
create database py_spider charset=utf8;
PyMysql连接
import pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
user='root',
password='000000',
database='python_mysql'
) # 数据库名字
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 关闭数据库连接
db.close()
PyMysql插入
data1,data2,data3 =2,'孜然','None'
# SQL 插入语句
sql = """INSERT INTO url_data(
url_id,
url_title,
url_author)
VALUES (%s,%s,%s)"""
try:
# 执行sql语句
cursor.execute(sql,(data1,data2,data3))
# 提交到数据库执行
db.commit()
print('成功')
except Exception as e:
# 如果发生错误则回滚
db.rollback()
print(f'失败{e}')
PyMysql查询
# SQL 查询语句
sql = "SELECT * FROM url_data WHERE url_id = 1"
try:
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
print(results)
except:
print("Error: unable to fetch data")
PyMysql更新
# 更新
"UPDATE url_data SET url_title = '番茄酱' WHERE url_id = 1;"
PyMysql删除
# 删除
"DELETE FROM url_data WHERE url_title = '番茄酱'"
案例:
网站地址:搜索 | 腾讯招聘
import requests
import pymysql
# 网址:https://careers.tencent.com/search.html?query=at_1,at_2&keyword=python
class SaveTxWork():
def __init__(self):
self.url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1740891587275&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=1,2&keyword=python&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
self.db=pymysql.connect(host='localhost',user='root',password='123456',db='py_spider')
self.cursor=self.db.cursor()
def get_work_info(self):
for page in range(1,15):
res=requests.get(self.url.format(page),headers=self.headers).json()
print(f'正在抓取第{page}页面')
yield res['Data']['Posts']
def create_table(self):
sql="""
create table if not exists tx_work(
id int primary key auto_increment,
work_name varchar(100) not null,
country_name varchar(100),
city_name varchar(100),
work_desc text
);
"""
try:
self.cursor.execute(sql)
print('数据表创建成功')
except Exception as e:
print('数据表创建失败:',e)
def insert_table(self,*args):
sql="""insert into tx_work(id,work_name,country_name,city_name,work_desc) values (%s,%s,%s,%s,%s)"""
try:
self.cursor.execute(sql,args)
self.db.commit()
print('数据插入成功')
except Exception as e:
self.db.rollback() # 回滚事务
print('数据插入失败:',e)
def main(self):
self.create_table()
all_work_generator_object=self.get_work_info()
work_id=0
for work_info_list in all_work_generator_object:
if work_info_list:
for work_info in work_info_list:
work_name = work_info['RecruitPostName']
country_name = work_info['CountryName']
city_name = work_info['LocationName']
work_desc = work_info['Responsibility']
self.insert_table(work_id,work_name,country_name,city_name,work_desc)
else:
print('数据为空')
continue
# 任务完成后关闭数据库连接
self.db.close()
if __name__=='__main__':
tx_work=SaveTxWork()
tx_work.main()
3.2 DBUtils 的简单使用
安装命令:
pip install DBUtils
导入模块:
from dbutils.pooled_db import PooledDB
创建数据库连接池:
使用 PooledDB 创建数据库连接池,连接池使用了一种新的 DB-API 连接方式,可以维护活动连接的池。当需要数据库连接时,直接从池中获取连接对象。完成操作后,将无需使用的连接对象返回到池中。无需频繁的关闭和开启连接。
pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6,# 连接池允许的最大连接数,0和None表示无限制连接数
mincached=2, # 初始化时,连接池中至少创建的空闲的链接,0表示不创建
maxcached=2, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS:无用,因为pymysql和mysqldb的模块都不支持共享链接
blocking=True, # 连接池中如果没有可用链接后,是否阻塞等待。False,不等待直接报错;True,等待直到有可用链接,再返回。
host='127.0.0.1',
port=3306,
user='<username>',
password='<password>',
database='<database>',
charset='utf8'
)
使用数据库连接池:
连接池对象创建成功后,可以从此对象中获取链接:
# 你可以使用这个游标进行所有的常规的数据库交互操作
db_cursor=pool.connection().cursor()
查询示例:
import pymysql
from dbutils.pooled_db import PooledDB
# 创建连接池对象
pool=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示无限制连接数
mincached=2, # 初始化时,连接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 连接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享
blocking=True, # 连接池中如果没有可用的链接后,是否阻塞等待。False,不等待直接报错,等待直到有可用链接,再返回。
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='py_spider',
charset='utf8'
)
# 获取数据库连接
conn=pool.connection()
# 获取游标
cursor=conn.cursor()
# 执行查询操作
cursor.execute('SELECT * FROM tx_work')
# 获取查询结果
result=cursor.fetchall()
# 打印结果
print(result)
# 关闭游标和连接
总结:
数据库连接池是一种节省资源并提高效率的方法,特别是在处理大量数据库操作的 web 程序和网络应用程序中。创建连接池对象并获取到游标后,游标的使用方式与 pymysql 中的游标使用方法一致,在后面的并发爬虫中,我们会利用数据库连接池完成数据的并发读写操作。
4.MongoDB数据库存储
MongoDB 是由 C++语言编写的非关系型数据库,是一个基于分布式文件存储的开源数据库系统,其内容存储形式类似 JSON 对象,它的字段值可以包含其他文档、数组及文档数组。
常用命令:
- 查询数据库:show dbs
- 使用数据库:use 库名
- 查看集合:show tables /show collections
- 查询表数据:db.集合名.find()
- 删除表:db.集合名.drop()
mongoDB在python中的基础使用
连接 MongoDB 时,我们需要使用 PyMongo 库里面的 MongoClient。一般来说,传入 MongoDB 的 IP 及端口即可,其中第一个参数为地址 host ,第二个参数为端口 port (如果不给它传递参数,默认是27017)。
import pymongo
# 0.连接到 MongoDB 服务器
client=pymogo.MongoClient(host='localhost',port=27017)
# 1.创建 SPIDER 数据库
db = client['SPIDER']
# 3.在 SPIDER 中创建集合
collection = db['spider_1']
# 5.插入一条示例数据
data1 = {
"name": "example",
"value": 42
}
data2 = {
"name": "example1",
"value": 43
}
insert_result = collection.insert_many([data1, data2])
# 2.查看有什么数据库
print("所有数据库:", client.list_database_names())
# 4.查看 SPIDER 中有什么集合
print("SPIDER 数据库中的集合:", db.list_collection_names())
# 6.查看 spider_1 中的数据
print("spider_1 集合中的数据:")
for i in collection.find():
print(i)
# 7.删除一条数据
delete_result = collection.delete_one({'name': 'example'})
print(f"删除了 {delete_result.deleted_count} 条文档")
# 再次查看 spider_1 中的数据
print("删除数据后 spider_1 集合中的数据:")
for i in collection.find():
print(i)
# 8.更新数据
# 假设我们要将 name 为 example1 的文档的 value 字段更新为 50
update_filter = {"name": "example1"}
update_data = {"$set": {"value": 50}}
update_result = collection.update_one(update_filter, update_data)
print(f"更新了 {update_result.modified_count} 条文档")
# 再次查看更新后 spider_1 中的数据
print("更新数据后 spider_1 集合中的数据:")
for i in collection.find():
print(i)
案例——爱奇艺视频数据信息
获取到爱奇艺视频数据信息:标题、播放地址、简介
目标地址:内地电视剧大全-好看的内地电视剧排行榜-爱奇艺
import pymongo
import requests
class AiqiyiInfo():
def __init__(self):
self.mongo_client=pymongo.MongoClient(host='localhost',port=27017)
self.collection=self.mongo_client['py_spider']['AiQiYi']
self.url='https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=101d987ca7d145e4a9d60b073b02d96e&three_category_id=15;must'
self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
def get_movie_info(self):
for i in range(1,15):
print(f'正在爬取第{i}页')
res=requests.get(self.url.format(i),headers=self.headers).json()['data']['list']
yield res
def parse_movie_info(self,res):
item=dict()
for vedio_list in res:
if vedio_list:
for vedio_info in vedio_list:
item['name']=vedio_info ['name']
item['playUrl']=vedio_info ['playUrl']
item['description']=vedio_info ['description']
try:
self.save(item)
except Exception as e:
print('保存失败',e)
print('保存成功')
def save(self,item):
if '_id' in item:
del item['_id'] # 如果存在,删除该字段,让 MongoDB 自动生成唯一 _id
self.collection.insert_one(item)
def main(self):
res=self.get_movie_info()
self.parse_movie_info(res)
# 程序完成后关闭数据库链接
self.mongo_client.close()
if __name__=='__main__':
a=AiqiyiInfo()
a.main()
5.数据去重
在抓取数据的过程中可能因为网络原因造成爬虫程序崩溃退出,如果重新启动爬虫的话会造成数据入库重复的问题。下面我们使用redis来进行数据去重。
安装redis
在windows中安装redis的教程:Window下Redis的安装和部署详细图文教程(Redis的安装和可视化工具的使用)_redis安装-CSDN博客
pip install redis -i https://pypi.douban.com/simple
项目需求以及思路分析
目标网址:芒果TV
思路分析:
- 首先判断当前网站上的数据是否为动态数据,如果为动态数据则使用浏览器抓包工具获取数据接口,当前接口地址如下:https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&platform=pcweb&channelId=2&pn=1&pc=80&hudong=1&_support=10000000&kind=19&area=10&year=all&chargeInfo=a1&sort=c2
- 当获取到数据后对数据进行哈希编码,因为每一个哈希值是唯一的,所以可以利用这一特性判断数据是否重复。
- 将获取的数据存储到
mongodb
数据库中,在调用保存方法之前,先调用哈希方法将数据转为哈希并保存到redis
中,再判断当前获取的数据的哈希是否存在于redis
数据库,如果存在则不保存,反之亦然。
import requests
import pymongo
import redis
import hashlib
class MangGuoInfo():
def __init__(self):
self.mongo_client=pymongo.MongoClient(host='localhost',port=27017)
self.collection=self.mongo_client['MangGuo_TV']['movie_info']
self.redis_client=redis.Redis()
self.url='https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&platform=pcweb&channelId=2&pn={}&pc=80&hudong=1&_support=10000000&kind=3003&area=10&year=all&feature=all&chargeInfo=a1&sort=c2'
self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
def get_movie_info(self):
for i in range(1,15):
print(f'正在爬取第{i}页')
res=requests.get(url=self.url.format(i),headers=self.headers).json()
yield res['data']['hitDocs']
def parse_movie_info(self,res):
for movie_list in res:
if movie_list:
item=dict()
for movie_info in movie_list:
item['title']=movie_info['title']
item['subtitle']=movie_info['subtitle']
item['story']=movie_info['story']
self.save(item)
@staticmethod
def get_md5(value):
# md5方法只能接收字节数据
# 计算哈希值,哈希值是唯一的,哈希值长度为32位
md5_hash=hashlib.md5(str(value).encode('utf-8')).hexdigest()
return md5_hash
def save(self,item):
value=self.get_md5(item)
# 当前返回的是redis是否成功保存md5数据,保存成功result=1,保存失败:result=0
result=self.redis_client.sadd('movie:filter',value)
if result:
# 确保插入的数据没有 _id 字段
item.pop('_id', None)
self.collection.insert_one(item)
print(item)
print('保存成功')
else:
print('数据重复...')
def main(self):
res=self.get_movie_info()
self.parse_movie_info(res)
if __name__=='__main__':
m=MangGuoInfo()
m.main()
6.图片的存储
案例:王者荣耀游戏壁纸
目标网址:https://pvp.qq.com/web201605/wallpaper.shtml
import requests
import pymongo
import redis
import hashlib
class MangGuoInfo():
def __init__(self):
self.mongo_client=pymongo.MongoClient(host='localhost',port=27017)
self.collection=self.mongo_client['MangGuo_TV']['movie_info']
self.redis_client=redis.Redis()
self.url='https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&platform=pcweb&channelId=2&pn={}&pc=80&hudong=1&_support=10000000&kind=3003&area=10&year=all&feature=all&chargeInfo=a1&sort=c2'
self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
def get_movie_info(self):
for i in range(1,15):
print(f'正在爬取第{i}页')
res=requests.get(url=self.url.format(i),headers=self.headers).json()
yield res['data']['hitDocs']
def parse_movie_info(self,res):
for movie_list in res:
if movie_list:
item=dict()
for movie_info in movie_list:
item['title']=movie_info['title']
item['subtitle']=movie_info['subtitle']
item['story']=movie_info['story']
self.save(item)
@staticmethod
def get_md5(value):
# md5方法只能接收字节数据
# 计算哈希值,哈希值是唯一的,哈希值长度为32位
md5_hash=hashlib.md5(str(value).encode('utf-8')).hexdigest()
return md5_hash
def save(self,item):
value=self.get_md5(item)
# 当前返回的是redis是否成功保存md5数据,保存成功result=1,保存失败:result=0
result=self.redis_client.sadd('movie:filter',value)
if result:
# 确保插入的数据没有 _id 字段
item.pop('_id', None)
self.collection.insert_one(item)
print(item)
print('保存成功')
else:
print('数据重复...')
def main(self):
res=self.get_movie_info()
self.parse_movie_info(res)
if __name__=='__main__':
m=MangGuoInfo()
m.main()