官方仓库地址:https://github.com/miyakogi/pyppeteer
官方文档地址:API Reference — Pyppeteer 0.0.25 documentation
Selenium环境的相关配置比较繁琐,此外,有的网站会对selenium和webdriver进行识别和反爬,因此在这里介绍一下它的替代产品Pyppeteer。
Pyppeteer 就是依赖于 Chromium 这个浏览器来运行的。如果第一次运行的时候,Chromium 浏览器没有安装,那么程序会帮我们自动安装和配置,就免去了繁琐的环境配置等工作。另外 Pyppeteer 是基于 Python 的新特性 async 实现的,所以它的一些执行也支持异步操作,效率相对于 Selenium也有所提高。
安装
pip3 install pyppeteer
测试代码:
import asyncio
from pyppeteer import launch
async def main():
# 创建浏览器对象
browser = await launch(headless=False,args=['--disable-infobars'])
# 打开新的标签页
page = await browser.newPage()
#设置视图大小
await page.setViewport({'width':1366,'height':768})
#设置UserAgent
await page.setUserAgent('Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36')
# 访问页面
response = await page.goto('https://www.baidu.com')
#获取status、headers、url
print(response.status)
print(response.headers)
print(response.url)
#获取当前页标题
print(await page.title())
#获取当前页内容
print(await page.content()) #文本类型
# print(await response.text())
#cookie操作
print(await page.cookies()) #获取cookie,[{'name':xx,'value':xxx...},...]
# page.deleteCookie() 删除cookie
# page.setCookie() 设置cookie
#定位元素
#1、只定位一个元素(css选择器)
# element = await page.querySelector('#s-top-left > a')
#2、css选择器
elements = await page.querySelectorAll('#s-top-left > a:nth-child(2n)')
#3、xpath
# elements = await page.xpath('//div[@id="s-top-left"]/a')
for element in elements:
print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容
print(await (await element.getProperty('href')).jsonValue())#获取href属性
#模拟输入和点击
await page.type('#kw','中国',{'delay':1000}) #模拟输入,输入时间:1000 ms
await asyncio.sleep(2)
await page.click('#su') #模拟点击,也可以先定位元素,然后await element.click()
await asyncio.sleep(2)
#执行js,滚动页面到底部
await page.evaluate('window.scrollTo(0,document.body.scrollHeight);')
#截图
await page.screenshot({'path':'baidu.png'})
await asyncio.sleep(5)
await browser.close() #关闭浏览器
asyncio.get_event_loop().run_until_complete(main())
主要功能介绍
1.打开浏览器
调用 launch 方法即可,相关参数介绍:
ignoreHTTPSErrors (bool): 是否要忽略 HTTPS 的错误,默认是 False。
headless (bool): 是否启用无界面模式,默认为 True。如果 devtools 这个参数是 True 的话,那么该参数就会被设置为 False。
executablePath (str): 可执行文件的路径,如果指定之后就不需要使用默认的 Chromium 了,可以指定为已有的 Chrome 或 Chromium。
args (List[str]): 在执行过程中可以传入的额外参数。
slowMo (int|float): 设置这个参数可以延迟pyppeteer的操作,单位是毫秒.
userDataDir (str): 即用户数据文件夹,即可以保留一些个性化配置和操作记录。
devtools (bool): 是否为每一个页面自动开启调试工具,默认是 False。如果为 True,那么headless参数会被强制设置为 False。
2.关闭提示条:”Chrome 正受到自动测试软件的控制”
browser = await launch(headless=False, args=['--disable-infobars'])
3.代理设置
proxy = 'http://具体代理'
browser = await launch(headless=False, args=['--disable-infobars', f'--proxy-server={proxy}'])
4.开启无痕模式
# 创建浏览器对象
browser = await launch(headless=False, args=['--disable-infobars'])
# 开启无痕模式
context = await browser.createIncognitoBrowserContext()
# 打开新的标签页
page = await context.newPage()
5.设置窗口大小
width, height = 1366, 768
await page.setViewport({'width': width, 'height': height})
6.设置UserAgent
await page.setUserAgent('xxx')
7.执行JS脚本:调用page.evaluate()方法
await page.evaluate('window.scrollTo(0,document.body.scrollHeight);')
#滚动页面到底部
8.规避webdriver检测
import asyncio
from pyppeteer import launch
async def main():
browser = await launch(headless=False, args=['--disable-infobars'])
page = await browser.newPage()
await page.goto('https://login.taobao.com/member/login.jhtml?redirectURL=https://www.taobao.com/')
await page.evaluateOnNewDocument(
'''() =>{ Object.defineProperties(navigator,{ 'webdriver':{ get: () => false } }) }''')
await page.evaluateOnNewDocument(
'''() =>{ Object.defineProperties(navigator,{ 'languages':{ get: () => ['en-US', 'en'] } }) }''')
await page.evaluateOnNewDocument(
'''() =>{ Object.defineProperties(navigator,{ 'plugins':{ get: () => [1, 2, 3, 4, 5] } }) }''')
await asyncio.sleep(100)
asyncio.get_event_loop().run_until_complete(main())
9.多个page页面选项卡操作
# 新建选项卡1
page1 = await browser.newPage()
await page1.goto('https://www.baidu.com/')
await asyncio.sleep(2)
# 新建选项卡2
page2 = await browser.newPage()
await page2.goto('https://www.zhihu.com/')
# 查看所有选项卡
pages = await browser.pages() # 含第一个空白页,总共3页
await pages[1].bringToFront() # 切换到第2页,即百度
10.模拟输入和点击
await page.type(selector, text, {"delay":100}) #模拟输入,输入每个字符的间隔时间100 ms
await asyncio.sleep(2)
await page.click(selector) #模拟点击
await asyncio.sleep(2)
11.鼠标移动和按下松开操作
await page.hover(selector) #鼠标移动到某个元素上
await page.mouse.down() #按下鼠标
await page.mouse.move(2000, 0, {'delay': random.randint(1000, 2000)}) #移动鼠标
await page.mouse.up() #松开鼠标
12.定位元素、获取元素文本内容和属性值
page.querySelector(selector)#只匹配第一个元素
element = await page.querySelector('#s-top-left > a')
print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容
print(await (await element.getProperty('href')).jsonValue())#获取href属性
page.querySelectorAll(selector)#css选择器
elements = await page.querySelectorAll('#s-top-left > a:nth-child(2n)')
for element in elements:
print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容
print(await (await element.getProperty('href')).jsonValue())#获取href属性
page.xpath(expression)#xpath
elements = await page.xpath('//div[@id="s-top-left"]/a')
for element in elements:
print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容
print(await (await element.getProperty('href')).jsonValue())#获取href属性
page.jeval(selector,pageFunction)#定位元素,并调用js函数去执行
print(await page.Jeval('#s-top-left > a:first-child','node => node.textContent') ) #获取文本内容
print(await page.Jeval('#s-top-left > a:first-child','node => node.href') ) #获取href属性
13.延时等待,通过各种等待方法,可以控制页面的加载情况
page.waitForSelector() # 等待符合Selector的节点加载出来,否则直到超时
page.waitForXPath() # 等待符合Xpath的节点加载出来
page.waitForFunction() # 等待某个JS方法执行完毕并返回结果
page.waitFor() # 通用等待方式,如果是数字,则表示等待具体时间(毫秒),其它也可以是Selector、Xpath、Function字符串
page.waitForRequest() # 等待请求出现(url或者函数)
page.waitForResponse() # 等待响应内容出现(url或者函数)
page.waitForNavigation() # 等待页面跳转,如果没加载出来就报错,比如前面使用await page.click('某个链接'),后面使用该等待
14.请求拦截器,对请求进行过滤等操作
可以通过page.setRequestInterception(True)来开启拦截器,然后自定义拦截规则
对Request的拦截有3个固定的常用方法:
Request.continue_()
不传参数,则保持请求
传入参数overrides,则跳转,该参数是一个字典{'url': 'xx', 'method': '', 'postData': '', 'headers': ''}
Request.abort()
停止请求,可以起过滤作用(比如不显示图片)
Request.respond({"body": "响应内容"})
用给定的响应内容完成请求(比如替换JS文件内容时)
import asyncio
from pyppeteer import launch
async def main():
browser = await launch(headless=False, args=['--disable-infobars'])
browser = await browser.createIncognitoBrowserContext()
page = await browser.newPage()
# 开启请求拦截器
await page.setRequestInterception(True)
# 设置请求拦截器
page.on('request', lambda req: asyncio.ensure_future(intercept_request(req)))
# 设置响应拦截器
page.on('response', lambda response: asyncio.ensure_future(intercept_response(response)))
await page.goto('https://www.baidu.com/')
print(await page.title())
await asyncio.sleep(3)
await page.goto("https://spa6.scrape.center/")
await asyncio.sleep(3)
await browser.close()
# 请求拦截器
async def intercept_request(req):
url = req.url
if url == 'https://fanyi.baidu.com/':
# 用给定内容响应请求
await req.respond({'status': 200, 'body': 'welcome to new page'})
elif url == 'https://www.baidu.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png':
# 停止请求
print('已过滤该图片')
await req.abort()
elif url == 'https://www.qq.com/':
# 跳转请求
await req.continue_({'url': 'https://www.tencent.com/zh-cn/', 'method': 'GET'})
else:
# 保持请求
await req.continue_()
# 响应拦截器
async def intercept_response(response):
if response.status == 200 and response.url == 'https://www.baidu.com/':
text = await response.text()
print(text)
if '/api/movie' in response.url and response.status == 200:
json_data = await response.json()
print(json_data)
asyncio.get_event_loop().run_until_complete(main())
15.针对frame操作
page.frames获取页面中的所有frames列表,对于每一个frame操作,和page操作一致
page.mainFrame获取当前页面的主frame
frame_list = page.frames #获取所有frame
#获取当前页面的标题,下面3个效果一样
print(await frame_list[0].title())
print(await page.mainFrame.title())
print(await page.title())
实战案例
1.爬取京东商城
import requests
from bs4 import BeautifulSoup
from pyppeteer import launch
import asyncio
def screen_size():
"""使用tkinter获取屏幕大小"""
import tkinter
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
async def main(url):
# browser = await launch({'headless': False, 'args': ['--no-sandbox'], })
browser = await launch({'args': ['--no-sandbox'], })
page = await browser.newPage()
width, height = screen_size()
await page.setViewport(viewport={"width": width, "height": height})
await page.setJavaScriptEnabled(enabled=True)
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
await page.goto(url)
# await asyncio.sleep(2)
await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
await asyncio.sleep(1)
# content = await page.content()
li_list = await page.xpath('//*[@id="J_goodsList"]/ul/li')
# print(li_list)
item_list = []
for li in li_list:
a = await li.xpath('.//div[@class="p-img"]/a')
detail_url = await (await a[0].getProperty("href")).jsonValue()
promo_words = await (await a[0].getProperty("title")).jsonValue()
a_ = await li.xpath('.//div[@class="p-commit"]/strong/a')
p_commit = await (await a_[0].getProperty("textContent")).jsonValue()
i = await li.xpath('./div/div[3]/strong/i')
price = await (await i[0].getProperty("textContent")).jsonValue()
em = await li.xpath('./div/div[4]/a/em')
title = await (await em[0].getProperty("textContent")).jsonValue()
item = {
"title": title,
"detail_url": detail_url,
"promo_words": promo_words,
'p_commit': p_commit,
'price': price
}
item_list.append(item)
# print(item)
# break
# print(content)
await page_close(browser)
return item_list
async def page_close(browser):
for _page in await browser.pages():
await _page.close()
await browser.close()
msg = "手机"
url = "https://search.jd.com/Search?keyword={}&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq={}&cid2=653&cid3=655&page={}"
task_list = []
for i in range(1, 6):
page = i * 2 - 1
url = url.format(msg, msg, page)
task_list.append(main(url))
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*task_list))
# print(results, len(results))
for i in results:
print(i, len(i))
# soup = BeautifulSoup(content, 'lxml')
# div = soup.find('div', id='J_goodsList')
# for i, li in enumerate(div.find_all('li', class_='gl-item')):
# if li.select('.p-img a'):
# print(li.select('.p-img a')[0]['href'], i)
# print(li.select('.p-price i')[0].get_text(), i)
# print(li.select('.p-name em')[0].text, i)
# else:
# print("#" * 200)
# print(li)
2.爬取淘宝网
import asyncio
import time
from pyppeteer.launcher import launch
from alifunc import mouse_slide, input_time_random
from exe_js import js1, js3, js4, js5
def screen_size():
"""使用tkinter获取屏幕大小"""
import tkinter
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
async def main(username, pwd, url):
browser = await launch({'headless': False, 'args': ['--no-sandbox'], }, userDataDir='./userdata',
args=['--window-size=1366,768'])
page = await browser.newPage()
width, height = screen_size()
await page.setViewport(viewport={"width": width, "height": height})
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
await page.goto(url)
await page.evaluate(js1)
await page.evaluate(js3)
await page.evaluate(js4)
await page.evaluate(js5)
pwd_login = await page.querySelector('.J_Quick2Static')
# print(await (await pwd_login.getProperty('textContent')).jsonValue())
await pwd_login.click()
await page.type('#TPL_username_1', username, {'delay': input_time_random() - 50})
await page.type('#TPL_password_1', pwd, {'delay': input_time_random()})
await page.screenshot({'path': './headless-test-result.png'})
time.sleep(2)
slider = await page.Jeval('#nocaptcha', 'node => node.style') # 是否有滑块
if slider:
print('出现滑块情况判定')
await page.screenshot({'path': './headless-login-slide.png'})
flag = await mouse_slide(page=page)
if flag:
print(page.url)
await page.keyboard.press('Enter')
await get_cookie(page)
else:
await page.keyboard.press('Enter')
await page.waitFor(20)
await page.waitForNavigation()
try:
global error
error = await page.Jeval('.error', 'node => node.textContent')
except Exception as e:
error = None
print(e, "错啦")
finally:
if error:
print('确保账户安全重新入输入')
else:
print(page.url)
# 可继续网页跳转 已经携带 cookie
# await get_search(page)
await get_cookie(page)
await page_close(browser)
async def page_close(browser):
for _page in await browser.pages():
await _page.close()
await browser.close()
async def get_search(page):
# https://s.taobao.com/search?q={查询的条件}&p4ppushleft=1%2C48&s={每页 44 条 第一页 0 第二页 44}&sort=sale-desc
await page.goto("https://s.taobao.com/search?q=气球")
await asyncio.sleep(5)
# print(await page.content())
# 获取登录后cookie
async def get_cookie(page):
res = await page.content()
cookies_list = await page.cookies()
cookies = ''
for cookie in cookies_list:
str_cookie = '{0}={1};'
str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
cookies += str_cookie
print(cookies)
# 将cookie 放入 cookie 池 以便多次请求 封账号 利用cookie 对搜索内容进行爬取
return cookies
if __name__ == '__main__':
username = 'username'
pwd = 'password'
url = "https://login.taobao.com/member/login.jhtml?spm=a21bo.2017.754894437.1.5af911d9qqVAb1&f=top&redirectURL=https%3A%2F%2Fwww.taobao.com%2F"
loop = asyncio.get_event_loop()
loop.run_until_complete(main(username, pwd, url))
3.针对iframe 的操作:page.frames 获取所有的 iframe 列表 需要判断操作的是哪一个 iframe 跟操作 page 一样操作。
from pyppeteer import launch
import asyncio
async def main(url):
w = await launch({'headless': False, 'args': ['--no-sandbox'], })
page = await w.newPage()
await page.setViewport({"width": 1366, 'height': 800})
await page.goto(url)
try:
await asyncio.sleep(1)
frame = page.frames
print(frame) # 需要找到是哪一个 frame
title = await frame[1].title()
print(title)
await asyncio.sleep(1)
login = await frame[1].querySelector('#switcher_plogin')
print(login)
await login.click()
await asyncio.sleep(20)
except Exception as e:
print(e, "EEEEEEEEE")
for _page in await w.pages():
await _page.close()
await w.close()
asyncio.get_event_loop().run_until_complete(main("https://i.qq.com/?rd=1"))
# asyncio.get_event_loop().run_until_complete(main("https://www.gushici.com/"))