爬虫学习记录

news2025/1/8 23:40:24

 1.概念

通过编写程序,模拟浏览器上网,然后让其去互联网上抓取数据的过程

  • 通用爬虫:抓取的是一整张页面数据
  • 聚焦爬虫:抓取的是页面中的特定局部内容
  • 增量式爬虫:监测网站中数据更新的情况,只会抓取网站中最新更新出来的数据

robots.txt协议:

君子协议,网站后面添加robotx.txt可进行查看

https://www.baidu.com/robots.txt

1.1 http协议

服务器和客户端进行数据交互的一种形式

常用的请求头信息:

  • User-Agent: 请求载体的身份标识,如: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36
  • Connection: 请求完毕完毕之后,是断开连接还是保持连接(close和keep-alive)

常用响应头信息:

  • Content-Type: 服务器响应客户端的数据类型 如:(text/html; charset=utf-8)

1.2 https协议

安全的超文本传输协议,在客户端和服务端进行数据传输和数据交互的过程中,数据是进行加密的.

数据加密的方式

  • 对称秘钥加密: 客户端制定加密方式,加加密的密文和生成的秘钥都传输给服务端.服务端根据秘钥对于密文进行解密.(但是密文和秘钥可能会被同时拦截)
  • 非对称秘钥加密:服务端生成秘钥对,将生成的公钥传递给客户端,然后将生成的密文传递给服务端,服务端再使用私钥进行解密(客户端拿到的秘钥,不一定是从服务端传递过来的)
  • 证书秘钥加密:服务器端制定加密方式,服务端将公钥传递给证书认证机构,认证机构将公钥通过认证之后,进行数字签名.将携带数字签名的公钥封装到证书当中,将证书一并发送给客户端

2. requests模块

作用:模拟浏览器发送请求

requests模块的编码流程:

  • 指定url
  • 发起请求(get/post)
  • 获取相应数据
  • 持久化存储

2.1 简易网页采集器

2.1.1 UA 伪装

将python脚本发送的请求,伪装成为浏览器发送的请求

2.1.2 Get请求携带参数

将url的参数封装成为字典,传递给params

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests

# 参考url
# https://www.baidu.com/s?wd=%E6%88%90%E5%8A%9F
url = "https://www.baidu.com/s?"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}

word = input("enter your word:").strip()

# get请求携带的参数
param = {
    "wd":word
}

# 获取响应对象
response = requests.get(url, headers=header, params=param)

# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content

# 持久化储存
file_name = word + ".html"
with open(file_name, "wb") as f:
    f.write(page_text)

print(f"{file_name} has been save successfully")

2.2 破解百度翻译

2.2.1 POST请求携带参数

将传递的参数封装成为字典,并且传递给data

180e7e948fa9407cb8ffb152aa6caa93.png

2.2.2 Ajax请求

单词输入后,局部页面就会刷新

Ajax(Asynchronous JavaScript and XML)是一种在Web应用程序中进行异步通信的技术,它使用JavaScript和XML(现在通常使用JSON)来实现在不刷新整个页面的情况下与服务器进行数据交换

97db308ad6cc45dfa31e7f16dcee5cc8.png

2.2.3 JSON模块的使用

  • 反序列化 loads:将json字符串转化为python对象字典
  • 序列化 dump: 将python对象字典转化为json字符串,并写入文件

4c96e3dfa22742a1a2ce45a179a8f6cd.png

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests
import json

# 参考url
url = "https://fanyi.baidu.com/sug"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
    "Referer":"https://fanyi.baidu.com/mtpe-individual/multimodal"
}

word = input("enter your word:").strip()

# get请求携带的参数
param = {
    "kw":word
}

# 获取响应对象
response = requests.post(url, headers=header,data=param)

# 获取响应内容
word_text = json.loads(response.text)

# 持久化储存
file_name = word + ".json"
with open(file_name, "w",encoding="utf-8") as f:
    json.dump(word_text["data"][0], f,ensure_ascii=False)

print(f"{file_name} has been save successfully")

f35d983fa23d484db43827f19e219270.png

2.3 电影 

2.4 公司url

  • 动态加载数据分析
  • 获取每家公司的url,但是发现每家公司的详情页面也是动态加载出来的

3.数据解析

  • 正则
  • bs4
  • xpath

数据解析原理:

解析局部的文本内容都会在标签之间或是标签的属性中进行存储

  • 进行指定标签的定位
  • 标签或是标签对应属性存储值的获取

3.1 正则解析

3.2 bs4解析

只可以被应用于python中

3.2.1 数据解析原理

  • 实例化BeautifulSoup对象,并且将页面源码数据加载到该对象里面
  • 通过调用BeautifulSoup对象中的相关的属性或是方法,进行标签定位和数据提取

3.2.2 环境安装

pip3 install beautifulsoup4  -i  https://mirrors.aliyun.com/pypi/simple
pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.2.3 bs4的具体使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:07
# @Author : George
from bs4 import BeautifulSoup
import re

