执行JS的类库:execjs,PyV8,selenium,node
pip list
pip install selenium
pip install xlrd
pip install xlwt
pip install PyExecJS
pip install xlutils
selenium测试工具可以用来模拟用户浏览器的操作,其支持的浏览器有:PhantomJS,Firefox,Chrome等等,开发者可以根据当前的系统形式选择不同的模拟浏览器
每种模拟浏览器都需要对应的浏览器驱动(一个以.exe为后缀的可执行文件),使用谷歌浏览器Chrome,对应的浏览器驱动可以通过下面的网址下载。要完整地安装Python-Selenium库,让Chrome浏览器实现自动化,需要完成下面4步:Chromedriver安装、Selenium库安装、测试、关闭Chrome浏览器自动更新。
ChromeDriver - WebDriver for Chrome - Downloads
如果您使用的是Chrome 115或更新版本,请参阅Chrome测试可用性仪表板。此页面为特定的ChromeDriver版本下载提供了方便的JSON端点。
# -*- coding: utf-8 -*-
"""
Created on Thu Feb 24 16:10:55 2024
@author: Administrator
"""
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
import random
import time
from selenium.webdriver.chrome.options import Options
import threading
from datetime import datetime
from queue import Queue
class ShellChrome(object):
def __init__(self,count):
self.count = count
self.que = Queue(maxsize = count)
ua = self.getheaders()
#mobile_emulation = {"deviceName": "Nexus 7"}
self.options = webdriver.ChromeOptions()
# 把Chrome设置成可视化无界面模式,windows/Linux 皆可
self.options.add_argument('headless')
# 转换手机模式
#self.options.add_experimental_option("mobileEmulation", mobile_emulation)
# 全屏启动,无地址栏
self.options.add_argument('kiosk')
# 设置默认编码为 utf-8,也就是中文
self.options.add_argument('lang=zh_CN.UTF-8')
# 禁用图片加载 提升速度
self.options.add_argument('blink-settings=imagesEnabled=false')
# 隐身模式
self.options.add_argument('incognito')
# 自动打开开发者工具
self.options.add_argument("auto-open-devtools-for-tabs")
# 启动时,不激活(前置)窗口
#self.options.add_argument('no-startup-window')
# 设置窗口启动位置(左上角坐标)
self.options.add_argument('window-position=100,100')
# 禁用gpu渲染 规避bug
self.options.add_argument('disable-gpu')
# 以最高权限运行
self.options.add_argument('--no-sandbox')
# 禁用JavaScript
self.options.add_argument("--disable-javascript")
# 设置开发者模式启动,该模式下webdriver属性为正常值
self.options.add_experimental_option('excludeSwitches', ['enable-automation'])
# 禁用浏览器弹窗
prefs = {
'profile.default_content_setting_values' : {
'notifications' : 2
}
}
self.options.add_experimental_option('prefs',prefs)
# 添加ua
self.options.add_argument('user-agent=' + ua)
self.service = Service('D:\chromedriver.exe')
self.options.binary_location = "C:/Program Files/Google/Chrome/Application/chrome.exe"
# 特别注意,windows下要带.exe
self.driver = webdriver.Chrome(options=self.options,service=self.service)
# self.driver.maximize_window()
# 根据桌面分辨率来定,主要是为了抓到验证码的截屏
self.driver.set_window_size(960, 800)
self.elements = []
def threadFunc(self):
#print("正在打开页面...")
try:
self.product()
self.consume()
except Exception as e:
#打印异常堆栈信息
print(e)
#print("正在关闭页面...")
self.driver.quit()
def product(self):
url = ['https://m.baidu.com/',
'https://m.baidu.com/',
'https://m.baidu.com/',]
for i in range(self.count):
# 让浏览器不要显示当前受自动化测试工具控制的提醒
self.driver.get(url[i])
#设置隐式等待
self.driver.implicitly_wait(10) #加载等待最长10秒
time.sleep(4)
self.que.put(self.driver.title)
self.elements = self.driver.find_elements(by=By.CLASS_NAME, value='detail')
for k in range(len(self.elements)):
self.elements[k].click()
time.sleep(1)
# print('先进先出队列:{0};是否为空:{1};队列大小:{2};是否满:{3}'.format(self.que.queue,self.que.empty(),self.que.qsize(),self.que.full()))
def consume(self):
for i in range(self.count):
temp = self.que.get()
#print(temp)
#print(temp.current_url)
#print(temp.window_handles)
self.que.task_done()
def getheaders(self):
user_agent_list = ['Mozilla/5.0 (Windows NT 6.2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1464.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.16 Safari/537.36',
'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36',
'Mozilla/5.0 (X11; CrOS i686 3912.101.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1667.0 Safari/537.36',
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:17.0) Gecko/20100101 Firefox/17.0.6',
'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1468.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2224.3 Safari/537.36',
'Mozilla/5.0 (X11; CrOS i686 3912.101.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36']
UserAgent=random.choice(user_agent_list)
return UserAgent
if __name__ == '__main__':
startTime = time.perf_counter()
print(datetime.now())
#time.process_time()
#time.default_timer()
#time.perf_counter()
count = int(input('请输入队列数:'))
max = 10000
i = 1
while i < max:
try:
threads = []
for _ in range(count): # 循环创建10个线程
lianghua = ShellChrome(count)
t = threading.Thread(target=lianghua.threadFunc)
threads.append(t)
t.setDaemon(True) # 给每个子线程添加守护线程
for t in threads: # 循环启动10个线程
t.start()
for t in threads:
t.join(4) # 设置子线程超时4秒
except Exception as e:
#打印异常堆栈信息
print(e)
i+=1
endTime = time.perf_counter()
print(int( (endTime-startTime) * 1000) / 1000)
print(datetime.now())