目录
- 一、多任务概念
- 二、实现多任务方式
- 1、多进程 (Multiprocessing)
- 2、多线程(Multithreading)
- 3、协程(Coroutine)
- 三、多线程执行顺序
- 四、多线程的方法
- 1、join()
- 2、setDaemon()
- 3、threading.enumerate()
- 五、继承 Thread 类创建线程
- 六、线程间的通信(多线程共享全局变量)
- 七、互斥锁和死锁
- 1、互斥锁
- 2、死锁
- 八、生产者与消费者模式
- 1、Queue 线程队列
- 2、生产者和消费者
- 九、案例
- 1、单线程实现
- 2、多线程实现
- 十、作业
一、多任务概念
多任务(Multitasking)是指在同一时间内执行多个任务或进程的能力。它可以以不同的方式实现,包括多进程、多线程和协程等。
二、实现多任务方式
1、多进程 (Multiprocessing)
多进程是指同时运行多个独立的进程,每个进程有自己的地址空间和系统资源。多进程可以在多个处理器核心上并行执行任务,每个进程拥有独立的执行环境,相互之间不受影响。
进程(Process)
进程是计算机中运行的程序的实例。每个进程都拥有独立的内存空间和系统资源。一个进程可以包含多个线程。
2、多线程(Multithreading)
多线程是指在一个进程中同时执行多个线程的编程模型。线程是进程内的执行单元,每个线程独立执行特定的任务,但共享同一进程的内存空间。多线程编程可以提高程序的并发性和响应性。
线程(Thread)
线程是操作系统能够进行调度的最小单位。它包含了执行代码所需的上下文信息(如程序计数器、栈、寄存器等),可以独立运行和调度。多个线程可以在同一时间内执行不同的任务。
主线程(Main Thread)
主线程是程序启动时默认创建的第一个线程。主线程负责执行程序的入口点,并可以创建其它线程。
3、协程(Coroutine)
协程是一种轻量级的并发编程技术=,它可以在单线程中实现多个独立的执行流程,从而提供高效的并发和协作。与线程相比,协程的切换开销更小,且没有多线程中的锁和同步机制的复杂性。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其它地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
寄存器上下文(Register Context)
是指存储在处理器寄存器中的一组值,用于保存正在执行的程序的状态信息。寄存器上下文包含了程序计数器、栈指针、通用寄存器等寄存器的值。
并发(Concurrency)
并发是指多个任务同时进行,但不一定同时完成。在多线程编程中,线程可以并发执行,通过时间片轮转等方式实现看似同时执行的效果。(资源够用,比如三个线程,四核的 CPU。)
并行(Parallelism)
并行是指多个任务同时进行且同时完成。在多核处理器上,多个线程可以被映射到不同的核上并行执行。(比如单核 CPU 资源,同时只能运行一个任务,A 运行一段后,让给 B,B 用完继续给 A,交替使用,提高效率。)
三、多线程执行顺序
# 时间模块
import time
def task():
print("hello python")
time.sleep(1)
print("hello world")
for i in range(5):
task()
# hello python
# hello world
# hello python
# hello world
# hello python
# hello world
# hello python
# hello world
# hello python
# hello world
# 时间模块
import time
# 多线程模块
import threading
# 子线程
def task():
print("hello python")
time.sleep(1)
print("hello world")
# 主线程
if __name__ == '__main__':
for i in range(5): # 循环5次,创建了5个线程对象
# 创建线程对象,target 是执行任务
t = threading.Thread(target=task)
# 多线程为开始工作状态
t.start()
# hello python
# hello python
# hello python
# hello python
# hello python
# hello world
# hello world
# hello world
# hello world
# hello world
四、多线程的方法
1、join()
等待子线程结束之后,主线程继续执行。
谨慎使用,假设子线程当中有一个死循环,子线程不结束,主线程能不能结束。
# 时间模块
import time
# 多线程模块
import threading
# 子线程
def task():
print("hello python")
time.sleep(1)
print("hello world")
# 主线程
if __name__ == '__main__':
for i in range(5): # 循环5次,创建了5个线程对象
# 创建线程对象,target 是执行任务
t = threading.Thread(target=task)
# 多线程为开始工作状态
t.start()
# 子线程结束了才会执行后面的代码
t.join()
# hello python
# hello world
# hello python
# hello world
# hello python
# hello world
# hello python
# hello world
# hello python
# hello world
2、setDaemon()
守护线程,不会等待子线程结束。
# 时间模块
import time
# 多线程模块
import threading
# 子线程
def task():
print("hello python")
time.sleep(1)
print("hello world")
# 主线程
if __name__ == '__main__':
for i in range(5): # 循环5次,创建了5个线程对象
# 创建线程对象,target 是执行任务
t = threading.Thread(target=task)
# 守护线程:主线程结束程序就立马结束了,不会影响到主线程的运行
t.setDaemon(True)
# 多线程为开始工作状态
t.start()
# hello python
# hello python
# hello python
# hello python
# hello python
3、threading.enumerate()
查看当前线程的数量。
# 时间模块
import time
# 多线程模块
import threading
# 子线程
def sing():
for i in range(3):
print(f'正在唱歌。。。{i}')
time.sleep(0.5)
# 子线程
def dance():
for i in range(3):
print(f'正在跳舞。。。{i}')
time.sleep(0.5)
# 主线程
if __name__ == '__main__':
# 创建线程对象
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
# 开启线程
t1.start() # start 开启时,子线程才算创建
t2.start()
# 查看线程数量
# 2子1主,共3个线程
print(threading.enumerate()) # [<_MainThread(MainThread, started 8584)>, <Thread(Thread-1, started 5504)>, <Thread(Thread-2, started 18404)>]
五、继承 Thread 类创建线程
# 时间模块
import time
# 多线程模块
import threading
# 创建的是类,继承线程类,就具备线程的特性
class MyThread1(threading.Thread):
# 重写父类的 run 方法,start 触发 run 方法
def run(self):
for i in range(5):
print(f'MyThread1---{i}')
time.sleep(1)
class MyThread2(threading.Thread):
def run(self):
for i in range(5):
print(f'MyThread2---{i}')
time.sleep(1)
if __name__ == '__main__':
# 创建对象
mt = MyThread1()
mt1 = MyThread2()
# 开启线程
mt.start()
mt1.start()
六、线程间的通信(多线程共享全局变量)
在一个函数中,对全局变量进行修改的时候,如果修改了指向,必须使用 global,仅仅是修改了指向空间中的数据时,不用必须使用 global。
线程是共享全局变量的。
import threading # 导入线程模块
# 定义全局变量 num,初始值为0
num = 0
# 定义函数 task
def task():
# 在函数内部使用全局变量 num
global num
# 循环数据
for i in range(10000000): # 1千万
num += 1
# 打印当前 num 的值
print("task--num=%d" % num)
# 定义函数 task1
def task1():
# 在函数内部使用全局变量 num
global num
# 循环数据
for i in range(10000000): # 1千万
num += 1
# 打印当前 num 的值
print(f"task1 num={num}")
# 主程序
if __name__ == '__main__':
# 创建一个线程对象 t,执行函数 task
t = threading.Thread(target=task)
# 创建一个线程对象 t1,执行函数 task1
t1 = threading.Thread(target=task1)
# 启动线程 t
t.start()
# 启动线程 t1
t1.start()
# 打印当前 num 的值(在两个子线程运行之前打印)
print(f"main--num={num}")
七、互斥锁和死锁
1、互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制。
某个线程要更改共享数据时,先将其锁定,此时,资源的状态为“锁定”,其它线程不能改变,直到该线程释放资源,将资源的状态变成“非锁定”,其它的线程才能再次锁定该资源。
互斥锁保证了每次只有一个线程进入写入操作,从而保证了多线程情况下数据的正确性。
# 创建锁
mutex = threading.Lock()
# 锁定
mutex.acquire()
# 释放
mutex.release()
import threading # 导入线程模块
import time # 导入时间模块
# 定义全局变量 num,初始值为0
num = 0
# 定义函数 task
def task(nums):
# 在函数内部使用全局变量 num
global num
# 获取互斥锁,确保线程安全
mutex.acquire()
# 循环数据
for i in range(nums):
num += 1
# 释放互斥锁
mutex.release()
# 打印当前 num 的值
print("task--num=%d" % num)
# 定义函数 task1
def task1(nums):
# 在函数内部使用全局变量 num
global num
# 获取互斥锁,确保线程安全
mutex.acquire()
# 循环数据
for i in range(nums):
num += 1
# 释放互斥锁
mutex.release()
# 打印当前 num 的值
print(f"task1 num={num}")
# 主程序
if __name__ == '__main__':
# 创建互斥锁对象
mutex = threading.Lock()
# 定义 nums 的值
nums = 10000
# 创建一个线程对象 t,执行函数 task
t = threading.Thread(target=task, args=(nums,), ) # 传参,数据类型必须是元组
# 创建一个线程对象 t1,执行函数 task1
t1 = threading.Thread(target=task1, args=(nums, ))
# 启动线程 t
t.start()
# 启动线程 t1
t1.start()
# 主线程等待2秒,确保子线程执行完毕
time.sleep(2)
# 打印当前 num 的值
print(f"main--num={num}")
2、死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
八、生产者与消费者模式
1、Queue 线程队列
Queue(队列)是一个线程安全的数据结构,常用于在多线程编程中实现线程间的通信和数据共享。
Python 中的 queue 模块中提供了同步的、线程安全的队列类,包括 FIFO(先进先出)队列 Queue,LIFO(后入先出)队列 LifoQueue。
这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用,使用队列可以实现线程间的同步。
队列方法
- 初始化 Queue(maxsize):创建一个先进先出的队列。
- empty():判断队列是否为空。
- full():判断队列是否满了。
- get():从队列中取最后一个数据。
- put():将一个数据放到队列中。
from queue import Queue # 导入队列模块中的 Queue 类,用于使用队列数据结构
# 实例化对象,队列充当的是容器
# 初始化 Queue(maxsize)
q = Queue(5) # maxsize 为5,就只能存5组数据,可以存放任何类型的数据
q.put(1) # 往队列当中添加值
q.put({"key":"value"})
q.put([2, 3, 4])
q.put(3.5)
q.put(True)
# q.put(4) # 超出队列大小,程序会出现阻塞
print('----', q.qsize()) # 查看队列的大小
# 取值
print(q.get())
print(q.get())
print('----', q.qsize()) # 查看队列的大小,取出后的值不在队列中了
# 判断队列是否满了
print(q.full()) # False 3
print(q.empty()) # 判断队列是否为空,如果是空返回的是 True
2、生产者和消费者
生产者和消费者模式是多线程开发中常见的一种模式。通过这种模式,可以让代码达到高内聚低耦合的目标,线程管理更加方便,程序分工更加明确。
生产者的线程专门用来生产一些数据,然后存放到容器(中间变量)中,消费者再从这个中间的容器中取出数据进行消费。
from queue import Queue # 导入 Queue 模块,用于使用队列数据结构
import threading # 导入 threading 模块,用于多线程编程
import time # 导入 time 模块,用于时间相关操作
# 定义函数,用于向队列中存值
def set_value(q):
num = 0
while True:
# 将值放入队列
q.put(num)
# 值自增
num += 1
# 线程休眠0.5秒
time.sleep(0.5)
# 定义函数,用于从队列中获取值并打印
def get_value(q):
while True:
# 从队列中获取值并打印
print(q.get())
# 主程序
if __name__ == '__main__':
# 创建一个大小为4的队列对象
q = Queue(4)
# 创建一个子线程,调用 set_value 函数,用于存值
t1 = threading.Thread(target=set_value, args=(q,))
# 创建一个子线程,调用 get_value 函数,用于取值
t2 = threading.Thread(target=get_value, args=(q,))
# 启动线程 t1
t1.start()
# 启动线程 t2
t2.start()
九、案例
目标网站:https://qq.yh31.com/zjbq/List_48.html
需求:爬取表情包图片,并且将图片保存到文件夹中
1、单线程实现
页面分析
1、数据有多页,先获取第一页数据
2、确定 url,判断是静态加载还是动态加载
静态加载 url:https://qq.yh31.com/zjbq/List_48.html
3、解析数据
先获取到所有的 img 标签
循环遍历获取每一组的数据
4、获取翻页数据,观察 url 变化的规律
第一页:https://qq.yh31.com/zjbq/List_48.html
第二页:https://qq.yh31.com/zjbq/List_47.html
第三页:https://qq.yh31.com/zjbq/List_46.html
代码实现
import requests # 导入 requests 模块,用于发送网络请求
from lxml import etree # 导入 lxml 库中的 etree 模块,用于解析 HTML
import re # 导入 re 模块,用于正则表达式匹配
# 定义函数,用于下载图片
def download_img():
# 请求头
head = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
# 获取5页数据
for i in range(48, 43, -1):
# 目标 url
url = f'https://qq.yh31.com/zjbq/List_{i}.html'
# 发送 get 请求,获取响应对象
res = requests.get(url, headers=head)
# 设置响应编码为 utf-8
res.encoding = 'utf-8'
# 打印响应内容
# print(res.text)
# 解析响应内容
html = etree.HTML(res.text)
# 获取所有的 img 标签
images = html.xpath('//div[@class="zj_tp"]/a/img')
# 遍历循环每一个 img 标签
for img in images:
# 获取图片 url
img_url = img.xpath('@src')[0]
# 获取图片标题
img_title = img.xpath('@alt')[0]
# 使用正则表达式替换标题中的特殊字符
title = re.sub(r'[<>:?.()/\\]', '', img_title)
# 打印图片 url 和标题
# print(img_url, img_title)
# 发送 get 请求,获取图片响应
res = requests.get(img_url, headers=head)
# 打开文件,将图片内容写入到文件中
with open(f'pictures/{title}.jpg', 'wb') as f:
f.write(res.content)
print(f'{title}正在下载')
# 调用下载图片的函数
download_img()
2、多线程实现
页面分析
用生产者与消费者下载表情包
一个是生产数据类,一个是下载数据类
队列只是一个容器
代码实现
import requests # 导入 requests 库,用于发送 HTTP 请求
from lxml import etree # 导入 lxml 库,用于解析 HTML
import re # 导入 re 库,用于正则表达式操作
import threading # 导入 threading 库,用于多线程编程
from queue import Queue # 导入 Queue 类,用于创建队列
# 请求头
head = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
# 生产者类,用于获取图片链接
class Producer(threading.Thread):
# 初始化方法
def __init__(self, page_queue, img_queue):
# 必须要执行父类当中的 init 方法完成初始化
super().__init__()
# 设置页码队列属性
self.page_queue = page_queue
# 设置图片队列属性
self.img_queue = img_queue
# 重写 run 方法
def run(self):
# 循环取队列里面的数据,直到队列数据为空
while True:
# 如果页码队列为空
if self.page_queue.empty():
# 退出循环
break
# 从页码队列中获取 url
url = self.page_queue.get()
# 打印 url
# print(url)
# 调用 parse_html 方法解析页面
self.parse_html(url)
# 定义解析页面的方法
def parse_html(self, url):
# 发送 get 请求,获取响应对象
res = requests.get(url, headers=head)
# 设置响应编码为 utf-8
res.encoding = 'utf-8'
# 打印响应内容
# print(res.text)
# 解析响应内容
html = etree.HTML(res.text)
# 获取所有的 img 标签
images = html.xpath('//div[@class="zj_tp"]/a/img')
# 遍历循环每一个 img 标签
for img in images:
# 获取图片 url
img_url = img.xpath('@src')[0]
# 获取图片标题
img_title = img.xpath('@alt')[0]
# 使用正则表达式替换标题中的特殊字符
title = re.sub(r'[<>:?.()/\\]', '', img_title)
# 将图片 url 和标题作为元组放入图片队列中
self.img_queue.put((img_url, title))
# 打印图片队列的大小
# print(self.img_queue.qsize())
# 消费者类,用于下载图片
class consumer(threading.Thread):
# 初始化方法
def __init__(self, img_queue):
# 必须要执行父类当中的 init 方法完成初始化
super().__init__()
# 设置图片队列属性
self.img_queue = img_queue
# 重写 run 方法
def run(self):
# 循环取队列里面的数据,直到队列数据为空
while True:
# 打印图片队列的大小
print(self.img_queue.qsize())
# # 如果图片队列为空
# if self.img_queue.empty():
# # 退出循环
# break
# 从图片队列中获取图片数据
img_data = self.img_queue.get()
# 将图片数据解包为 url 和标题
url, title = img_data
# 发送 get 请求,获取图片响应
res = requests.get(url, headers=head)
# 打开文件,将图片内容写入到文件中
with open(f'pictures/{title}.jpg', 'wb') as f:
f.write(res.content)
print(f'{title}正在下载')
# 主程序
if __name__ == '__main__':
# 存放 url 的队列
page_queue = Queue()
# 创建图片队列
img_queue = Queue()
# 循环页码
for i in range(48, 43, -1):
# 创建 url
url = f'https://qq.yh31.com/zjbq/List_{i}.html'
# url 放入页码队列
page_queue.put(url)
# 创建空列表
lst = []
# 创建生产者
for i in range(3):
# 将存放的 url 队列传递给生产者
t = Producer(page_queue, img_queue)
# 开启线程
t.start()
# 添加线程到列表
lst.append(t)
# # join:等子线程结束了才会执行主线程的代码
# # 加 join 是生产完了再下载,不加是边生产边下载
# # 如消费者 run 方法里判断图片队列为空,就需要加 join
# for i in lst:
# i.join()
# 创建消费者
for i in range(3):
# 将图片队列传递给消费者
t1 = consumer(img_queue)
# 开启线程
t1.start()
十、作业
目标网站:https://www.fabiaoqing.com/biaoqing/lists/page/1.html
需求:爬取表情包图片,并且将图片保存到文件夹中
import requests # 导入 requests 库,用于发送 HTTP 请求
from lxml import etree # 导入 lxml 库,用于解析 HTML
import re # 导入 re 库,用于正则表达式操作
import threading # 导入 threading 库,用于多线程编程
from queue import Queue # 导入 Queue 类,用于创建队列
# 请求头
head = {
'Referer':'https://www.fabiaoqing.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
# 生产者类,用于获取图片链接
class Producer(threading.Thread):
# 初始化方法
def __init__(self, page_queue, img_queue):
# 必须要执行父类当中的 init 方法完成初始化
super().__init__()
# 设置页码队列属性
self.page_queue = page_queue
# 设置图片队列属性
self.img_queue = img_queue
# 重写 run 方法
def run(self):
# 循环取队列里面的数据,直到队列数据为空
while True:
# 如果页码队列为空
if self.page_queue.empty():
# 退出循环
break
# 从页码队列中获取 url
url = self.page_queue.get()
# 打印 url
# print(url)
# 调用 parse_html 方法解析页面
self.parse_html(url)
# 定义解析页面的方法
def parse_html(self, url):
# 发送 get 请求,获取响应对象
res = requests.get(url, headers=head)
# 设置响应编码为 utf-8
res.encoding = 'utf-8'
# 打印响应内容
# print(res.text)
# 解析响应内容
html = etree.HTML(res.text)
# 获取所有的 img 标签
images = html.xpath('//div[@class="tagbqppdiv"]/a/img')
# 遍历循环每一个 img 标签
for img in images:
# 获取图片 url
img_url = img.xpath('@data-original')[0]
# 获取图片标题
img_title = img.xpath('@alt')[0]
# 使用正则表达式替换标题中的特殊字符
title = re.sub(r'[<>:?.()/\\]', '', img_title)
# 将图片 url 和标题作为元组放入图片队列中
self.img_queue.put((img_url, title))
# 打印图片队列的大小
# print(self.img_queue.qsize())
# 消费者类,用于下载图片
class consumer(threading.Thread):
# 初始化方法
def __init__(self, img_queue):
# 必须要执行父类当中的 init 方法完成初始化
super().__init__()
# 设置图片队列属性
self.img_queue = img_queue
# 重写 run 方法
def run(self):
# 循环取队列里面的数据,直到队列数据为空
while True:
# 打印图片队列的大小
print(self.img_queue.qsize())
# # 如果图片队列为空
# if self.img_queue.empty():
# # 退出循环
# break
# 从图片队列中获取图片数据
img_data = self.img_queue.get()
# 将图片数据解包为 url 和标题
url, title = img_data
# 发送 get 请求,获取图片响应
res = requests.get(url, headers=head)
# 打开文件,将图片内容写入到文件中
with open(f'pictures/{title}.jpg', 'wb') as f:
f.write(res.content)
print(f'{title}正在下载')
# 主程序
if __name__ == '__main__':
# 存放 url 的队列
page_queue = Queue()
# 创建图片队列
img_queue = Queue()
# 循环页码
for i in range(1, 5, 1):
# 创建 url
url = f'https://www.fabiaoqing.com/biaoqing/lists/page/{i}.html'
# url 放入页码队列
page_queue.put(url)
# 创建空列表
lst = []
# 创建生产者
for i in range(3):
# 将存放的 url 队列传递给生产者
t = Producer(page_queue, img_queue)
# 开启线程
t.start()
# 添加线程到列表
lst.append(t)
# # join:等子线程结束了才会执行主线程的代码
# # 加 join 是生产完了再下载,不加是边生产边下载
# # 如消费者 run 方法里判断图片队列为空,就需要加 join
# for i in lst:
# i.join()
# 创建消费者
for i in range(3):
# 将图片队列传递给消费者
t1 = consumer(img_queue)
# 开启线程
t1.start()
记录学习过程,欢迎讨论交流,尊重原创,转载请注明出处~