fp = open("./result.html", "r", encoding="utf-8")
soup = BeautifulSoup(fp, "lxml")
# soup.tagName:返回的是html中第一次出现的tag标签
print(soup.div)
# -------------------------------------------
# 等同于soup.tagName
print(soup.find("a"))
# 属性定位
print(soup.find("a", href=re.compile(".*ip138.com/$")).text)
# # 加载源码中所有的tag标签组成的列表
print(soup.find_all("a"))
# -------------------------------------------
# 可以使用选择器,id/类/标签/选择器,返回的是一个列表
print(soup.select('.center'))
# 层级选择器,
# “ ”空格就是表示多个层级
# > 表示一个层级
for i in soup.select('.center > ul > li > a'):
    print(i.text)
print(soup.select('.center > ul a')[0])
# -------------------------------------------
# 获取标签之间的文本数据 soup.a.text/string/get_text()
# --text/get_text可以获取标签里面啊所有的文本内容
# --string只可以获取该标签下的直系文本内容
print(soup.select('.center > ul a')[0].string)
# -------------------------------------------
# 获取标签里面的属性值
print(soup.select('.center > ul a')[0]["href"])

3.2.4 bs4爬取三国演义

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:43
# @Author : George
# ==================================================
# <a href="/guwen/bookv_6dacadad4420.aspx">第一回</a>
# 第一回网址
# https://www.gushiwen.cn/guwen/bookv_6dacadad4420.aspx
# ==================================================

from bs4 import BeautifulSoup
import requests

url = "https://www.gushiwen.cn/guwen/book_46653FD803893E4F7F702BCF1F7CCE17.aspx"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 获取响应对象
response = requests.get(url, headers=header)
# 实例化bs对象
soup = BeautifulSoup(response.text, "lxml")
a_liat = soup.select(".bookcont >ul > span >a")
fp = open("三国.txt","w",encoding="utf-8")
# 解析章节标题和详情页面的url
for tag in a_liat:
    title = tag.text
    detail_url = "https://www.gushiwen.cn/"+tag["href"]
    # 对详情页面发起请求
    detail_page = requests.get(detail_url, headers=header)
    # 解析出详情页面中的内容
    detail_soup = BeautifulSoup(detail_page.text,"lxml")
    # 使用此种方法出现一个问题,就是文章都是在p标签里面,所以文章不会换行
    # content = detail_soup.find("div",class_="contson").text
    fp.write(title + ":")
    for line in detail_soup.select('.contson > p'):
        fp.write(line.text+"\n")
    fp.write("\n\n")
    print(f"{title}爬取成功")
fp.close()

效果:

2fed6d19354c427799d07f88da4f0cb3.png

3.3 xpath解析

最常用且是最便捷高效的一种解析方式

3.3.1 xpath解析原理

  • 实例化一个etree的对象,且将被解析的页面远吗数据加载到该对象中
  • 调用etree对象中的xpath方法结合xpath表达式实现标签的定位和内容的捕获

3.3.2 环境的安装

pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.3.3 具体使用

xpath只能够根据层级关系定位标签页面.

1d80a29970ae42d885f98586b60dd8f9.png

ba9263465f7742479051cd9574f5c8fd.png

<!DOCTYPE html>
<html lang="zh-CN">
<head>
<!--    <meta charset="UTF-8">-->
<!--    <meta name="viewport" content="width=device-width, initial-scale=1.0">-->
    <title>小型HTML页面示例</title>
</head>
<body>
    <div class="container">
        <div class="div1">
            <h2>第一个Div</h2>
            <p>这是第一个div的内容。它使用了类标签.div1进行样式定位。</p>
            <p>这是第一个div的内容。它使用了类标签.div2进行样式定位。</p>
            <p><title>这是第一个div的内容。它使用了类标签.div3进行样式定位。</title></p>
        </div>
        <div class="div2">
            <h2>第二个Div</h2>
            <p>这是第二个div的内容。它使用了类标签.div2进行样式定位。</p>
        </div>
        <div class="div3">
            <h2 class="div3_h2">第三个Div</h2>
            <p>这是第三个div的内容。它使用了类标签.div3进行样式定位。</p>
        </div>
    </div>
</body>
</html>
from lxml import etree

tree_ = etree.parse("./baidu.html")
# r = tree_.xpath("/html/head/title")  # => [<Element title at 0x10a79ee60>],返回的是定位出来的对象
# r = tree_.xpath("/html//title")  # => [<Element title at 0x10a510dc0>],定位出来的效果是一样的
# r = tree_.xpath("//title") # => [<Element title at 0x106b75dc0>],找到源码里面所有的title标签

# 属性定位
# r = tree_.xpath('//div[@class="div1"]') # => [<Element div at 0x101807e60>]

# 索引定位,索引位置从1开始
# r = tree_.xpath('//div[@class="div1"]/p[3]')

# 取文本 /text()获取的标签里面直系的文本内容, //text() 获取标签下面所有的文本内容
# text = tree_.xpath('//div[@class="div1"]/p[3]/text()')
# print(text)  # => ['这是第一个div的内容。它使用了类标签.div3进行样式定位。']
# text = tree_.xpath('//div[@class="div1"]//text()')
# print(text)

# 获取属性
# attr = tree_.xpath('//div[@class="div3"]/h2/@class')  # ['div3_h2']
# print(attr)

3.3.4 爬取ppt模板

