文章目录
- 简介
- cookie
- 爬取雪球热帖
- 代理
- 模拟登陆
- 防盗链
- 异步爬虫
- 协程
- asyncio
- M3U8
- HLS
- 爬取
- selenium
- bilibili
- 无头浏览器
- 规避检测
- MySQL
- MongoDB
- Redis
简介
- 这个系列分四部分
- 基础进阶
- Scrapy 框架
- 逆向分析
- 实战运用
- 先补充一些爬虫需要的基础知识和技能
- 预热,爬取个简历模板网站
import requests from lxml import etree import os headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36', } # 创建一个新的文件夹 dirName = 'jianli' if not os.path.exists(dirName): os.mkdir(dirName) # 通用的url模板 url = 'https://sc.chinaz.com/jianli/free_%d.html' for page in range(1, 11): if page == 1: new_url = 'https://sc.chinaz.com/jianli/free.html' else: new_url = format(url%page) response = requests.get(url=new_url,headers=headers) response.encoding = 'utf-8' page_text = response.text #数据解析:详情页url和简历名称 tree = etree.HTML(page_text) div_list = tree.xpath('//*[@id="container"]/div') #局部解析 for div in div_list: detail_url = div.xpath('./a/@href')[0] title = div.xpath('./p/a/text()')[0]+'.rar' # print(title,detail_url) #对详情页的url发起请求,解析出简历的下载地址 detail_page_text = requests.get(url=detail_url,headers=headers).text #数据解析:解析下载地址 detail_tree = etree.HTML(detail_page_text) li_list = detail_tree.xpath('//*[@id="down"]/div[2]/ul/li') down_list = [] #存储不同的12个下载地址 for li in li_list: download_link = li.xpath('./a/@href')[0] down_list.append(download_link) #随机选择一个下载地址进行简历模板的下载 import random #从列表中随机选出一个下载地址 link = random.choice(down_list) #对下载地址进行请求发送,下载简历模板的压缩包 data = requests.get(url=link,headers=headers).content filePath = dirName+'/'+title with open(filePath,'wb') as fp: fp.write(data) print(title,'下载保存成功!')
cookie
- cookie的本质就是一组数据(键值对的形式存在)
- 是由服务器创建,返回给客户端,最终会保存在客户端浏览器中
- 如果客户端保存了cookie,则下次再次访问该服务器,就会携带 cookie
- 典型案例:网站的免密登录
爬取雪球热帖
- 需求分析
- 分析:浏览发现,热帖的内容是 ajax 动态加载的,通过开发者工具可以拿到动态链接
- 发起请求
import requests import os headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36', 'cookie':'xxx' # 手动复制过来 } url = 'https://xueqiu.com/statuses/hot/listV2.json' # https://xueqiu.com/statuses/hot/listV2.json?since_id=-1&max_id=432727&size=15 param = { "since_id": "-1", "max_id": "432727", "size": "15", } response = requests.get(url=url,headers=headers,params=param) data = response.json() print(data)
- 爬虫拿不到你想要的数据,主要的原因是爬虫程序模拟浏览器的力度不够!一般来讲,模拟的力度重点放置在请求头中
- 但是手动设置 cookie 是有局限的,编写麻烦
- 会存在有效时长
- 可能会存在实时变化的局部数据
- 使用 session 对象自动处理 cookie
- 创建一个空白的session对象
- 使用 session 对象发起请求,捕获 cookie
- 使用携带 cookie 的 session 对象,对目的网址发起请求
import requests #1.创建一个空白的session对象 session = requests.Session() headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36', } main_url = 'https://xueqiu.com/' #2.使用session发起请求,目的是为了捕获到服务器返回的cookie,将其存储到session对象中 session.get(url=main_url,headers=headers) url = 'https://xueqiu.com/statuses/hot/listV2.json' param = { "since_id": "-1", "max_id": "432727", "size": "15", } #3.使用携带了cookie的session对象发起的请求 response = session.get(url=url,headers=headers,params=param) data = response.json() print(data)
- 和直接使用 requests 的区别是,使用 session 对象发起请求,会保留这次会话的上下文环境到对象中,一般处理动态参数都用它,能够保证在多次请求中(多次使用 session 对象的方法时)环境一致(始终是同一个会话,动态参数不变)
- 每次运行程序都会新建 session,刷新 cookie,然后在这次运行中保持不变
代理
- 原理
- 为什么使用代理
- 有些时候,需要对网站服务器发起高频的请求,网站的服务器会检测到这样的异常现象,将请求对应机器的ip地址加入黑名单,就无法再次爬取该网站的数据
- 使用代理后,网站服务器接收到的请求,最终是由代理服务器发起,网站服务器获取的 ip 就是代理服务器的,并不是我们客户端本身的 ip
- 使用代理厂商提供的 ip 都需要实名认证,别想干坏事
- 代理的匿名度
- 透明:网站的服务器知道你使用了代理,也知道你的真实ip
- 匿名:网站服务器知道你使用了代理,但是无法获知你真实的ip
- 高匿:网站服务器不知道你使用了代理,也不知道你的真实ip(推荐)
- 代理的类型(重要)
- http:该类型的代理服务器只可以转发 http 协议的请求
- https:可以转发 https 协议的请求
- 如何获取代理?
- 代理精灵
- 芝麻代理(推荐,有新人福利)
- 如何使用?
- 获取本机 ip
import requests from lxml import etree headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36 Edg/108.0.1462.54', 'Cookie':'__bid_n=xxx' } # 从开发者工具获取这个地址,直接用 ip138 拿不到东西 url = 'https://2023.ip138.com/' page_text = requests.get(url=url, headers=headers).text tree = etree.HTML(page_text) data = tree.xpath('/html/body/p[1]/a[1]/text()')[0] print(data)
- 使用代理,芝麻代理,格式选一般 json
import requests from lxml import etree headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36', } url = 'https://2023.ip138.com/' # 使用代理服务器发起请求 # proxies={'代理类型':'ip:port'} page_text = requests.get(url=url, headers=headers, proxies={'https':'115.219.160.xxx:4275'}).text tree = etree.HTML(page_text) data = tree.xpath('/html/body/p[1]/a[1]/text()')[0] print(data) # 115.219.160.xxx
- 代理池,访问“快代理”
from bs4 import BeautifulSoup import requests import time import random headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36' } # 构建一个代理池 proxy_url = 'http://webapi.http.zhimacangku.com/getip?num=10&type=2&pro=&city=0&yys=0&port=1&pack=285030&ts=1&ys=0&cs=1&lb=1&sb=0&pb=4&mr=1®ions=' json_data = requests.get(url=proxy_url, headers=headers).json() # 解析 json 数据 json_list = json_data['data'] proxy_list = [] # 代理池,每次请求可以随机从代理池中选择一个代理来用 for dic in json_list: ip = dic['ip'] port = dic['port'] n_dic = { # 代理类型:ip:端口号 'https':ip+':'+str(port) # {'https':'111.1.1.1:1234'} } proxy_list.append(n_dic) #爬取多页 #1.创建一个通用的url(可以变换成任意页码的url) url = 'https://www.kuaidaili.com/free/inha/%d/' #2.通过循环以此生成不同页码的url for page in range(1,2): print('----------正在爬取第 %d 页的数据!-----------'%page) #format用来格式化字符串的(不可以修改url这个字符串本身) new_url = format(url%page) #循环发送每一页的请求 #注意:get方法是一个阻塞方法! 随机选择代理 ip page_text = requests.get(url=new_url, headers=headers, proxies=random.choice(proxy_list)).text time.sleep(1) soup = BeautifulSoup(page_text, 'lxml') # 使用 lxml 解析 trs = soup.select('tbody > tr') for tr in trs: t1 = tr.findAll('td')[0] t2 = tr.findAll('td')[1] ip = t1.string port = t2.string print(ip, port)
- 获取本机 ip
模拟登陆
- 这里 demo 一个模拟登陆的案例,只是最简单的情况,并不适合大部分网站的登陆,但可以说明模拟登陆的大致流程
- 以古诗文网为例
- 我们先搞定验证码部分
- 推荐的验证码识别网站
-
图鉴平台,充1块玩一玩,开发文档找到 Python,获取测试脚本
-
超级鹰
-
- 测试一下能否识别成功
import base64 import json from lxml import etree import requests # 一、图片文字类型(默认 3 数英混合): # 1 : 纯数字 # 1001:纯数字2 # 2 : 纯英文 # 1002:纯英文2 # 3 : 数英混合 # 1003:数英混合2 # 4 : 闪动GIF # 7 : 无感学习(独家) # 11 : 计算题 # 1005: 快速计算题 # 16 : 汉字 # 32 : 通用文字识别(证件、单据) # 66: 问答题 # 49 :recaptcha图片识别 # 二、图片旋转角度类型: # 29 : 旋转类型 # # 三、图片坐标点选类型: # 19 : 1个坐标 # 20 : 3个坐标 # 21 : 3 ~ 5个坐标 # 22 : 5 ~ 8个坐标 # 27 : 1 ~ 4个坐标 # 48 : 轨迹类型 # # 四、缺口识别 # 18 : 缺口识别(需要2张图 一张目标图一张缺口图) # 33 : 单缺口识别(返回X轴坐标 只需要1张图) # 五、拼图识别 # 53:拼图识别 def base64_api(uname, pwd, img, typeid): with open(img, 'rb') as f: base64_data = base64.b64encode(f.read()) b64 = base64_data.decode() data = {"username": uname, "password": pwd, "typeid": typeid, "image": b64} result = json.loads(requests.post("http://api.ttshitu.com/predict", json=data).text) if result['success']: return result["data"]["result"] else: return result["message"] return "" # 我们再封装一下,只需要传待识别图片地址和类型 def getImgCodeText(imgPath, imgType):# 直接返回验证码内容 #imgPath:验证码图片地址 #imgType:验证码图片类型 result = base64_api(uname='Roy', p='xxx', img=imgPath, typeid=imgType) return result headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36' } # 将验证码图片请求后保存到本地 login_url = 'https://so.gushiwen.cn/user/login.aspx?from=http://so.gushiwen.cn/user/collect.aspx' page_text = requests.get(url=login_url, headers=headers).text tree = etree.HTML(page_text) img_src = 'https://so.gushiwen.cn'+tree.xpath('//*[@id="imgCode"]/@src')[0] code_data = requests.get(url=img_src,headers=headers).content with open('./code.jpg','wb') as fp: fp.write(code_data) # 识别验证码图片内容 result = getImgCodeText('./code.jpg', 3) print(result)
- 我们先搞定验证码部分
- 开始模拟登陆
- 分析:要发起 post 请求传递登陆数据,携带 cookie 等 header 参数;输入信息,点击登陆,一般发起登录请求的 URL 是 loginxxx,找到它就能看到 post 应该带什么数据了
- 尝试发现登陆失败,原因分析:可能有动态参数;哪个是动态的呢?对比登陆前后 post 里面这几个参数,发现
__VIEWSTATE
是变的 - 动态参数怎么办?上面自动处理 cookie 可以借鉴,全程使用 session 对象发起请求,就可以保留上下文环境,再带上我们登陆前
session.get()
获取的动态参数,成功from lxml import etree import requests import demo1 headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36' } #创建session对象 session = requests.Session() #将验证码图片请求后保存到本地 login_url = 'https://so.gushiwen.cn/user/login.aspx?from=http://so.gushiwen.cn/user/collect.aspx' page_text = session.get(url=login_url, headers=headers).text tree = etree.HTML(page_text) img_src = 'https://so.gushiwen.cn'+tree.xpath('//*[@id="imgCode"]/@src')[0] code_data = session.get(url=img_src, headers=headers).content with open('./code.jpg', 'wb') as fp: fp.write(code_data) # 并解析出动态变化的请求参数 __VIEWSTATE = tree.xpath('//*[@id="__VIEWSTATE"]/@value')[0] # 识别验证码图片内容 result = demo1.getImgCodeText('./code.jpg', 3) print(result) #模拟登录 url = 'https://so.gushiwen.cn/user/login.aspx?from=http%3a%2f%2fso.gushiwen.cn%2fuser%2fcollect.aspx' data = { "__VIEWSTATE": __VIEWSTATE, "__VIEWSTATEGENERATOR": "C93BE1AE", "from": "http://so.gushiwen.cn/user/collect.aspx", "email": "xxx", "p": "xxx", "code":result, "denglu": "登录" } #获取了登录成功后的页面源码数据 login_page_text = session.post(url=url, headers=headers, data=data).text with open('wushiwen.html', 'w', encoding='utf-8') as fp: fp.write(login_page_text)
- 分析:要发起 post 请求传递登陆数据,携带 cookie 等 header 参数;输入信息,点击登陆,一般发起登录请求的 URL 是 loginxxx,找到它就能看到 post 应该带什么数据了
- 打开保存的网页文件,能看到登录成功后的界面
防盗链
- 以图片为例,访问图片要从提供图片的网站才可以,直接访问图片地址不行
- 以抓取微博的图片为例,我们点进去某个图集
- 通过 network 查看网页源码发现需要解析
real_src
的值,但直接访问发现图片不能显示,说明做了防盗链 - 怎么破解呢?还是在 Header 中找原因,会发现,我们少了
Referer
请求头import requests from lxml import etree headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36', "Referer": "http://blog.sina.com.cn/", # "Referer": "https://blog.sina.com.cn/s/blog_4a793a200102z4rk.html?tj=1", } url = 'https://blog.sina.com.cn/s/blog_4a793a200102z4rk.html?tj=1' page_text = requests.get(url, headers=headers).text tree = etree.HTML(page_text) img_src = tree.xpath('//*[@id="sina_keyword_ad_area2"]/p/a/img/@real_src') for src in img_src: img_name = src.split('/')[-1]+'.jpg' data = requests.get(src, headers=headers).content # 下载 with open(img_name, 'wb') as fp: fp.write(data)
- 通过 network 查看网页源码发现需要解析
异步爬虫
- 我们用 flask 自己搭个服务器做测试,不要直接搞别人网站
- 学习 flask 的高质量文章,看一遍基本就够用了
- 启动服务器
from flask import Flask,render_template from time import sleep #1.实例化app对象 app = Flask(__name__) #装饰器中的参数就是路由地址 @app.route('/main') def main():#视图函数 sleep(2) return 'i am main' #一旦启动了服务器后,在浏览器中访问路由地址,在服务器端就会执行视图函数 @app.route('/roy') def index1(): sleep(2) return render_template('test.html') @app.route('/allen') def index2(): sleep(2) return render_template('test.html') @app.route('/jack') def index3(): sleep(2) return render_template('test.html') if __name__ == "__main__": app.run()
- 直接请求
import requests import time start = time.time()#程序执行开始的时间 urls = ['http://127.0.0.1:5000/roy', 'http://127.0.0.1:5000/allen', 'http://127.0.0.1:5000/jack'] def get_request(url): page_text = requests.get(url=url).text print(len(page_text)) for url in urls: #发起了三次请求 get_request(url) print('总耗时:',time.time()-start) #程序运行的总耗时 6.04
- 多线程
import requests import time from threading import Thread #线程模块 start = time.time()#程序执行开始的时间 urls = ['http://127.0.0.1:5000/roy', 'http://127.0.0.1:5000/allen', 'http://127.0.0.1:5000/jack'] def get_request(url): page_text = requests.get(url=url).text # print(len(page_text)) ts = [] for url in urls: t = Thread(target=get_request, args=(url,)) # tuple ts.append(t) t.start() for t in ts: # 让主线程等所有的子线程结束后再结束 t.join() print('总耗时:',time.time()-start) #程序运行的总耗时 2.01
- 线程池,减少创建和销毁线程的频率
import requests import time from multiprocessing.dummy import Pool #线程池模块 start = time.time()#程序执行开始的时间 urls = ['http://127.0.0.1:5000/roy', 'http://127.0.0.1:5000/allen', 'http://127.0.0.1:5000/jack'] def get_request(url): page_text = requests.get(url=url).text print(len(page_text)) pool = Pool(3) # 池子里有3个初始化好的线程 pool.map(get_request, urls) # func args print('总耗时:',time.time()-start) # 程序运行的总耗时 2.03
- 但多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈
协程
- 爬虫是 IO 密集型操作,因为需要下载数据
- 但对于 IO 阻塞,无论是多进程还是多线程,都会被操作系统强行剥夺走 CPU 的执行权限
- CPU 执行其他操作,可能是我们程序的其他部分,也可能是其他的应用程序
- 我们程序的执行效率因此就降了下来
- 怎么处理呢?
- 我们在自己的应用程序检测到 IO 阻塞时,让 CPU 切换到我们自己程序的其他部分执行(这里的任务指的是当前我们自己程序表示的进程或线程中的某一组操作/子程序)
- 这样可以把我们程序的 IO 阻塞降到最低,程序处于就绪态的部分增多,操作系统便以为我们的程序是 IO 阻塞比较少的,从而会尽可能多的分配 CPU 给我们,也就达到了提升程序执行效率的目的
- 上面说的这个方案其实就是协程的原理
- 一个线程/进程可以表示一组指定行为的操作,这个操作可以由多个执行步骤组成,这些执行步骤有的是阻塞操作有的非阻塞操作,那么,当 cpu 遇到了阻塞的执行步骤时,如果不对其处理,则包含当前执行步骤的进程/线程就会被挂起进入到阻塞状态,且交出 cpu 的使用权
- 协程可以检测出它是阻塞的,且可以将 cpu 切换到我们自己程序非阻塞的执行步骤,则包含这些执行步骤的进程/线程就不会进入到阻塞状态,减少阻塞状态,增加就绪状态(牢牢抢占cup),极大幅度提升程序执行的效率
asyncio
- 在python3.5之后新增了
asyncio
模块,可以帮我们检测 IO- 只能是网络 IO 操作
- Python 3.4 开始,就加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 asyncio,使得协程的实现更加方便
- 先了解以下概念
- 特殊函数
- 函数定义前添加一个
async
关键字 - 特殊函数被调用后,没有被立即执行,而是会返回一个协程对象
- 注意:特殊函数内部,不可以出现不支持异步模块的代码,否则会中断整个异步效果
- 大部分模块都不支持异步,后续会做说明
- 函数定义前添加一个
- 协程
- 协程 = 协程对象 = 特殊的函数 = 一组指定形式的操作
- 任务
- 任务对象是一个高级的协程对象
task = asyncio.ensure_future(c)
- 任务对象 = 高级协程对象 = 一组指定形式的操作
- 高级在哪呢?可以给任务对象绑定一个回调函数,当任务对象被执行结束后,会立即调用这个回调函数
- 任务对象是一个高级的协程对象
- 事件循环
- 事件循环对象(Event Loop),可以将其当做是一个容器,该容器是用来装载任务对象的
- 启动事件循环对象,则其内部的任务对象对应的相关操作就会被立即执行
loop = asyncio.get_event_loop()
,loop.run_until_complete(task)
- 特殊函数
- 看段代码串一下概念
import asyncio import time #特殊函数 async def get_request(url): print('正在请求的网址是:',url) time.sleep(2) print('请求网址结束!') return 123 #回调函数:必须有一个参数 def t_callback(t): # 参数t就是任务对象 data = t.result() # result()函数就可以返回特殊函数内部的返回值 print('我是任务对象的回调函数!,获取到特殊函数的返回值为:', data) #协程对象 c = get_request('www.1.com') #任务对象 task = asyncio.ensure_future(c) #给任务对象绑定回调函数 task.add_done_callback(t_callback) #事件循环对象 loop = asyncio.get_event_loop() loop.run_until_complete(task)
wait()
函数- 给任务列表中的每一个任务对象赋予一个可被挂起的权限
- 当 CPU 执行的任务对象遇到阻塞操作的时候,当前任务对象就会被挂起,可以执行其他任务对象,提高整体程序运行的效率
await
关键字- 执行挂起操作;凡是阻塞操作的前面都必须加上 await 关键字进行修饰
- 完整的多任务异步操作
import asyncio import time start = time.time() urls = [ 'www.a.com','www.b.com','www.c.com' ] async def get_request(url): print('正在请求:',url) # time.sleep(2) # time模块不支持异步 await asyncio.sleep(2) # 支持异步,阻塞操作前加 await print('请求结束:',url) if __name__ == '__main__': # 三个任务对象 tasks = [] for url in urls: c = get_request(url) task = asyncio.ensure_future(c) tasks.append(task) # 一个事件循环对象 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print('总耗时:',time.time()-start)
- 可以发现,异步爬虫是比较麻烦的
- 本人认为,应该把握住阻塞操作这个关键点,在爬虫中使用异步,也是为了把阻塞操作异步执行,上面用 sleep 模拟阻塞,真实情况一般是下载某个东西
- 核心在于特殊函数的编写,全部支持异步+关键字;一般在这里发起下载请求(也是阻塞所在)
- 最后是回调函数和异步的固定流程
- demo:异步爬虫
- 在每一个with前加上
async
关键字 - 在阻塞操作前加上
await
关键字import requests import asyncio import time from lxml import etree import aiohttp start = time.time() urls = [ 'http://127.0.0.1:5000/roy', 'http://127.0.0.1:5000/allen', 'http://127.0.0.1:5000/jack' ] # 特殊函数,不能有不支持异步的代码 async def get_request(url): # requests是不支持异步的模块,需要使用 aiohttp # response = requests.get(url=url) # page_text = response.text # 创建请求对象(sess) async with aiohttp.ClientSession() as sess: # 基于请求对象发起请求 # get请求,常用参数:url,headers,params,proxy # post请求,常用参数:url,headers,data,proxy(注意),此处代理使用proxy='http://ip:port' # 这里的阻塞操作就是下载页面 async with await sess.get(url=url) as response: #text():获取字符串形式的响应数据 #read():获取二进制形式的响应数据 page_text = await response.text() return page_text # 回调函数,专门用于数据解析,正常操作,和异步无关了 def parse(t): #获取任务对象请求到的页面数据 page_text = t.result() tree = etree.HTML(page_text) a = tree.xpath('//a[@id="feng"]/@href')[0] print(a) tasks = [] for url in urls: c = get_request(url) # 协程对象 task = asyncio.ensure_future(c) # 任务对象 task.add_done_callback(parse) # 绑定回调 tasks.append(task) # 任务对象列表 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # 加入事件循环对象 # 直接运行 print('总耗时:',time.time()-start)
- 在每一个with前加上
- 简单来说,协程是一个用户态轻量级线程,因为 IO 操作不需要占用 CPU,它可以绕过 IO 执行其他代码,让 IO 和其他操作并行,充分利用CPU,也就提升了爬虫效率
- 也可以通过
gevent
第三方库方便的实现异步 - 异步的确能提升效率,但是不必花太多时间研究,使用起来比较复杂,也会大概率触发反爬机制,还要花时间破解
- 思考:协程能使用多核吗?
- 不能
- python 的 CPython 解释器有 GIL(不允许进程内的线程并行),所以协程是基于单线程运行的
- 也就是说绕过 IO 并不是切到另一个线程执行其他操作,还是在当前线程使用 CPU,这也就是为什么说迷惑了 CPU 让它以为这个线程是计算密集型操作,能分配更多时间片给这个线程
- 协程基于某个线程操作,因此也被称为微线程
- 为了避免 GIL 的影响,我们能否在多进程使用异步呢?也就是 多进程+单线程(协程),当然可以,你需要了解一下 gevent 了,推荐文章;但要注意控制进程数量,还不能使用进程池(有冲突),可以通过控制每个进程处理任务的数量来控制进程数
- 练习
- 尝试使用高性能异步库 uvloop 加速
M3U8
- M3U8 是流视频数据文件,在此之前先了解下 HLS 技术
HLS
- 现在大部分视频客户端都采用 HTTP Live Streaming 技术(Apple为了提高流播效率开发的技术),而不是直接播放MP4等视频文件
- HLS技术的特点是将流媒体切分为若干TS片段(比如几秒一段),然后通过 M3U8列表文件 将这些TS片段批量下载供客户端播放器实现实时流式播放
- 因此,在爬取HLS的流媒体文件的思路一般是
- 先【下载M3U8文件】并分析其中内容,类似于种子/Metadata
- 然后批量下载文件中定义的【TS片段】
- 最后将其【组合】成mp4文件或者直接保存 TS 片段
- 说着简单,其实在实际操作中,会遇到很多复杂的问题,例如 m3u8 文件下载不下来,ts 片段文件被加密了,甚至加密 TS 片段的密钥也被加密了
- M3U8 文件初识
- 标识(关键字)
#EXTM3U:每个M3U文件第一行必须是这个tag标识。(简单了解) #EXT-X-VERSION:版本,此属性可用可不用。(简单了解) #EXT-X-TARGETDURATION:目标持续时间,是用来定义每个TS的【最大】duration(持续时间)。(简单了解) #EXT-X-ALLOW-CACHE是否允许允许高速缓存。(简单了解) #EXT-X-MEDIA-SEQUENCE定义当前M3U8文件中第一个文件的序列号,每个ts文件在M3U8文件中都有固定唯一的序列号。(简单了解) #EXT-X-DISCONTINUITY:播放器重新初始化(简单了解) #EXT-X-KEY定义加密方式,用来加密的密钥文件key的URL,加密方法(例如AES-128),以及IV加密向量。(记住) #EXTINF:指定每个媒体段(ts文件)的持续时间,这个仅对其后面的TS链接有效,每两个媒体段(ts文件)间被这个tag分隔开。(简单了解) #EXT-X-ENDLIST表明M3U8文件的结束。(简单了解)
- 该文件中有大量的 ts 文件的链接地址,这个就是我们之前描述的真正的视频文件,可以单独播放
- demo
#EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:19 #EXT-X-ALLOW-CACHE:YES #EXT-X-MEDIA-SEQUENCE:0 #EXT-X-KEY:METHOD=AES-128,URI="https://edu.aliyun.com/hls/1109/clef/YnBGq7zAJf1Is7xIB5v8vI7AIORwwG9W",IV=0x0fe82567a6be41afda68d82d3724976a #EXTINF:8.583, https://xuecdn2.aliyunedu.net/headLeader-0/20170519032524-ggauw1x00qo0okgk-conv/e_20170519032524-ggauw1x00qo0okgk-conv_hd_seg_0.ts #EXT-X-DISCONTINUITY #EXT-X-KEY:METHOD=AES-128,URI="https://edu.aliyun.com/hls/2452/clef/0VqtrHq9IkTfOsLqy0iC1FP9342VZm1s",IV=0xdebe4353e61b56e4ecfe0240ca3f89f5 #EXTINF:10.080, https://xuecdn2.aliyunedu.net/courselesson-50224/20170630095028-3xsfwyxw20cgwws8-conv/e_20170630095028-3xsfwyxw20cgwws8-conv_hd_seg_0.ts #EXT-X-KEY:METHOD=AES-128,URI="https://edu.aliyun.com/hls/2452/clef/0VqtrHq9IkTfOsLqy0iC1FP9342VZm1s",IV=0x8a3ce90cf18587963953b948487c1729 #EXT-X-KEY:METHOD=AES-128,URI="https://edu.aliyun.com/hls/2452/clef/0VqtrHq9IkTfOsLqy0iC1FP9342VZm1s",IV=0x3f1c20b9dd4459d0adf972eaba85e0a2 #EXTINF:10.000, https://xuecdn2.aliyunedu.net/courselesson-50224/20170630095028-3xsfwyxw20cgwws8-conv/e_20170630095028-3xsfwyxw20cgwws8-conv_hd_seg_104.ts #EXT-X-ENDLIST
- M3U8 视频一般是不加密的,有些重要的视频服务商才会加密,目前的标准加密方式是使用
AES-128
,KEY
是密钥文件的下载地址,IV
是加密向量,如果没有 IV 值则使用 b"0000000000000000" 填充即可 - 关于加密不需要深入了解,只需要知道 KEY 和 IV 是解密所必须的,不带
EXT-X-KEY
那肯定没加密
- 标识(关键字)
爬取
- 先简单演示一下爬取流视频的步骤
- 首先,我们找个带加密的,找到视频的 m3u8 文件地址,操作步骤:
-
访问视频,开发者工具,在 Source 部分
-
这就是 m3u8 文件的地址,点击变量,复制链接,下载后发现这只是一级文件,具体的 ts 文件链接在二级
import requests from urllib.parse import urljoin import re import os # 需要安装. pip install pycryptodome from Crypto.Cipher import AES dirName = 'tsLib' if not os.path.exists(dirName): os.mkdir(dirName) headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36' } # 一级m3u8地址 m1_url = "https://vod11.bdzybf.com/20220127/5iTXjEev/index.m3u8" m1_page_text = requests.get(url=m1_url,headers=headers).text # print(m1_page_text) # 从一级m3u8文件中解析出二级m3u8地址 m1_page_text = m1_page_text.strip() #二级m3u8地址 m2_url = '' for line in m1_page_text.split('\n'): if not line.startswith('#'): m2_url = line #完整二级地址 m2_url m2_url = urljoin(m1_url,m2_url) #请求二级 m3u8 文件内容 m2_page_text = requests.get(url=m2_url,headers=headers).text m2_page_text = m2_page_text.strip() #解析出解密秘钥 key 的地址 key_url = re.findall('URI="(.*?)"', m2_page_text, re.S)[0] key_url = urljoin(m1_url,key_url) print(key_url) #请求key的地址,获取秘钥 # 注意:key和iv需要为 bytes 类型 key = requests.get(url=key_url, headers=headers).content iv = b"0000000000000000" #解析出每一个 ts 切片视频的地址 ts_url_list = [] for line in m2_page_text.split('\n'): if not line.startswith('#'): ts_url = line ts_url = urljoin(m1_url,ts_url) ts_url_list.append(ts_url) #请求到每一个ts切片的数据 for url in ts_url_list: #获取ts片段的数据 ts_data = requests.get(url=url,headers=headers).content #需要对ts片段数据进行解密(需要用到key和iv) aes = AES.new(key=key,mode=AES.MODE_CBC,iv=iv) desc_data = aes.decrypt(ts_data)#获取了解密后的数据 ts_name = url.split('/')[-1] ts_path = dirName+'/'+ts_name with open(ts_path,'wb') as fp: #需要将解密后的数据写入文件进行保存 fp.write(desc_data) print(ts_name, '下载保存成功!') #ts文件的合并,最好网上找专业的工具进行合并,自己手动合并会经常出问题
-
推荐个无加密的网站大家趴一趴
import requests import os dirName = 'tsLib' if not os.path.exists(dirName): os.mkdir(dirName) headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36' } # 获取 m3u8 文件 m1_url = "https://new.qqaku.com/20220821/Wzeoviw1/1100kb/hls/index.m3u8" m1_page_text = requests.get(url=m1_url,headers=headers).text # 解析出 ts 地址 m1_page_text = m1_page_text.strip() #解析出每一个 ts 切片视频的地址 ts_url_list = [] for line in m1_page_text.split('\n'): # 除了 ts 地址,其他都是#开头 if not line.startswith('#'): ts_url = line ts_url_list.append(ts_url) print("请求地址:", ts_url) #请求到每一个ts切片的数据 for url in ts_url_list: ts_data = requests.get(url=url,headers=headers).content ts_name = url.split('/')[-1] ts_path = dirName+'/'+ts_name with open(ts_path,'wb') as fp: #需要将解密后的数据写入文件进行保存 fp.write(ts_data) print(ts_name, '下载保存成功!') # 合并ts的软件一般都需要钱,而且,,,
-
- 上面的代码爬的很慢
- 测试看看异步能不能加速
import requests import os import asyncio import aiohttp dirName = 'tsLib' if not os.path.exists(dirName): os.mkdir(dirName) headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36' } # 获取 m3u8 文件 m1_url = "https://new.qqaku.com/20220821/Wzeoviw1/1100kb/hls/index.m3u8" m1_page_text = requests.get(url=m1_url,headers=headers).text # 解析出 ts 地址 m1_page_text = m1_page_text.strip() # 保存每一个 ts 切片视频的地址 ts_url_list = [] for line in m1_page_text.split('\n'): # 除了 ts 地址,其他都是#开头 if not line.startswith('#'): ts_url = line ts_url_list.append(ts_url) print("请求地址:", ts_url) #异步请求到每一个 ts 切片的数据 async def get_ts(url, i): # 可以先写框架,不考虑异步,再都换成异步代码,加上关键字 # trust_env=True 参数含义可以看文档 async with aiohttp.ClientSession(trust_env=True) as sess: async with await sess.get(url=url,headers=headers) as response: ts_data = await response.read() #获取byte形式的响应数据 return ts_data, url, i # tuple def download(t): _r = t.result() data = _r[0] url = _r[1] # ts文件的地址 ts_ord = _r[2] # 保证顺序 ts_name = str(ts_ord) + "-" + url.split('/')[-1] ts_path = dirName + '/' + ts_name with open(ts_path, 'wb') as fp: fp.write(data) print(ts_name, '下载保存成功!') tasks = [] # 任务对象列表 for i, url in enumerate(ts_url_list): # 传入序号,虽然下载 ts 是异步的,但这里是for,循环遍历入口,顺序不会错 c = get_ts(url, i) # 协程对象 task = asyncio.ensure_future(c) # 任务对象 task.add_done_callback(download) # 任务回调 tasks.append(task) loop = asyncio.get_event_loop() # 事件循环对象 loop.run_until_complete(asyncio.wait(tasks))
- 测试看看异步能不能加速
selenium
- 把这里的三篇文章看一看,selenium 的基操就没问题了
- 主要作用是完成一些 requests 不能做的事,因为它能轻松的拿到动态加载数据,所见即所得
- 它不是模拟浏览器,它是驱动浏览器,就像一双无形的手在那点(狗头)
- 但也就是说,它只能帮我们跨过某些特殊障碍(验证码需要点击图片,滑动滑块等),并不是爬虫,最终还是要回到 requests 才能大量请求
- 常见的用法有:输入文字时用
send_keys()
方法,清空文字时用clear()
方法,点击按钮时用click()
方法 - 元素定位方法
find_element_by_id() find_element_by_name() find_element_by_class_name() find_element_by_tag_name() find_element_by_link_text() find_element_by_xpath() find_element_by_css_selector()
- 执行 js
from selenium import webdriver from time import sleep #1.创建一个浏览器对象,executable_path指定当前浏览器的驱动程序 #注意:我当前是mac系统,驱动程序也是mac版本的,如果是window系统注意更换驱动 bro = webdriver.Chrome(executable_path='./chromedriver') #2.浏览器的请求发送 bro.get('https://www.jd.com/') #3.标签定位:调用find系列的函数进行标签定位 search_box = bro.find_element_by_xpath('//*[@id="key"]') #4.节点交互 search_box.send_keys('mac pro m1')#向指定标签中录入内容 sleep(2) btn = bro.find_element_by_xpath('//*[@id="search"]/div/div[2]/button') btn.click() #点击按钮 sleep(2) # js bro.execute_script('document.documentElement.scrollTo(0,2000)') sleep(5) #关闭浏览器 bro.quit()
- 动作链:有一些操作,它们没有特定的执行对象,比如鼠标拖拽、键盘按键等,这些动作用另一种方式来执行,那就是动作链
from selenium.webdriver import ActionChains from selenium import webdriver from time import sleep bro = webdriver.Chrome(executable_path='./chromedriver') bro.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable') sleep(1) #注意:如果定位的标签是存在于iframe表示的子页面中,则常规的标签定位报错 #处理:使用如下指定操作 bro.switch_to.frame('iframeResult') div_tag = bro.find_element_by_id('draggable') #实例化一个动作链对象且将该对象绑定到指定的浏览器中 action = ActionChains(bro) action.click_and_hold(div_tag) #对指定标签实现点击且长按操作 for i in range(5): action.move_by_offset(10,10).perform() #perform让动作链立即执行 sleep(0.5) sleep(3) bro.quit()
bilibili
- 登陆 B 站
- 用上面 模拟登陆 部分封装过的验证码接口,识别出后用动作链点击,完成登录(跳过模拟登陆)
from selenium import webdriver from selenium.webdriver import ActionChains from time import sleep import demo1 # 1.创建浏览器对象 bro = webdriver.Chrome(executable_path='./chromedriver') # 2.发起请求 login_url = 'https://passport.bilibili.com/login?from_spm_id=333.851.top_bar.login_window' bro.get(login_url) sleep(1) # 3.定位到指定标签填充用户名和密码 user_box = bro.find_element_by_xpath('//*[@id="login-username"]') user_box.send_keys('186') sleep(1) pwd_box = bro.find_element_by_xpath('//*[@id="login-passwd"]') pwd_box.send_keys('1234567890') sleep(1) login_btn = bro.find_element_by_xpath('//*[@id="geetest-wrap"]/div/div[5]/a[1]') login_btn.click() sleep(1) # 4.定位完整的验证码对话框 # 注意:在开发者工具中是可以定位到多个div表示验证码对话框的,因此将这几个div都定位到,以此去尝试 # 不一定是iframe的原因 code_tag = bro.find_element_by_xpath('/html/body/div[2]/div[2]/div[6]/div/div') sleep(1) #5.识别验证码(使用打码平台进行验证码识别) code_tag.screenshot('./code.png') #将验证码对话框截图保存 sleep(1) #使用图鉴接口识别 result = demo1.getImgCodeText('./code.png',27) #获取了识别的结果 # result = '154,251|145,167' # print(result) result_list = result.split('|') #result_list == ['154,251','145,167'] 坐标列表 # 6.根据识别出验证码的结果进行处理 for pos in result_list: x = int(pos.split(',')[0]) y = int(pos.split(',')[1]) # 使用调用链,因为要移动过去点击,而不是普通的定位某个element再点击,图片上没element ActionChains(bro).move_to_element_with_offset(code_tag,x,y).click().perform() # 立即执行 sleep(0.5) confirm_btn = bro.find_element_by_xpath('/html/body/div[2]/div[2]/div[6]/div/div/div[3]/a/div') confirm_btn.click() sleep(3) bro.quit()
- 对于 requests,我们跳过了模拟登陆,现在可以获取登录后的 cookies,开始请求登录后的页面
- 当然,这个 cookie 是有时效的,需要自动获取,之前说过用 session;这里在代码中解析并存在字典
- 注:这个时效说的不是一两分钟,而是比较长的时间后,之前复制粘贴过来的 cookie 失效,所以在每次执行爬虫前都自动获取,避免每次手动再去 CV;关键在自动
- 同时,这里需要将 jsonCookies 解析成浏览器携带的 cookie 形式(键值),不能是一大串字符
from selenium.webdriver import Chrome import time import json web = Chrome('./chromedriver') web.get('https://www.17k.com/') time.sleep(3) # 登录 web.find_element_by_xpath('//*[@id="header_login_user"]/a[1]').click() # 切换iframe iframe = web.find_element_by_xpath('/html/body/div[20]/div/div[1]/iframe') web.switch_to.frame(iframe) web.find_element_by_xpath('/html/body/form/dl/dd[2]/input').send_keys("15027900535") web.find_element_by_xpath('/html/body/form/dl/dd[3]/input').send_keys("bobo328410948") web.find_element_by_xpath('/html/body/form/dl/dd[5]/input').click() time.sleep(3) cookies = web.get_cookies() # 存文件里 with open("cookies.txt", mode="w", encoding='utf-8') as f: f.write(json.dumps(cookies)) # 组装cookie字典, 直接给requests用 dic = {} for cook in cookies: dic[cook['name']] = cook['value'] # 衔接. 把cookie直接怼进去 import requests #访问的书架(获取书架内容) url = "https://user.17k.com/ck/author/shelf?page=1&appKey=2406394919" headers = { 'cookie':dic # 这么传可能不行,得是字符串 } resp = requests.get(url,cookies=dic) # 这里可以直接传字典参数 print(resp.text) web.close()
- OK,现在通过 selenium,让我们能够轻松的绕过检测,获取信息
无头浏览器
- 就是无界面模式的浏览器,不重要
from selenium import webdriver from selenium.webdriver.chrome.options import Options import time # 创建一个参数对象,用来控制chrome以无界面模式打开 chrome_options = Options() chrome_options.add_argument('--headless') chrome_options.add_argument('--disable-gpu') # 驱动路径 path = 'chromedriver' # 创建浏览器对象 browser = webdriver.Chrome(executable_path=path, options=chrome_options) # 上网 url = 'http://www.baidu.com/' browser.get(url) time.sleep(3) browser.save_screenshot('baidu.png') browser.quit()
规避检测
- selenium 挺好用的,以致于现在不少大网站有对selenium采取了监测机制
- 正常情况下我们用浏览器访问淘宝等网站的
window.navigator.webdriver
的值为 undefined 或者为false。而使用selenium访问则该值为 true - 解决方案:JS 注入(开发者工具中执行某个 js 文件)
from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import Options chrome_options = Options() # 这个也是必要的 chrome_options.add_argument("--disable-blink-features=AutomationControlled") chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36') driver = Chrome('./chromedriver',options=chrome_options) # Selenium 在打开任何页面之前,先运行这个Js文件。 with open('./stealth.min.js') as f: js = f.read() # 进行 js 注入,绕过检测 #execute_cdp_cmd执行cdp命令(在浏览器开发者工具中执行相关指令,完成相关操作) #Page.addScriptToEvaluateOnNewDocument执行脚本 driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { "source": js }) driver.get('https://www.taobao.com')
- 需要规避检测的地方挺多,比如 12306 登录时的滑块,这玩意还不能滑太快!
from selenium.webdriver import Chrome from selenium.webdriver import ActionChains from time import sleep from selenium.webdriver.chrome.options import Options chrome_options = Options() chrome_options.add_argument("--disable-blink-features=AutomationControlled") chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36') web = Chrome(executable_path='./chromedriver',options=chrome_options) web.get("https://kyfw.12306.cn/otn/resources/login.html") sleep(2) web.find_element('xpath', '//*[@id="toolbar_Div"]/div[2]/div[2]/ul/li[1]/a').click() web.find_element('xpath', '//*[@id="J-userName"]').send_keys("hehehe@126.com") sleep(1) web.find_element('xpath', '//*[@id="J-password"]').send_keys("111111") sleep(1) web.find_element('xpath', '//*[@id="J-login"]').click() sleep(5) # 需要用动作链完成多次拖动 action = ActionChains(web) # 找到滑块 btn = web.find_element('xpath', '//*[@id="nc_1_n1z"]') action.click_and_hold(btn) for i in range(7): action.move_by_offset(50,0).perform() sleep(0.5) sleep(5) web.close()
MySQL
- 爬下来的数据要持久化,必须借助各种数据库,最常用的便是 MySQL
- 这里推荐文章,最关键的还是要多练习
- 说几点注意吧
- 使用 group by 的查询字段必须是分组字段(或者是其他字段的聚合形式),否则会出错,想要获取其他字段信息,可以借助于聚合函数
select job_title from emp group by job_title select gender,avg(age) from emp group by gender
- where 是针对分组之前的字段内容进行过滤,而 having 是针对分组后的
select job_title,avg(salary) from emp GROUP BY job_title having job_title = 'sale'
- 多表联查比较常用
select * from emp inner join dep on emp.dep_id = dep.id -- 交集 select * from emp left join dep on emp.dep_id = dep.id -- 左表为主表 select * from emp right join dep on emp.dep_id = dep.id -- 例如 #示例1:找出年龄大于25岁的员工名字以及员工所在的部门名称 select d.name,e.name from emp as e inner join dep as d on e.dep_id = d.id where age > 25 #示例2:找出年龄大于25岁的员工名字以及员工所在的部门名称,并且以age字段的升序方式显示 select * from emp as e inner join dep as d on d.id = e.dep_id where age > 25 order by age
- 使用 group by 的查询字段必须是分组字段(或者是其他字段的聚合形式),否则会出错,想要获取其他字段信息,可以借助于聚合函数
- 当然,我们最后是使用 pymysql 操作
- 看个 CURD 的例子
import pymysql #1.创建链接对象 conn = pymysql.Connect( host='127.0.0.1',#数据库服务器主机地址 port=3306, #mysql的端口号 user='root', #数据库的用户名 p='123456', #数据库密码 db='test',#数据仓库的名称 charset='utf8') #创建一个游标对象 cusor = conn.cursor() #2.增加记录操作 # sql = 'insert into emp(name,sex,age,dep_id)values("%s","%s",%d,%d)'%('haha','female',20,200) # cusor.execute(sql) # conn.commit() #对数据进行整改后,记得进行事物的提交 #3.删除记录 # sql = 'delete from emp where name = "%s"'%'haha' # print(sql) # cusor.execute(sql) # conn.commit() #4.修改操作 # new_age = input('enter a new age:') # new_age = int(new_age) # sql = 'update emp set age = %d where id = 3'%new_age # print(sql) # cusor.execute(sql) # conn.commit() #查询操作 sql = 'select * from emp where age > 30' cusor.execute(sql) #负责执行sql语句 #fetchall返回的是一个元组,元组元素又为一个元素,该元组中存储的是查询到的一条记录 # all_data = cusor.fetchall() #获取查询到所有的数据,如果没有查询到数据返回一个空元组 # print(all_data) #fetchone只会返回查询到的第一条数据 one_data = cusor.fetchone() #如果没有查询到数据返回None print(one_data) #关闭打开的资源对象 cusor.close() conn.close()
- 看个 CURD 的例子
MongoDB
- 参考这篇文章即可
Redis
- 参考这篇文章即可
- 基础进阶部分未完待续…