说明
插件爬虫相当于二次爬虫,即对信息做二次加工:大部分插件展示的数据本身也是从正规网站上获取的。这里以helium插件爬虫为例,其他插件爬虫的做法与此类似。
需求
1、用谷歌浏览器,下载chrome extension:“Helium 10”
2、登录helium10
3、打开
- 打开Amazon首页搜索women clothes https://www.amazon.com/s?k=women+clothes&crid=F0IYFXRNCJHD&sprefix=women+clothes%2Caps%2C113&ref=nb_sb_noss_1
点击插件,点击Xray,得到如上图的弹窗。
- 针对这个表格里的每一行:
(1)记录所有的信息(序号,Product,ASIN,Brand,Price,Sales,Revenue,BSR,
Seller Country/Region, Fees, Active Sellers, Ratings, Reviews, Size Tier, Buy Box, Fulfillment, Dimensions, Weight, Creation Date)等信息
(2) 在Sales,BSR和Reviews旁边如果不是空值的话,会有一个图标
点进图标,选All Time:
下载CSV,以每个产品的ASIN命名csv文件。
比如:
B0CRKQ44NH_sales.csv, B0CRKQ44NH_bsr.csv, B0CRKQ44NH_review.csv
也就是说,针对每个产品,我想要得到一个总表和三个分表(Sales,BSR和Review Count)。遍历所有的women clothes产品(我一共需要10000个产品)。
代码
import time
from platforms.base_platform import ObjectPlatform
from util.xpath_operation import SeleniumOperation
from util.pd_util import PandasUtil
from urllib.parse import urlsplit
import ddddocr
import os
import sys
import datetime
'''
helium 谷歌插件爬虫
'''
class HeliumExtensionPlatform(ObjectPlatform):
    """Crawl Amazon search-result pages through the Helium 10 Chrome extension.

    For every result page the "helium" dependency (loaded in ``before_run``)
    is executed and the rows it returns are appended to a per-page CSV file.
    The crawler then follows the pagination link until no "next page" link
    is found.
    """

    name = "heliumextensionplatform"
    describtion = ""
    # Bare module names of the config/setting modules; the package prefix is
    # prepended in __init__.
    config_file = "%s%s" % (name, "_config")
    setting_file = "%s%s" % (name, "_setting")

    def __init__(self, config_file_input=None, setting_file_input=None, log=None):
        """Start the browser and import the config and setting modules.

        :param config_file_input: dotted module name of the config module;
            when falsy, ``base_config_package`` + ``config_file`` is used.
        :param setting_file_input: dotted module name of the setting module;
            when falsy, ``base_setting_package`` + ``setting_file`` is used.
        :param log: logger handed to the base class and to SeleniumOperation.

        Exits the process with status 1 when the base class did not produce
        a driver.
        """
        super(HeliumExtensionPlatform, self).__init__(load_extension_2=True, login=False, logger=log)
        if not self.driver:
            print("启动失败...,请根据问题,重新启动")
            sys.exit(1)
        self.config_file = config_file_input or "%s%s" % (self.base_config_package, self.config_file)
        self.setting_file = setting_file_input or "%s%s" % (self.base_setting_package, self.setting_file)
        # A truthy ``fromlist`` makes __import__ return the leaf module
        # instead of the top-level package.
        self.config_package = __import__(self.config_file, fromlist=True)
        self.setting_package = __import__(self.setting_file, fromlist=True)
        self.log = log
        SeleniumOperation.log = log
        # Dependency name -> instantiated dependency object (see before_run).
        self.beans = {}

    def before_run(self):
        """Instantiate the configured dependencies and create the OCR engine."""
        dependencies = self.config_package.basic_config["dependencies"]
        for depend, spec in dependencies.items():
            self.log.info("加载依赖:{}....", depend)
            platform_position = __import__(spec["path"], fromlist=True)
            if hasattr(platform_position, spec["class"]):
                # Dynamic class lookup, see
                # http://blog.csdn.net/d_ker/article/details/53671952
                dependency = getattr(platform_position, spec["class"])
                self.beans[depend] = dependency(self.driver, log=self.log)
                self.log.info("加载依赖:{}成功", depend)
        self.ocr = ddddocr.DdddOcr()
        print("启动%s平台" % HeliumExtensionPlatform.describtion)

    def run(self):
        """Open Amazon, then scrape page after page until pagination ends."""
        main_url = self.config_package.basic_config["main_url"]
        # Open the Amazon landing page first.
        print("打开页面:", main_url)
        self.get_url_ignore_exception(main_url)
        # The landing page may answer with a captcha; solve it if present.
        self._input_code(main_url)

        page = 1
        search_url = self.config_package.basic_config["url"]
        # Fix: report the URL that is actually opened (the original printed
        # main_url here).
        print("打开url:", search_url)
        SeleniumOperation.get_url_ignore_exception(self.driver, search_url)
        while True:
            print("第%s页数据获取......" % page)
            datas = self.run_helium_extension()
            if datas:
                out_file_name = os.path.join(self.config_package.basic_config["out_path"],
                                             str(datetime.date.today()) + "_" + str(page) + ".csv")
                PandasUtil.write_csv_append(datas, out_file_name)

            elements = SeleniumOperation.get_elements(self.driver, self.setting_package.NEXT_PAGE_XPATH)
            next_page_element = elements[-1] if elements else None
            text = next_page_element.text if next_page_element else ""
            # Fix: the original condition `"下一页" or "Next" in text` was
            # always true because a non-empty string literal is truthy.
            if "下一页" not in text and "Next" not in text:
                print("找不到下一页,结束运行")
                break

            next_page_element.click()
            page += 1
            print("有下一页, 进入下一页,开始爬取:第%s页......" % (page))
            self._wait_for_page_url(page)

    def _wait_for_page_url(self, page):
        """Block until the browser URL contains ``page=<page>`` and remember it.

        The matching URL is stored in ``basic_config["url"]`` so the helium
        dependency can reload the current result page.
        NOTE(review): there is no upper bound on the wait, as in the
        original code — confirm that a click on the last page cannot leave
        the URL unchanged.
        """
        page_str = "page=" + str(page)
        while True:
            time.sleep(5)
            url_str = self.driver.current_url
            if page_str in url_str:
                self.config_package.basic_config["url"] = url_str
                return
            print("当前url:%s,希望url中有关键词:%s" % (url_str, page_str))

    def _input_code(self, url):
        """Solve the captcha shown on ``url``, if there is one.

        Returns immediately when no captcha input box is found. While the
        OCR cannot read the image, the page is reloaded and retried.
        """
        while True:
            input_code_element = SeleniumOperation.get_element(self.driver, self.setting_package.INPUT_CODE_XPATH)
            if not input_code_element:
                return
            code = self._ocr_code()
            if code:
                break
            print("刷新页面,识别不出来")
            self.get_url_ignore_exception(url)
        input_code_element.clear()
        input_code_element.send_keys(code)
        SeleniumOperation.click_button_anyway(self.driver, self.setting_package.SUBMIT_CODE_XPATH)

    def _ocr_code(self):
        """Download the captcha image and return the recognised text.

        :return: the OCR result, or ``None`` when no captcha image was found
            or the image could not be downloaded.
        """
        pic_elements = SeleniumOperation.get_elements(self.driver, self.setting_package.IMAGE_CODE_XPATH)
        # Fix: the original required more than one match (`len(...) > 1`)
        # although only the first one is used, so a page with exactly one
        # captcha image was never recognised.
        if not pic_elements:
            return None
        image_url = pic_elements[0].get_attribute("src")
        import requests
        try:
            # A timeout keeps a stalled image download from hanging the crawl.
            response = requests.get(image_url, timeout=30)
        except requests.RequestException as error:
            print("下载验证码图片失败:", error)
            return None
        res = self.ocr.classification(response.content)
        print("识别验证码是:", res)
        return res

    def run_helium_extension(self):
        """Run the "helium" dependency on the current page.

        :return: the rows returned by the dependency, or ``None`` when it
            raised. ``after_run`` of the dependency is always called.
        """
        print("开始运行helium插件")
        # Looked up outside the try block: a missing dependency surfaces as
        # one KeyError instead of being raised again from ``finally``.
        helium = self.beans["helium"]
        try:
            helium.before_run(data=self.config_package.basic_config)
            return helium.run()
        except Exception as e:
            print("运行helium插件出错了")
            self.log.exception(e)
            return None
        finally:
            helium.after_run()

    def after_run(self):
        """Print the completion notice and delegate clean-up to the base class."""
        print("%s 平台已经运行完成,请根据log目录查看运行日志\n" % HeliumExtensionPlatform.describtion)
        super(HeliumExtensionPlatform, self).after_run()