已经成功,结果如下,但是是大学时多亏了它的免费模板,就不贴代码给它带来麻烦了

99221c29b8094f67b7b8f70744dd24fa.png

3.3.5 爬取美女图片

https://pic.netbian.com/4kmeinv/

爬取美女图片失败,开始进入网站总是有人机验证,进去什么都爬取不了,后面再试一下

db1321caf5304010a2a3d806cc43f60c.png

4.模拟登录

4.1 验证码识别 

要收费,自己写个图片文字识别

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 13:44
# @Author : George
from PIL import Image
import pytesseract

# 如果你没有将tesseract.exe添加到系统的PATH中,
# 你需要在这里指定tesseract可执行文件的完整路径
pytesseract.pytesseract.tesseract_cmd = r'D:\Tesseract-OCR\tesseract.exe'

# 打开一个图像文件
image_path = 'abc_1.png'  # 替换为你的图像路径
image = Image.open(image_path)

# 使用pytesseract进行OCR
text = pytesseract.image_to_string(image, lang='chi_sim')  # lang参数指定语言,例如'chi_sim'表示简体中文

# 打印识别出的文字
print(text)

4.2 模拟登录逻辑

5.cookie

5.1含义及作用

网页的Cookie是一种在Web开发中广泛使用的技术,用于在用户的计算机上存储小块的数据。这些数据通常由Web服务器发送给用户的浏览器(所以主要是服务器创建的),并在用户后续的访问中被浏览器返回给服务器。Cookie的主要功能和用途包括:

  1. 会话管理:Cookie可以用于保持用户的会话状态,例如在用户登录到网站后,服务器可以发送一个包含会话ID的Cookie到用户的浏览器。在用户后续的请求中,浏览器会自动包含这个会话ID,使得服务器能够识别并持续管理用户的会话。

  2. 个性化设置:Cookie可以用来存储用户的偏好设置,例如网站的语言、主题颜色、字体大小等。这样,当用户再次访问网站时,这些设置可以自动恢复,提高用户体验。

  3. 跟踪用户行为:通过Cookie,网站可以跟踪和分析用户的行为,例如用户访问了哪些页面、停留了多长时间、点击了哪些链接等。这些信息对于网站优化、广告定位等非常有用。

  4. 安全性:在某些情况下,Cookie还可以用于增强网站的安全性,例如通过存储加密的令牌来验证用户的身份。

Cookie具有以下几个特点:

  • 存储在客户端:Cookie数据存储在用户的浏览器上,而不是服务器上。这意味着即使用户关闭了浏览器或计算机,只要没有删除Cookie,数据仍然存在。
  • 自动发送:在用户访问与Cookie相关的网站时,浏览器会自动将相关的Cookie数据发送到服务器。
  • 有限的大小和数量:每个Cookie的大小和数量都有限制,这取决于浏览器和Web服务器的配置。
  • 过期时间:Cookie可以设置过期时间,在过期时间之前,Cookie一直有效。如果没有设置过期时间,Cookie就是一个会话Cookie,在用户关闭浏览器时自动删除。

5.2 session的含义及特点

  • 一、Session的基本概念

Session是服务器用于跟踪用户会话的一种机制。它允许服务器在多个请求之间识别同一个用户,并维护该用户的状态信息。这些状态信息可以包括用户的登录状态、购物车内容、偏好设置等。

  • 二、Session的工作原理
  1. 创建Session:当用户首次访问网站时,服务器会创建一个新的Session对象,并为其分配一个唯一的Session ID。
  2. 发送Session ID:服务器通常会将这个Session ID以Cookie的形式发送给客户端浏览器。客户端浏览器会在后续的请求中自动将这个Session ID附加在请求头中。
  3. 识别用户:服务器通过检查请求头中的Session ID来识别用户,并获取相应的Session数据。这样,服务器就可以在多个请求之间保持用户的状态信息。
  • 三、Session的作用
  1. 保持用户状态:Session允许服务器在多个请求之间跟踪用户的状态信息,如登录状态、购物车内容等。这使得用户可以在不同页面之间无缝切换,而无需重新认证或输入信息。
  2. 个性化服务:根据用户的喜好和历史行为,服务器可以为用户提供个性化的内容和服务。这有助于提高用户体验和满意度。
  3. 安全性:通过验证Session ID来确认用户的请求,服务器可以防止未授权访问和非法操作。这有助于保护用户的隐私和数据安全。
  • 四、Session与Cookie的关系

Session和Cookie是密切相关的两种技术。Cookie是服务器发送到客户端浏览器并保存在本地的一小块数据,而Session则是服务器用于跟踪用户会话的一种机制。Cookie通常用于存储Session ID,以便服务器在后续的请求中识别用户。因此,可以说Cookie是Session的一种实现方式。

  • 五、Session的管理

在网络请求中,管理Session是非常重要的。开发人员需要确保Session的安全性、有效性和可维护性。这包括:

  1. 设置合理的Session过期时间:为了避免用户长时间未操作而导致的会话过期问题,开发人员需要设置合理的Session过期时间。
  2. 保护Session ID:Session ID是用户身份的重要标识,开发人员需要采取措施来保护它免受攻击和泄露。例如,可以使用HTTPS协议来加密传输Session ID,或者使用更复杂的Session ID生成算法来提高安全性。
  3. 清理无效的Session:为了节省服务器资源和提高性能,开发人员需要定期清理无效的Session对象。这可以通过设置Session的失效时间、使用数据库存储Session数据并定期清理过期数据等方式来实现。

