Scraping CosHunter (爬取cos猎人)
The database-management side is split into four modules: proxy fetching, proxy storage, proxy testing, and the crawler itself.
CosHunter has since shut down, so the crawler source code is released here.
api.py provides the interface layer for proxy scoring.
import requests
import concurrent.futures
import redis
import random
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_DATABASE = 0
REDISOBJECT = 'proxysss'
"""时间间隔配置"""
GETTER_PROXY = 60*5
VERIFY_PROXY = 60*3
class RedisClient:
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DATABASE):
        self.db = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def exists(self, proxy):
        """Return True if the proxy is already stored in the database,
        False otherwise."""
        return self.db.zscore(REDISOBJECT, proxy) is not None

    def add(self, proxy, score=10):
        """Add a proxy with an initial score of 10,
        but only if it is not already present."""
        if not self.exists(proxy):
            return self.db.zadd(REDISOBJECT, {proxy: score})
    def random(self):
        """Pick a proxy at random:
        first try proxies scored exactly 100,
        then fall back to any proxy scored 1-99;
        warn if the database is empty."""
        proxies = self.db.zrangebyscore(REDISOBJECT, 100, 100)
        if proxies:
            return random.choice(proxies)
        proxies = self.db.zrangebyscore(REDISOBJECT, 1, 99)
        if proxies:
            return random.choice(proxies)
        print("----- database is empty -----")
    def decrease(self, proxy):
        """Lower a proxy's score after a failed check;
        remove the proxy once its score drops to 0 or below."""
        self.db.zincrby(REDISOBJECT, -10, proxy)
        score = self.db.zscore(REDISOBJECT, proxy)  # look up the new score
        if score <= 0:
            self.db.zrem(REDISOBJECT, proxy)  # drop the dead proxy
    def max(self, proxy):
        """Set a proxy to the maximum score once it passes a check."""
        return self.db.zadd(REDISOBJECT, {proxy: 100})

    def count(self):
        """Return the number of proxies in the database."""
        return self.db.zcard(REDISOBJECT)

    def all(self):
        """Return every proxy as a list."""
        proxies = self.db.zrangebyscore(REDISOBJECT, 1, 100)
        if proxies:
            return proxies
        else:
            print('----- no proxies in the database -----')

    def count_for_num(self, number):
        """Return the requested number of proxies as a list."""
        all_proxies = self.all() or []
        # sample without replacement, never asking for more than we hold
        number = min(number, len(all_proxies))
        return random.sample(all_proxies, k=number)
def get_proxy():
    # fetch candidate proxies from the external proxy source on port 5010
    return requests.get("http://127.0.0.1:5010/all").json()


def delete_proxy(proxy):
    # ask the external source to discard a dead proxy
    requests.get("http://127.0.0.1:5010/delete/?proxy={}".format(proxy))
# Proxy-testing module, kept commented out here; it assumes a module-level
# client = RedisClient() instance (as created in sever.py).
#
# TEST_URL = "https://www.baidu.com/"
# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'}
#
#
# def verify_proxy(proxy):
#     """Check whether a single proxy is usable."""
#     proxies = {
#         "http": "http://" + proxy,
#         "https": "https://" + proxy
#     }
#     try:
#         response = requests.get(url=TEST_URL, headers=headers, proxies=proxies, timeout=2)
#         if response.status_code in [200, 206, 302]:
#             # Success: promote the proxy to 100 points via max().
#             client.max(proxy)
#             print("*** proxy usable ***", proxy)
#         else:
#             # Bad status code: penalize the proxy via decrease().
#             client.decrease(proxy)
#             print("-- bad status code --", proxy)
#     except Exception:
#         # A timeout means the proxy is unusable.
#         client.decrease(proxy)
#         print("=== request timed out ===")
#
#
# # Checking sequentially is too slow, so run the checks in a thread pool.
# def verify_thread_pool():
#     """Verify proxies with a thread pool:
#     1. fetch every proxy from the database;
#     2. check them concurrently."""
#     proxies_list = client.all()  # list of proxies
#     with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
#         for proxy in proxies_list:
#             executor.submit(verify_proxy, proxy)
#
# if __name__ == '__main__':
#     # proxy = [
#     #     '45.234.63.220:999',
#     #     '60.168.255.69:8060',
#     #     '65.108.27.185:3128',
#     #     '62.162.91.205:3129',
#     #     '37.34.236.15:80'
#     # ]
#     # for pro in proxy:
#     #     verify_proxy(pro)
#     verify_thread_pool()
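For reference, a minimal sketch of the scoring lifecycle that RedisClient implements (the proxy address below is a placeholder, not a real endpoint):

from api import RedisClient

client = RedisClient()
client.add('1.2.3.4:8080')       # a new proxy enters the pool at score 10
client.max('1.2.3.4:8080')       # a passed check promotes it to 100
print(client.random())           # random() prefers proxies scored exactly 100
client.decrease('1.2.3.4:8080')  # each failed check subtracts 10;
                                 # at 0 or below the proxy is removed
print(client.count())            # number of proxies in the sorted set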
getter.py pulls a single proxy from the database.
import requests


def get_one_proxy():
    # /get returns one random proxy from the pool served by sever.py
    return requests.get("http://127.0.0.1:5000/get")


print(get_one_proxy().text)
sever.py runs a local API server for the other modules to call.
import flask
from api import RedisClient
from flask import request  # read query parameters from the URL
from flask import jsonify  # serialize Python objects to a JSON response

app = flask.Flask(__name__)
client = RedisClient()
@app.route('/')
def index():
    """Index view (cf. http://demo.spiderpy.cn/get/).
    A view function may only return string-like data."""
    return '<h2>Welcome to the proxy pool</h2>'
@app.route('/get')
def get_proxy():
    """Return one random proxy via the database's random() method."""
    one_proxy = client.random()
    # random() returns None when the pool is empty; Flask cannot return None
    return one_proxy or 'the proxy pool is empty'
@app.route('/getcount')
def get_any_proxy():
    """Return the requested number of proxies via count_for_num().
    The count comes from the `num` query parameter;
    the caller may omit it, in which case it defaults to 1."""
    num = request.args.get('num', '')
    if not num:
        num = 1
    else:
        num = int(num)
    any_proxy = client.count_for_num(num)
    return jsonify(any_proxy)
@app.route('/getnum')
def get_count_proxy():
    """Return the total number of proxies via the database's count() method."""
    count_proxy = client.count()
    return f"Number of usable proxies: {count_proxy}"


@app.route('/getall')
def get_all_proxy():
    """Return every proxy via the database's all() method."""
    all_proxy = client.all()
    return jsonify(all_proxy)
if __name__ == '__main__':
    # run the instantiated app
    app.run()
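With sever.py running, the pool can be queried like this (a sketch; the port is Flask's default 5000, which the other modules also assume):

import requests

BASE = "http://127.0.0.1:5000"
print(requests.get(BASE + "/get").text)                             # one random proxy
print(requests.get(BASE + "/getcount", params={"num": 3}).json())   # three proxies
print(requests.get(BASE + "/getnum").text)                          # pool size
print(requests.get(BASE + "/getall").json())                        # every proxy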
test_self.py and tests.py both check the quality of the stored proxies;
I no longer remember which of the two works better.
import time

import requests

from api import RedisClient

clients = RedisClient()


def get_proxy():
    # /getall returns every stored proxy as a JSON list
    return requests.get("http://127.0.0.1:5000/getall")


a = get_proxy().json()
def getHtml():
    """Probe every stored proxy against a test URL and rescore it."""
    for proxy in a:
        try:
            html = requests.get('http://www.example.com',
                                proxies={"http": "http://{}".format(proxy)},
                                timeout=4)
            if html.status_code in [200, 206, 302]:
                print(proxy, ": usable")
                clients.max(proxy)  # promote a working proxy to the top score
            else:
                clients.decrease(proxy)  # penalize a proxy that answers badly
        except Exception:
            print("proxy unusable", proxy)
            clients.decrease(proxy)  # demote (and eventually drop) a dead proxy


while True:
    getHtml()
    time.sleep(60 * 2)
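Checking the proxies one by one is slow. A minimal thread-pool variant of the same check, along the lines of the commented-out verify_thread_pool in api.py and reusing clients and get_proxy from test_self.py above, might look like this:

from concurrent.futures import ThreadPoolExecutor


def check_one(proxy):
    """Probe a single proxy and rescore it, mirroring getHtml()."""
    try:
        html = requests.get('http://www.example.com',
                            proxies={"http": "http://{}".format(proxy)},
                            timeout=4)
        if html.status_code in [200, 206, 302]:
            clients.max(proxy)
        else:
            clients.decrease(proxy)
    except Exception:
        clients.decrease(proxy)


with ThreadPoolExecutor(max_workers=10) as executor:
    for proxy in get_proxy().json():
        executor.submit(check_one, proxy)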
进程池爬取cos猎人.py holds the main crawler code.
from concurrent.futures import ThreadPoolExecutor
import requests
import os
import re

if not os.path.exists('./img'):
    os.makedirs("img")


def get_one_proxy():
    # ask the local pool server for one random proxy
    return requests.get("http://127.0.0.1:5000/get")


proxies = get_one_proxy().text

headers = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58",
    "referer": "https://www.coshunter.pro/simo"
}


def down_img(img_url):
    """Download a single image and save it under ./img, named by its URL tail."""
    response = requests.get(url=img_url, headers=headers)
    name = img_url.split("/")[-1]
    with open("./img/" + f'{name}', 'wb') as f:
        f.write(response.content)
a = 252  # resume from page 252; the listing runs through page 354
while a < 355:
    url = f"https://www.coshunter.pro/shop/buy/page/{a}"
    res = requests.get(url, headers=headers,
                       proxies={"http": "http://{}".format(proxies)})
    res.encoding = "utf-8"
    # collect the detail-page links on this listing page
    html = re.findall(r'<a class="link-block" href="(.*?)"></a>', res.text)
    urls = html[:-1]  # the last match is dropped
    for i in urls:
        res = requests.get(i, headers=headers,
                           proxies={"http": "http://{}".format(proxies)})
        img_url = re.findall(r'<figure class="wp-block-image.*src="(.*?)"', res.text)
        print(img_url)
        # download this post's images concurrently, one task per image
        with ThreadPoolExecutor(10) as t:
            for u in img_url:
                t.submit(down_img, u)
    print(a)
    a += 1
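The crawler above fetches one proxy at startup and reuses it for every request, so the crawl stalls if that proxy dies mid-run. A possible refinement, sketched here rather than taken from the original code, is to pull a fresh proxy from the pool and retry on failure (it reuses the headers dict defined above):

def fetch_with_retry(url, retries=3):
    """Try a URL through up to `retries` different proxies from the pool."""
    for _ in range(retries):
        proxy = requests.get("http://127.0.0.1:5000/get").text
        try:
            return requests.get(url, headers=headers,
                                proxies={"http": "http://{}".format(proxy)},
                                timeout=5)
        except requests.RequestException:
            continue  # this proxy failed; ask the pool for another one
    raise RuntimeError(f"all {retries} proxy attempts failed for {url}")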