if __name__ == '__main__':
    # Print the top-level domain suffix of a sample product URL.
    sample_url = "https://www.amazon.fr/dp/B0BNW5P4PC?th=1"
    host = urlsplit(sample_url).netloc
    print(host.rsplit('.', 1)[-1])
    # Build the platform with default config/setting modules and run it.
    platform = HeliumExtensionPlatform()
    platform.run()
插件运行核心代码
import random
import time
from extensions.basic_extension import BasicExtension
from util.xpath_operation import SeleniumOperation
from settings import ConfigPackage
from settings import SettingPackage
from settings import DownLoadPath
from selenium.webdriver.common.by import By
import os
import shutil
from util.io_util import IOUTIL
class Helium10Extension(BasicExtension):
    """Drive the Helium 10 X-Ray popup on an Amazon search-result page.

    The extension UI is reached through shadow roots; this class opens
    X-Ray, reads every table row, downloads the all-time Sales/BSR/Reviews
    CSV files of each product and renames them after the product's ASIN.
    """

    name = "helium10extension"
    # Bare module names; the package prefix is prepended in __init__.
    config_file = "%s%s" % (name, "_config")
    setting_package = "%s%s" % (name, "_setting")

    def __init__(self, driver, log):
        """Import the config/setting modules and remember driver and logger.

        :param driver: Selenium driver shared with the owning platform.
        :param log: logger used for error reporting.
        """
        super(Helium10Extension, self).__init__()
        self.config_file = "%s%s" % (ConfigPackage, self.config_file)
        self.setting_file = "%s%s" % (SettingPackage, self.setting_package)
        # A truthy ``fromlist`` makes __import__ return the leaf module.
        self.config_package = __import__(self.config_file, fromlist=True)
        self.setting_package = __import__(self.setting_file, fromlist=True)
        self.log = log
        self.driver = driver
        # Replaced by the extension's shadow root in _get_shadow_dom.
        self.shadow_driver = driver
        SeleniumOperation.log = log

    def before_run(self, data=None):
        """Log in, locate the extension's shadow root and open X-Ray.

        :param data: mapping that must contain the key ``"url"`` (the
            search-result page to work on).
        """
        url = data["url"]
        # login() returns True only when the attempt raised, so retry until
        # it completes.
        while self.login():
            pass
        self._get_shadow_dom(url)
        while True:
            SeleniumOperation.click_button_anyway(self.shadow_driver, self.setting_package.HELIUM10_CLICK_CSS_PATH,
                                                  by_type=By.CSS_SELECTOR)
            element = SeleniumOperation.get_element(self.shadow_driver,
                                                    self.setting_package.HELIUM10_XRAY_2_CSS_PATH,
                                                    loading=False, by_type=By.CSS_SELECTOR)
            if element:
                SeleniumOperation.click_button(self.shadow_driver,
                                               self.setting_package.HELIUM10_XRAY_2_CSS_PATH,
                                               by_type=By.CSS_SELECTOR)
                break
            # X-Ray entry not visible: click the extension button again and
            # check whether the extension is asking for a login.
            SeleniumOperation.click_button_anyway(self.shadow_driver, self.setting_package.HELIUM10_CLICK_CSS_PATH,
                                                  by_type=By.CSS_SELECTOR)
            element = SeleniumOperation.get_element(self.shadow_driver, self.setting_package.LOGIN_CSS_PATH,
                                                    loading=False, by_type=By.CSS_SELECTOR)
            if element:
                print("需要重新登录....")
                self.login()
                self._get_shadow_dom(url)

    def _get_shadow_dom(self, url):
        """Reload ``url`` until the extension's shadow root can be obtained.

        On success the shadow root is stored in ``self.shadow_driver``.
        """
        while True:
            element = SeleniumOperation.get_element(self.driver, self.setting_package.AMAZION_XPATH)
            if element:
                self.shadow_driver = SeleniumOperation.get_shadow_root_js(self.driver,
                                                                          self.setting_package.SHADOW_CSS_PATH)
                if self.shadow_driver:
                    return
            print("刷新页面:", url)
            element = SeleniumOperation.get_element(self.driver, self.setting_package.SORRY_XPATH)
            if element:
                # The SORRY_XPATH element is present: click it to go back to
                # the home page before reloading.
                # NOTE(review): the source lost its indentation; the 10 s
                # pause is assumed to belong to this branch — confirm.
                print("回到首页")
                SeleniumOperation.click_button_anyway(self.driver, self.setting_package.SORRY_XPATH)
                time.sleep(10)
            SeleniumOperation.get_url_ignore_exception(self.driver, url)

    def run(self):
        """Scrape the X-Ray table, then close the X-Ray popup.

        :return: list of row dictionaries produced by ``run_page``.
        """
        datas, shadow_driver = self.run_page(0)
        print("关闭x-ray")
        click_result = False
        while not click_result:
            element = SeleniumOperation.get_element(shadow_driver, self.setting_package.ALL_CLOSE_CSS_PATH,
                                                    by_type=By.CSS_SELECTOR)
            if not element:
                print("x-ray已经关闭了")
                break
            click_result = SeleniumOperation.click_button_anyway(shadow_driver,
                                                                 self.setting_package.ALL_CLOSE_CSS_PATH,
                                                                 by_type=By.CSS_SELECTOR)
        return datas

    def run_page(self, begin_index):
        """Read the X-Ray rows from ``begin_index`` on and download their CSVs.

        :param begin_index: number of leading table rows to skip.
        :return: tuple ``(datas, shadow_driver)`` — the row dictionaries and
            the shadow root the table was read from.
        """
        datas = []
        count = 0
        while True:
            shadow_driver = SeleniumOperation.get_shadow_root_js(self.driver,
                                                                 self.setting_package.SHADOW_DETAIL_PATH)
            if shadow_driver:
                count += 1
                print("X_RAY 树加载出来")
                elements = SeleniumOperation.get_elements(shadow_driver, self.setting_package.TABLE_CSS_PATH,
                                                          loading=False,
                                                          by_type=By.CSS_SELECTOR)
                if elements and len(elements) > begin_index:
                    break
                print("等待数据出来.......")
                element = SeleniumOperation.get_element(shadow_driver, self.setting_package.NEW_UI_CSS_PATH,
                                                        loading=False,
                                                        by_type=By.CSS_SELECTOR)
                if element:
                    print("现在是旧的ui,切换新的ui")
                    SeleniumOperation.click_button_anyway(shadow_driver, self.setting_package.NEW_UI_CSS_PATH,
                                                          loading=False,
                                                          by_type=By.CSS_SELECTOR)
                if elements and count > 20 and begin_index > 0:
                    print("重新点击load mores的按钮")
                    SeleniumOperation.click_button_anyway(shadow_driver, self.setting_package.LOAD_MORE_CSS_PATH,
                                                          by_type=By.CSS_SELECTOR)
            print("等待X-RAY加载完成")
            time.sleep(5)  # the X-Ray shadow tree is slow to load
        print("找到%s条数据" % len(elements))
        elements = elements[begin_index:]
        print("只要获取%s条数据" % len(elements))
        for index, element in enumerate(elements):
            print("获取第%s条数据" % (index + 1))
            datas.append(self.load_data(element))
        print("开始下载csv文件")
        # All rows are read before any chart is opened.
        for index, (element, data) in enumerate(zip(elements, datas)):
            print("下载第%s个商品:%s 的csv文件" % (index + 1, data['Product']))
            self.download_all_time_csv(element, data, shadow_driver)
        return datas, shadow_driver

    def download_all_time_csv(self, element, data, shadow_driver):
        """Download the all-time Sales, BSR and Reviews CSV files of one row.

        :param element: table-row element of the product.
        :param data: row dictionary; keys ``"Asin"`` and ``"Product"`` are read.
        :param shadow_driver: shadow root containing the trend-chart window.
        """
        self._open_trend_chart(element, shadow_driver)
        asin = data["Asin"]
        # Fix: the original fallback was ``random.Random(10000)`` — a
        # generator object, which made the string concatenation below raise
        # TypeError.
        file_name = asin if asin else str(random.randint(0, 10000))
        downloads = (
            ("sales", self.setting_package.SALE_CLICK_SWITCH_CSS_PATH),
            ("bsr", self.setting_package.BSR_CLICK_SWITCH_CSS_PATH),
            ("reviews", self.setting_package.REVIEWS_CLICK_SWITCH_CSS_PATH),
        )
        for kind, switch_css_path in downloads:
            print("下载商品%s的%s csv文件" % (data["Product"], kind))
            if self.download_files_csv(shadow_driver, switch_css_path):
                self.wait_loaded_and_rename("%s_%s.csv" % (file_name, kind), data)
            else:
                # Fix: the reviews branch used to print the "start" message
                # instead of a failure message.
                message = "下载商品{}的{} csv文件失败".format(data["Product"], kind)
                print(message)
                self.log.error(message)
        self._close_trend_chart(shadow_driver)

    def _open_trend_chart(self, row_element, shadow_driver):
        """Click the BSR, Reviews or Sales icon of a row until the chart opens.

        :return: True when the chart's tab switch element was found, False
            when none of the three icons could be clicked.
        """
        columns = self.setting_package.COLUMNS_CSS_PATH
        icon_css_paths = (
            columns["BSR"] + self.setting_package.BSR_CLICK_CSS_PATH,
            columns["Reviews"] + self.setting_package.REVIEWS_CLICK_CSS_PATH,
            columns["Sales"] + self.setting_package.SALE_CLICK_CSS_PATH,
        )
        # any() stops at the first icon that could be clicked, like the
        # original ``or`` chain.
        while any(SeleniumOperation.click_button_anyway(row_element, css_path, loading=True,
                                                        by_type=By.CSS_SELECTOR)
                  for css_path in icon_css_paths):
            print("进入sales、bsr 、reviews趋势图页面")
            # Fix: the original stored this lookup in the same variable as
            # the row element, so a retry clicked inside ``None``.
            switch_element = SeleniumOperation.get_element(shadow_driver,
                                                           self.setting_package.SALE_CLICK_SWITCH_CSS_PATH,
                                                           loading=True,
                                                           by_type=By.CSS_SELECTOR)
            if switch_element:
                return True
        return False

    def _close_trend_chart(self, shadow_driver):
        """Click the chart's close button until it is gone or a click succeeds."""
        click_result = False
        while not click_result:
            print("关闭窗口")
            element = SeleniumOperation.get_element(shadow_driver, self.setting_package.CLOSE_CSS_PATH,
                                                    loading=True,
                                                    by_type=By.CSS_SELECTOR)
            if not element:
                print("进入sales、bsr 、reviews趋势图页面已经关闭")
                break
            click_result = SeleniumOperation.click_button_anyway(shadow_driver,
                                                                 self.setting_package.CLOSE_CSS_PATH,
                                                                 loading=True, by_type=By.CSS_SELECTOR)

    def download_files_csv(self, shadow_driver, css_path):
        """Switch to one chart tab, select "All Time" and trigger the CSV download.

        :param shadow_driver: shadow root containing the chart window.
        :param css_path: CSS selector of the tab (Sales, BSR or Reviews).
        :return: result of the final click on the CSV entry; False when the
            tab itself could not be clicked.
        """
        click_result = SeleniumOperation.click_button_anyway(shadow_driver, css_path, loading=True,
                                                             by_type=By.CSS_SELECTOR)
        if not click_result:
            # Explicit falsy result (the original fell off the end with None).
            return False
        # Select the "All Time" range: the last of the range buttons.
        while True:
            times_elements = SeleniumOperation.get_elements(shadow_driver, self.setting_package.ALL_TIME_CSS_PATH,
                                                            loading=False,
                                                            by_type=By.CSS_SELECTOR)
            if times_elements and len(times_elements) > 1:
                times_elements[-1].click()
                break
            time.sleep(5)
        # Open the download menu, retrying every 5 s until the click works.
        while not SeleniumOperation.click_button_anyway(shadow_driver,
                                                        self.setting_package.DOWNLOAD_ENTRY_CSS_PATH,
                                                        by_type=By.CSS_SELECTOR):
            time.sleep(5)
        # Choose the CSV entry.
        return SeleniumOperation.click_button_anyway(shadow_driver, self.setting_package.DOWNLOAD_CSV_CSS_PATH,
                                                     by_type=By.CSS_SELECTOR)

    def wait_loaded_and_rename(self, filename, data):
        """Rename the file that was just downloaded to ``filename``.

        First tries the name reported by the browser; if that fails, falls
        back to the most recent file in ``DownLoadPath``.

        :param filename: target file name inside ``DownLoadPath``.
        :param data: row dictionary; ``"Product"`` is used in messages.
        """
        # The second argument is a wait time; tune it to the real download speed.
        download_file_name = SeleniumOperation.get_downloaded_filename(self.driver, 5)
        if download_file_name:
            try:
                shutil.move(os.path.join(DownLoadPath, download_file_name),
                            os.path.join(DownLoadPath, filename))
            except Exception:
                # Still best effort, but no longer silent: record the reason
                # and continue with the fallback below.
                self.log.exception("移动文件:{}失败".format(download_file_name))
            else:
                print("文件:%s下载完成" % (filename))
                return
        last_file, last_name = IOUTIL.get_last_filename(DownLoadPath)
        # A newest file that is missing or already carries one of our
        # suffixes means that no new file arrived for this download.
        if not last_name or "_sales" in last_name or "_bsr" in last_name or "_reviews" in last_name:
            # Fix: the original always reported "sales", whatever was being
            # downloaded; name the target file instead.
            message = "下载商品{}的csv文件:{}失败".format(data["Product"], filename)
            print(message)
            self.log.error(message)
            return
        result = IOUTIL.rename(last_file, last_name, filename)
        if not result:
            self.log.error("文件:{}修改成新文件名:{}出错了,请手动修改".format(last_name, filename))

    def load_data(self, element):
        """Read every configured column of one table row.

        :param element: table-row element.
        :return: dict mapping each key of ``COLUMNS_CSS_PATH`` to the cell
            text, or to the string ``"None"`` when the cell was not found.
        """
        element_data = {}
        for key, values in self.setting_package.COLUMNS_CSS_PATH.items():
            child_element = SeleniumOperation.get_element(element, values,
                                                          loading=True,
                                                          by_type=By.CSS_SELECTOR)
            if child_element:
                element_data[key] = child_element.text
            else:
                # Fix: the original format string had no argument, so a
                # literal "%s" was printed.
                print("找不到%s的元素,请查询是否path问题,默认设置为空" % key)
                element_data[key] = "None"
        return element_data

    def after_run(self, data=None):
        """Nothing to clean up for this extension."""
        pass

    def _is_login(self):
        """Return True when the extension button is found in the shadow root."""
        element = SeleniumOperation.get_element(self.shadow_driver, self.setting_package.HELIUM10_CLICK_CSS_PATH,
                                                by_type=By.CSS_SELECTOR)
        return bool(element)

    def login(self):
        """Try to log in to Helium 10 with the configured credentials.

        :return: False when the attempt completed (already logged in, login
            form missing, or form submitted); True when it raised, which
            tells the caller to retry.
        """
        try:
            print("进入尝试自动登录.......")
            current_url = self.driver.current_url
            if "https://members.helium10.com/dashboard?accountId=" in current_url:
                print("已经登录成功")
                return False
            SeleniumOperation.get_url_ignore_exception(self.driver, self.config_package.basic_config["login_url"])
            element = SeleniumOperation.get_element(self.driver, self.setting_package.EMAIL_XPATH)
            if not element:
                print("找不到登录输入信息, 可能已经登录成功")
                return False
            element.send_keys(self.config_package.basic_config["email"])
            element = SeleniumOperation.get_element(self.driver, self.setting_package.PASSWORD_XPATH)
            element.send_keys(self.config_package.basic_config["password"])
            element = SeleniumOperation.get_element(self.driver, self.setting_package.CAPTCHAID_XPATH)
            if element:
                # A captcha is shown: the operator solves it in the browser.
                input("请在页面手动操作登陆验证码,然后输入任意按键继续:")
            SeleniumOperation.click_button(self.driver, self.setting_package.LOGIN_XPATH)
            return False
        except Exception:
            # Narrowed from a bare ``except`` so Ctrl-C and SystemExit are
            # no longer swallowed by the retry loop.
            self.log.exception("登录失败")
            return True