5.3 http和https协议的特点

无状态。即是说,即使你的第一次请求已经实现了自动登录。但是你第二次发送请求时,服务器端并不知道你的请求是基于第一次登录状态的。

cookie可以让服务器端记录客户端的相关状态

5.3 cookie值的处理

  • 手动抓包cookie值,将其封装到headers里面,但是这种方法面对cookie是动态变化的时候就很难处理了
  • session会话对象
    • 可以进行请求的发送
    • 如果请求过程中产生了cookie,则该cookie会被自动存储/携带在该session会话对象中
  • b3b252aa2fe644b3aef552a0ef159520.png

6.代理

需要绕过IP封锁、限制或进行大规模数据抓取时。代理服务器充当客户端和目标服务器之间的中介,可以隐藏你的真实IP地址,提供额外的安全性,有时还能加速请求

测试网址:

https://httpbin.org/get

现在基本上没有免费正常的代理可以被使用,我这个也是失败的。看到一个博主写建立代理ip池的python3之爬虫代理IP的使用+建立代理IP池_python爬虫代理池-CSDN博客,代码写的不错,爬取出来的ip也没什么能用的了。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 19:14
# @Author : George
import requests

url = "https://www.baidu.com/s?"

# 将爬虫程序伪装成为浏览器来发送请求
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}

# 设置代理
proxies = {
    "http": "154.203.132.49:8080",
    "https":"49.73.4.158:8089"
}
param = {
    "wd":"ip"
}

response = requests.get(url,headers=headers,proxies=proxies,verify=False,params=param)
with open("proxy.html","wt",encoding="utf-8") as f:
    f.write(response.text)

公司的联网也是需要代理的

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests

user = ""
passwd = "

url = "https://www.baidu.com/"

# 模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 指定代理,不指定代理,无法上网
proxies = {
    "http": f"http://{user}:{passwd}@ip:port",
    "https": f"https://{user}:{passwd}@ip:port"
}

# 获取响应对象
response = requests.get(url, headers=header, proxies=proxies)

# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content

# 持久化储存
with open("baidu.html", "wb") as f:
    f.write(page_text)

7.异步爬虫(进程池)

本来是针对视频进行爬取的,但是ajax请求时的请求地址,看不懂mrd这个怎么来的,暂时跳过,我灰太狼一定会回来的!!!!!!!!!!!!

295b11c4eb6346fba989e6cdf2874352.png

d684c278cec246cf8b9eb62f1497660d.png

7.1 视频爬取

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 13:16
# @Author : George
import requests
import chardet
import os
from lxml import etree
"""
"https://www.pearvideo.com/category_1"
['video_1797596', 'video_1797399', 'video_1797404', 'video_1797260']

https://www.pearvideo.com/video_1797596

ajax请求
https://www.pearvideo.com/videoStatus.jsp?contId=1797596&mrd=0.4308675768914054
https://www.pearvideo.com/videoStatus.jsp?contId=1797399&mrd=0.6241695396585363
"""


class LiVideo(object):
    def __init__(self):
        #  定义输出ppt的文件夹
        self.extract_to_dir = f"./video"
        os.makedirs("./video", exist_ok=True)
        # 添加Referer防止反爬虫
        self.header = {
            "Referer": "https://www.pearvideo.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        # 编码
        self.encoding = None
        self.home_url = "https://www.pearvideo.com/category_1"

    def get_HTML(self, url_param):
        response = requests.get(url_param, headers=self.header)
        if response.status_code == 200:
            # 使用 chardet 自动检测编码
            self.encoding = chardet.detect(response.content)['encoding']
            response.encoding = self.encoding
            # 创建etree对象
            tree = etree.HTML(response.text)
            return tree

    def deal(self):
        home_tree = self.get_HTML(self.home_url)
        home_url_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/@href")
        name_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/div[2]/text()")
        for name,url in zip(name_list,home_url_list):
            url = "https://www.pearvideo.com/"+url
            detail_tree = self.get_HTML(url)


if __name__ == '__main__':
    vi = LiVideo()
    vi.deal()

 7.2 诗文异步爬取

只要将任务提交给线程池,线程池就会自动安排一个线程来执行这个任务.同过线程池提交的任务是异步提交.异步提交的结果就是不等待任务的执行结果,继续往下执行 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 14:26
# @Author : George
"""
https://www.gushiwen.cn/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb

第二层
https://www.gushiwen.cn/guwen/book_4e6b88d8a0bc.aspx
https://www.gushiwen.cn/guwen/book_a09880163008.aspx

第三層
https://www.gushiwen.cn/guwen/bookv_b630af160f65.aspx
"""
import os
import requests
import chardet
from lxml import etree
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from typing import Dict, List, Tuple

class NovelDownloader:
    def __init__(self):
        self.output_dir = "./novels"
        os.makedirs(self.output_dir, exist_ok=True)
        
        self.headers = {
            "Referer": "https://www.gushiwen.cn/guwen/Default.aspx?p=1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        self.base_url = "https://www.gushiwen.cn"
        self.home_url = f"{self.base_url}/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb"

    def get_html_tree(self, url: str) -> etree._Element:
        """获取页面HTML并返回etree对象"""
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get {url}, status code: {response.status_code}")
            
        encoding = chardet.detect(response.content)['encoding']
        response.encoding = encoding
        return etree.HTML(response.text)

    def get_chapter_details(self) -> Dict[str, Dict[str, str]]:
        """获取所有章节详情"""
        home_tree = self.get_html_tree(self.home_url)
        
        # 获取书籍链接和标题
        urls = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/@href")[1:3]
        titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")[1:3]
        
        book_details = {}
        for title, url in zip(titles, urls):
            detail_tree = self.get_html_tree(f"{self.base_url}{url}")
            
            # 获取章节链接和标题
            chapter_urls = [f"{self.base_url}{url}" for url in 
                          detail_tree.xpath("//*[@class='bookcont']/ul/span/a/@href")]
            chapter_titles = detail_tree.xpath("//*[@class='bookcont']/ul/span/a/text()")
            
            book_details[title] = dict(zip(chapter_titles, chapter_urls))
            
        return book_details

    def download_novel(self, title: str, chapters: Dict[str, str]):
        """下载单本小说"""
        output_path = os.path.join(self.output_dir, f"{title}.txt")
        print(f"开始下载 {title}".center(100, "="))
        
        with open(output_path, "w", encoding="utf-8") as f:
            for chapter_title, chapter_url in chapters.items():
                response = requests.get(chapter_url, headers=self.headers)
                soup = BeautifulSoup(response.text, "lxml")
                
                f.write(f"{chapter_title}:\n")
                for paragraph in soup.select('.contson > p'):
                    f.write(f"{paragraph.text}\n")
                f.write("\n\n")
                
                print(f"{chapter_title} 下载完成".center(20, "-"))
                
        print(f"{title} 下载完成".center(100, "="))

def main():
    start_time = time.time()
    
    downloader = NovelDownloader()
    books = downloader.get_chapter_details()
    
    # 使用线程池并行下载
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(downloader.download_novel, title, chapters) 
                  for title, chapters in books.items()]
        
    print(f"总耗时: {time.time() - start_time:.2f}秒")

if __name__ == '__main__':
    main()

8.异步爬虫(协程)

  协程内容不赘述:CSDN

c62484262bf6460187337af4d2ab3996.png

基于单线程和协程,实现异步爬虫。 

8.1. 基于flask搭建服务器

简单的学习了一下,感觉不是很复杂,后面等着详细学习

261a05e6ee984879838d6b8015f8b299.png

pip3 install Flask -i  https://mirrors.aliyun.com/pypi/simple
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 21:47
# @Author : George
"""
# @app.route('/'): 这是一个装饰器,用于告诉 Flask 哪个 URL 应该触发下面的函数。
在这个例子中,它指定了根 URL(即网站的主页)。

# return 'Hello, World!': 这行代码是 hello_world 函数的返回值。
当用户访问根 URL 时,这个字符串将被发送回用户的浏览器。

"""
from flask import Flask, request, jsonify
import time


def get_client_ip(request):
    # 如果使用了反向代理,优先从 X-Forwarded-For 头部获取 IP 地址
    x_forwarded_for = request.headers.get('X-Forwarded-For', '').split(',')
    if x_forwarded_for:
        client_ip = x_forwarded_for[0]  # 通常第一个 IP 地址是客户端的真实 IP 地址
    else:
        client_ip = request.remote_addr
    return client_ip


app = Flask(__name__)


@app.route('/')
def index():
    return 'Hello, World!'


@app.route('/bobo')
def index_bobo():
    # 获取client的user_agent和referer
    user_agent = request.headers.get('User-Agent')
    user_referer = request.headers.get('Referer')
    print(user_agent, user_referer)
    # client ip地址没有获取到
    client_ip = get_client_ip(request)
    print({'client_ip': client_ip})
    time.sleep(3)
    return 'bobo'


@app.route('/jar')
def index_jar():
    time.sleep(3)
    return 'jar'


@app.route('/test')
def index_test():
    time.sleep(3)
    return 'test'


if __name__ == '__main__':
    app.run(threaded=True)

8.2 基于aiohttp实现异步爬虫

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 22:08
# @Author : George
import asyncio
import requests
import time
import aiohttp
from threading import currentThread

"""
async def request(url): # 耗时: 9.054526805877686,requests是同步阻塞,必须替换为异步库提供的函数aiohttp
"""

urls = [
    "http://127.0.0.1:5000/jar",
    "http://127.0.0.1:5000/bobo",
    "http://127.0.0.1:5000/test"
]

start = time.time()


# 耗时: 9.054526805877686,requests是同步操作
# async def request(url):
#     print("开始下载", url)
#     response = requests.get(url=url)
#     print(response.text)
#     print("下载结束", url)
#     print("------------")

async def request_2(url):
    print("开始下载", url, currentThread())
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Referer": "https://movie.douban.com/explore"
    }
    # 设置代理
    proxy = "http://8.220.204.215:8008"

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            text = await response.text()
            print(text)
    print("下载结束", url)


async def main():
    tasks = []
    for url in urls:
        tasks.append(asyncio.create_task(request_2(url)))
    await asyncio.wait(tasks)


asyncio.run(main())

end = time.time()
print("耗时:", end - start)

9.selenuim使用

  • 便捷的获取网页中间动态加载的数据
  • 边界实现模拟登录

selenuim:基于浏览器自动化的一个模块

入门指南 | Selenium

b2505f3011124e5bb0606b9d5922d52f.png

基于selenium实现浏览器自动化,自动输入并播放动漫核心代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/1/1 16:04
# @Author : George
"""
在 Selenium 4 中,不再直接在 webself.driver.Chrome 中设置驱动程序路径,而是通过引入 Service 对象来设置。
这样可以避免弃用警告,并确保驱动程序的正确加载
"""
import os.path

from selenium import webdriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import time
from lxml import etree
import requests
import chardet
import json
from log_test import logger

class VideoAutoPlay(object):
    def __init__(self):
        self.driver = {}
        self.count_num = 1
        self.movies_dict =None
        self.home_url = "https://www.agedm.org/"
        self.headers = {
            "Referer": "https://www.agedm.org/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0"
        }

    def get_html_tree(self, url: str) -> etree._Element:
        """获取页面HTML并返回etree对象"""
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get {url}, status code: {response.status_code}")

        encoding = chardet.detect(response.content)['encoding']
        response.encoding = encoding
        return etree.HTML(response.text)

    def movies_input(self, movie_name, n):
        try:
            if not self.driver:
                self.driver = self.setup_driver()

            self.driver.get(self.home_url)
            # 查找搜索框元素
            search_box = self.driver.find_element(By.ID, "query")

            # 输入搜索内容
            search_box.send_keys(movie_name)

            # 提交搜索表单
            search_box.send_keys(Keys.RETURN)

            # 等待搜索结果加载
            # WebDriverWait(self.driver, 10).until(
            #     EC.presence_of_element_located((By.CLASS_NAME, "content_left"))
            # )
            # 二级请求
            self.driver.get(f"https://www.agedm.org/search?query={movie_name}")

            # 查找在线播放btn
            # wait = WebDriverWait(self.driver, 10)  # 10秒超时
            # # titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")
            button = self.driver.find_element(By.XPATH, "//*[@class='video_btns']/a[1]")
            # print(button)
            button.click()
            WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "tab-content"))
            )
            # # 等待搜索结果加载
            detail_url = self.driver.current_url
            tree = self.get_html_tree(detail_url)
            url_dict = {}
            titles = tree.xpath("//*[@class='video_detail_episode']/li/a/text()")[n - 1:]
            urls = tree.xpath("//*[@class='video_detail_episode']/li/a/@href")[n - 1:]
            for title, url in zip(titles, urls):
                url_dict[title] = url
            # print(url_dict)
            return url_dict
        except Exception as e:
            logger.error(f"搜索过程发生错误: {str(e)}")
            if self.driver:
                self.driver.quit()
                self.driver = None
            return {}

    def setup_driver(self):
        # 优化视频播放性能的设置
        options = webdriver.EdgeOptions()
        options.add_argument('--disable-gpu')  # 禁用GPU加速
        options.add_argument('--no-sandbox')  # 禁用沙箱模式
        options.add_argument('--disable-dev-shm-usage')  # 禁用/dev/shm使用
        options.add_argument('--disable-software-rasterizer')  # 禁用软件光栅化
        options.add_argument('--disable-extensions')  # 禁用扩展
        options.add_argument('--disable-infobars')  # 禁用信息栏
        options.add_argument('--autoplay-policy=no-user-gesture-required')  # 允许自动播放
        options.add_experimental_option('excludeSwitches', ['enable-automation'])  # 禁用自动化提示
        options.add_experimental_option("useAutomationExtension", False)  # 禁用自动化扩展

        # 设置正确的驱动路径
        service = EdgeService(executable_path='./msedgedriver.exe')
        return webdriver.Edge(options=options, service=service)

    def video_player(self, movie_name, n):
        try:
            url_dict = self.movies_input(movie_name, n)
            if not url_dict:
                logger.error("未找到可播放的视频链接")
                return

            if not self.driver:
                self.driver = self.setup_driver()

            for title, url in url_dict.items():
                try:
                    # 打开网站
                    self.driver.get(url)
                    # 切换到视频iframe
                    self.driver.switch_to.frame("iframeForVideo")
                    logger.info(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

                    # 等待视频元素加载
                    wait = WebDriverWait(self.driver, 20)
                    video_element = wait.until(
                        EC.presence_of_element_located((By.CLASS_NAME, "art-video"))
                    )
                    logger.info(f"找到视频元素: {video_element}")
                    logger.info(f"{movie_name}:{title}集开始播放".center(10, "="))
                    # 等待视频加载完成后执行全屏
                    page_stage = True
                    while page_stage:
                        paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)
                        print("paused_state", paused_state)
                        if not paused_state:
                            break
                    # 双击使得视频全屏显示
                    ActionChains(self.driver).double_click(video_element).perform()
                    time.sleep(3)
                    # 视频单击播放
                    paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)
                    if paused_state:
                        logger.info(f"{movie_name}:{title}触发双击全屏")
                        ActionChains(self.driver).click(video_element).perform()
                    # # 点击视频开始播放
                    # action = ActionChains(self.driver)
                    # action.move_to_element_with_offset(video_element, 100, 100).click().perform()
                    print(f"点击播放时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

                    # 等待视频播放完成
                    time.sleep(60*21)
                    n = n + 1
                    self.update_movie_count(movie_name,n)
                    logger.info(f"第{title}集播放完毕".center(10, "="))

                except Exception as e:
                    logger.error(f"播放第{title}集时发生错误: {str(e)}")
                    continue

        except Exception as e:
            logger.error(f"视频播放总体发生错误: {str(e)}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None

    def __del__(self):
        """确保在对象销毁时关闭driver"""
        if self.driver:
            try:
                self.driver.quit()
            except:
                pass

    def count_read(self, movies, n=1):
        # 将此文件作为播放内容的缓存
        if not os.path.exists("./count.json"):
            with open("./count.json", "w", encoding="utf-8") as f:
                self.movies_dict = {movies:n}
                json.dump({movies: n}, f, ensure_ascii=False, indent=4)
            return n
        else:
            with open("./count.json", "r", encoding="utf-8") as f:
                self.movies_dict = json.load(f)
            if not self.movies_dict.get(str(movies)):  # 读取不到movies
                self.movies_dict.update({str(movies):n})
                with open("./count.json", "w", encoding="utf-8") as f:
                    json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)
                return n
            else:  # 读取到了movie
                self.count_num = self.movies_dict[str(movies)]
                return self.count_num

    # update movies count
    def update_movie_count(self, movies, n):
        with open("./count.json", "w", encoding="utf-8") as f:
            self.movies_dict[str(movies)] = n
            json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    video = VideoAutoPlay()
    movie = "火影忍者 疾风传"
    # movie = "神之塔 第二季"
    count_num = video.count_read(movie, 1)
    # video.update_movie_count(movie, 4)
    video.video_player(movie, count_num)

10.scrapy框架

破电脑太老,安装不了!!

d684c278cec246cf8b9eb62f1497660d.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2273474.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

黑马头条平台管理实战

黑马头条 08平台管理 1.开始准备和开发思路1.1.开发网关1.2编写admin-gateway 代码 2.开发登录微服务2.1编写登录微服务 3.频道管理4.敏感词管理5.用户认证审核6.自媒体文章人工审核99. 最后开发中碰到的问题汇总1.关于nacos 配置 问题2.在开发频道管理新增频道后端无法接收到前…

实验四 数组和函数

实验名称 实验四 数组和函数 实验目的 &#xff08;1&#xff09;掌握一维、二维数组以及字符数组的定义、元素引用和编程方法。 &#xff08;2&#xff09;掌握字符串常用程序的设计方法。 &#xff08;3&#xff09;掌握函数定义和调用的方法&#xff0c;以及函数参数传…

Idea(中文版) 项目结构/基本设置/设计背景

目录 1. Idea 项目结构 1.1 新建项目 1.2 新建项目的模块 1.3 新建项目模块的包 1.4 新建项目模块包的类 2. 基本设置 2.1 设置主题 2.2 设置字体 2.3 设置注释 2.4 自动导包 2.5 忽略大小写 2.6 设置背景图片 3. 项目与模块操作 3.1 修改类名 3.2 关闭项目 1. I…

Android Audio基础(53)——PCM逻辑设备Write数据

1. 前言 本文,我们将以回放(Playback,播放音频)为例,讲解PCM Data是如何从用户空间到内核空间,最后传递到Codec。 在 ASoC音频框架简介中,我们给出了回放(Playback)PCM数据流示意图。: 对于Linux来说,由于分为 user space 和kernel space,而且两者之间数据不能随便…

Android Audio基础(54)——数字音频接口 I2S、PCM(TDM) 、PDM

1. 概述 本文介绍的数字音频接口全部是硬件接口,是实际的物理连线方式,即同一个PCB板上IC芯片和IC芯片之间的通讯协议。 PCM、PDM也可以用于表示音频编码格式,。编码格式是指模拟信号数字化的方式。 I2S和PCM(TDM)接口传输的数据是PCM格式的音频数据。这两种协议是最为常见…

HDFS架构原理

一、HDFS架构整体概述 HDFS是Hadoop Distribute File System 的简称&#xff0c;意为&#xff1a;Hadoop分布式文件系统。HDFS是Hadoop核心组件之一&#xff0c;作为大数据生态圈最底层的分布式存储服务而存在。HDFS解决的问题就是大数据如何存储,它是横跨在多台计算机上的文件…

windows11(或centos7)安装nvidia显卡驱动、CUDA、cuDNN

本文是我瞎搞时写的问题汇总及参考文献&#xff0c;记录了一些问题解决的进度及对问题的思考。 最近一次更新时间&#xff1a;2025年1月4日 一、安装或更新nvidia显卡驱动 首先&#xff0c;需要确保你的设备安装了最新的显卡驱动。 &#xff08;1&#xff09;centos7安装显…

2025-01-04 Unity插件 YodaSheet2 —— 基础用法

文章目录 环境配置1 创建 YadeSheetData2 读取方式2.1 表格读取2.2 列表读取 3 自定义设置3.1 修改代码生成位置3.2 添加列表支持3.2.1 修改 DataTypeMapper.cs3.2.2 修改 SheetDataExtensions.cs3.2.3 修改 CodeGeneratorEditor.cs3.2.4 测试 ​ 官方文档&#xff1a; Unity …

STM32 拓展 RTC(实时时钟)

RTC简介 RTC(Real Time Clock,实时时钟)。是一个掉电后仍然可以继续运行的独立定时器。 RTC模块拥有一个连续计数的计数器,在相应的软件配置下,可以提供时钟日历的功能。修改计数器的值可以重新设置当前时间和日期 RTC还包含用于管理低功耗模式的自动唤醒单元。 RTC实质…

微信小程序实现登录注册

文章目录 1. 官方文档教程2. 注册实现3. 登录实现4. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/路由跳转的几种方式&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/api/route/wx.switchTab…

[大模型开源]SecGPT 网络安全大模型

模型介绍 SecGPT的愿景是将人工智能技术引入网络安全领域&#xff0c;以提高网络防御的效率和效果。其使命是推动网络安全智能化&#xff0c;为社会提供更安全的数字生活环境。 ① SecGPT开源地址&#xff1a;https://github.com/Clouditera/secgpt② 模型地址&#xff1a;htt…

解决“KEIL5软件模拟仿真无法打印浮点数”之问题

在没有外部硬件支持时&#xff0c;我们会使用KEIL5软件模拟仿真&#xff0c;这是是仿真必须要掌握的技巧。 1、点击“Project”&#xff0c;然后点击“Options for target 项目名字”&#xff0c;点击“Device”,选择CPU型号。 2、点击“OK” 3、点击“Target”,勾选“Use Mi…

C语言 扫雷程序设计

目录 1.main函数 2.菜单打印menu函数 3.游戏game函数 4.宏定义 5.界面初始化 6.打印界面 7.设置雷 8.统计排查坐标周围雷的个数 9.排查雷 10.总代码 test.c代码 game.h代码 game.c代码 结语&#xff1a; 一个简单的扫雷游戏&#xff0c;通过宏定义可以修改行列的…

Excel 技巧03 - 如何对齐小数位数? (★)如何去掉小数点?如何不四舍五入去掉小数点?

这几个有点儿关联&#xff0c;我都给放到一起了&#xff0c;不影响大家分别使用。 目录 1&#xff0c;如何对齐小数位数&#xff1f; 2&#xff0c;如何去掉小数点&#xff1f; 3&#xff0c;如何不四舍五入去掉小数点&#xff1f; 1&#xff0c;如何对齐小数位数&#xff…

【大模型+本地自建知识图谱/GraphRAG/neo4j/ollama+Qwen千问(或llama3)】 python实战(中)

一、建立基本的知识图谱并导入neo4j 这里我举例用的属性表、关系表&#xff0c;大概格式如下 id名字颜色a1苹果红色 startrelenda1属于b1 启动neo4j&#xff08;关于neo4j的安装此处不再赘述&#xff09; import pandas as pd from py2neo import Graph, Node, Relationship…

量子计算遇上人工智能:突破算力瓶颈的关键?

引言&#xff1a;量子计算遇上人工智能——突破算力瓶颈的关键&#xff1f; 在数字化时代的浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;正以前所未有的速度改变着我们的生活&#xff0c;从语音助手到自动驾驶&#xff0c;从医学诊断到金融分析&#xff0c;无不彰显其…

jenkins入门12-- 权限管理

Jenkins的权限管理 由于jenkins默认的权限管理体系不支持用户组或角色的配置&#xff0c;因此需要安装第三发插件来支持角色的配置&#xff0c;我们使用Role-based Authorization Strategy 插件 只有项目读权限 只有某个项目执行权限

IWOA-GRU和GRU时间序列预测(改进的鲸鱼算法优化门控循环单元)

时序预测 | MATLAB实现IWOA-GRU和GRU时间序列预测(改进的鲸鱼算法优化门控循环单元) 目录 时序预测 | MATLAB实现IWOA-GRU和GRU时间序列预测(改进的鲸鱼算法优化门控循环单元)预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现IWOA-GRU和GRU时间序列预测…

分享3个国内使用正版GPT的网站【亲测有效!2025最新】

1. molica 传送入口&#xff1a;https://ai-to.cn/url/?umolica 2. 多帮AI 传送入口&#xff1a;https://aigc.openaicloud.cn?inVitecodeMYAAGGKXVK 3. 厉害猫 传送入口&#xff1a;https://ai-to.cn/url/?ulihaimao

Personal APP

1、Matlab 2023b https://www.bilibili.com/opus/887246540317392920 https://blog.csdn.net/qq_25719943/article/details/138096918 https://www.jokerdown.com/22886.html 2、 3、