Python中的多进程、多线程、协程

news2024/12/28 6:41:57

Python中的多线程、多进程、协程

一、概述

1. 多线程Thread (threading):

  • 优点:同一个进程中可以启动多个线程,充分利用IO时,cpu进行等待的时间
  • 缺点:相对于进程,多线程只能并发执行,不能利用多CPU,相对于协程,多线程的启动数目有限 ,占用内存资源,并且有线程切换的时间开销
  • 使用场景IO密集型计算、同时运行的任务数据要求不多

2. 多进程Process(multiprocessing):

  • 优点:可以利用多核CPU进行并行计算
  • 缺点:占用资源最多,可启动的数目比线程少
  • 使用场景CPU密集型计算

3. 协程Coroutine(asyncio):

  • 优点:内存开销最少、启动协程的数量最多
  • 缺点:支持的库有限制 (request对用的能使用协程的库为:aiohttp)、代码实现复杂
  • 使用场景IO密集型计算、超多任务运行、有现成的库支持的场景

4. Python比C/C++/Java速度慢的原因:

  1. Python是动态类型语言,边解释边执行。
  2. GIL,无法利用多核CPU并发 执行。

5. GIL

GIL名为:全局解释器锁(Global Interpreter Lock 缩写为: GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行,即便在多核心处理器上,使用GIL的解释器也只允许同一时刻执行一个线程。这是为了解决多线程之间数据完整性的状态同步问题。

6. 创建工具代码:

这个代码是一个基本的爬虫代码,创建这个代码的目的时为了下面实现多线程或多进程的时候让代码显的更加简洁,进而能更清晰的观察到多线程或多进程的实现方式。

utils.blog_spider.py

import requests
from bs4 import BeautifulSoup

urls = [
    f'https://www.cnblogs.com/#p{page}'
    for page in range(1, 50 + 1)
]

# 请求url,得到html
def craw(url):
    r = requests.get(url)
    # print(url, len(r.text))
    return r.text

# 解析html
def parse(html):
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link['href'], link.get_text()) for link in links]


if __name__ == '__main__':
    for restult in parse(craw(urls[2])):
        print(restult)

二、多线程

1. 创建多线程的一般流程:

  1. 准备一个函数

    def my_func(a, b):
    	doing(a, b)
    
  2. 创建一个线程

    import threading  # 导入线程包
    t = threading.Threading(target=my_func, args=(100, 200))  # 第一个参数为一个函数名,第二个参数时传入函数的参数
    
  3. 启动线程

    t.start()
    
  4. 等待结束

    t.join()
    

代码示例:

# 多线程爬虫
import threading
import time

import utils.blog_spider as bg


def signal_thread():
    for url in bg.urls:
        bg.craw(url)


def multi_thread():
    threads = []
    for url in bg.urls:
        threads.append(threading.Thread(target=bg.craw, args=(url,)))  # 添加线程
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    start = time.time()
    signal_thread()
    end = time.time()
    print("signal_thread cosst :", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi_thread cosst :", end - start, "seconds")

输出结果:

在这里插入图片描述

在这里插入图片描述

2. 多线程之间的数据通信queue.Queue

queue.Queue可以用于多线程之间、线程安全的数据通信。

具体流程如下 :

  1. 导入队列库

    import queue
    
  2. 创建Queue

    q = queue.Queue()
    
  3. 添加元素

    q.put(item)
    
  4. 获取元素

    item = q.get()
    
  5. 查询状态

    # 查看元素的多少
    q.qsize()
                   
    # 判断是否为空
    q.empty()
                   
    # 判断是否已满
    q.full()
    

代码示例:

# 多线程数据通信
import queue
import utils.blog_spider as bg
import time
import random
import threading


def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = bg.craw(url)
        html_queue.put(html)   # 加入队列
        print(threading.current_thread().name, f"craw{url}", "url_queue.size=", url_queue.qsize())
        time.sleep(random.randint(1, 2))  # 进行随机休眠


def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()  # 从队列中取数据
        results = bg.parse(html)
        for result in results:
            fout.write(str(result) + '\n')
        print(threading.current_thread().name, "results.size=", len(results), "html_queue.size=", html_queue.qsize())
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in bg.urls:
        url_queue.put(url)
    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
        t.start()
    fout = open("data.txt", "w")
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
        t.start()

在这里插入图片描述

3. 线程安全

由于线程的执行随时会切换,这会造成不可预料的结果,出现线程不安全的情况。

Lock用于解决线程安全问题,对线程进行加锁,这样会使得该线程运行结束之后在切换线程。

用法1:try-finally模式

import threading
lock = threading.Lock()   # 设置线程锁
lock.acquire()  # 获得锁
try:
    # do something
finally:
    lock.realse()   # 释放锁

用法2:with模式

import threading
lock = threading.Lock()
with lock:
    # do something

sleep语句一定会导致当前线程阻塞,会进行线程的切换(加锁则不会进行进程的切换)。

代码示例:

# 线程安全
import threading
import time

lock = threading.Lock()


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    with lock:
        if account.balance >= amount:
            time.sleep(0.1)
            print(threading.current_thread().name, "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name, "余额", account.balance)
        else:
            print(threading.current_thread().name, "取钱失败,余额不足")


if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(name="ta", target=draw, args=(account, 800))
    tb = threading.Thread(name='tb', target=draw, args=(account, 800))

    ta.start()
    tb.start()

在这里插入图片描述

4. 线程池

线程池中有一些线程,新来的任务放在任务队列中,在线程池中的线程空闲的时候会自动处理任务队列里的任务。

使用线程池的好处:

  1. 提升性能
    因为减去了大量新建、终止线程的开销,重用了线程资源;
  2. 使用场景
    适合处理突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
  3. 防御功能
    能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢的问题。
  4. 代码优势
    使用线程池的语法比自己新建线程执行线程更加简洁。

使用方法:

  1. 用法1map函数,注意map结果和入参是顺序对应的

    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor() as pool:
        results = pool.map(craw, urls)
        for result in results:
            print(result)
    
  2. 用法2future模式更强大,注意如果用as_complete方法,这样的线程执行顺序是不固定的,但是相应的效率会更高。(as_completed方法是不管哪个线程先执行完了,都会直接返回,不用按照顺序返回)

    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(craw, url) for url in urls]
        for future in futures:
            print(future.result())
        for future in as_complated(futures):
            print(future.result())
    

代码示例:

import concurrent.futures
import utils.blog_spider as bg

# 进行爬取html
with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(bg.craw, bg.urls)  # 加入线程池
    htmls = list(zip(bg.urls, htmls))
    for url, html in htmls:
        print(url, len(html))
    print("craw over")

# 进行解析html
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        future = pool.submit(bg.parse, html)  # 加入线程池
        futures[future] = url
    for future, url in futures.items():
        print(url, future.result())

    # 顺序不定
    # for future in concurrent.futures.as_completed(futures):
    #     url = futures[future]
    #     print(url, future.result())

三、多进程

CPU密集型计算线程的自动的切换反而变成了负担,多线程甚至减慢了运行速度。multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行。

多进程multiprocessing知识梳理(对比多线程threading

多线程:

  1. 引入模块

    from threading import Thread
    
  2. 新建

    t = Thread(target=func, args=(100, ))
    
  3. 启动

    t.start()
    
  4. 等待结束

    t.join()
    
  5. 数据通信

    import queue
    q = queue.Queue()
    q.put(item)
    item = q.get()
    
  6. 线程安全加锁

    from threading import Lock
    lock = Lock()
    with lock:
        # do something
    
  7. 池化技术

    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor() as executor:
        # 方法1
        results = executor.map(func, [1, 2, 3])
        # 方法2
        future = executor.submit(func, 1)
        result = future.result()
    

多进程:

  1. 引入模块:

    from multiprocessing import Process
    
  2. 新建

    p = process(target=f, args=('bob', ))
    
  3. 启动

    p.start()
    
  4. 等待结束

    p.join()
    
  5. 数据通信

    from multiprocessing import Queue
    q = Queue()
    q.put([42, None, 'hello'])
    item = q.get()
    
  6. 线程安全加锁

    from  multiprocessing import Lock
    lock = Lock()
    with lock:
        # do something
    
  7. 池化技术

    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as executor:
        # 方法1:
        results = executor.map(func, [1, 2, 3])
        # 方法2:
        future = executor.submit(func, 1)
        result = funture.result()
    

代码示例:

import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor  # 导入线程池与进程池
import time

PRIMES = [112272535095293] * 100


def is_prime(n):
    '''
    判断一个数是否是素数
    :param n:
    :return:
    '''
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def signal_thread():
    '''
    单线程
    :return:
    '''
    for number in PRIMES:
        is_prime(number)


def multi_thread():
    '''
    多线程
    :return:
    '''
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


def multi_process():
    '''
    多进程
    :return:
    '''
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


if __name__ == '__main__':
    start = time.time()
    signal_thread()
    end = time.time()
    print("signal_thread, cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi_thread, cost:", end - start, "seconds")


    start = time.time()
    multi_process()
    end = time.time()
    print("multi_process, cost:", end - start, "seconds")

输出结果:

在这里插入图片描述

四、协程

协程是在单线程内实现并发

  • 核心原理:用一个超级循环(其实就是while Treu)循环
  • 配合IO多路复用原理(IOCPU可以干其他事情)

异步IO库介绍:asyncio

1. 创建协程的一般流程:

  1. 导入库

    import asyncio
    
  2. 获取事件循环

    loop = asyncio.get_enent_loop()
    
  3. 定义协程

    async def myfunc(url):
        await get_url(url)
    
  4. 创建task任务列表

    task = [loop.create_task(myfunc(url)) for url in urls]
    
  5. 执行爬虫事件列表

    loop.run_until_complete(asyncio.wait(tasks))
    

注意:要用在异步IO编程中,依赖的库必须支持异步IO特性,爬虫引用中:requests不支持异步,需要用aiohttp

代码示例:

import asyncio
import aiohttp
import utils.blog_spider as bg   # 该模块是工具代码中的模块
import time


async def async_craw(url):
    print("craw url:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}, {len(result)}")


loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url=url))   # 添加任务
         for url in bg.urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))   # 启动协程
end = time.time()
print("use time seconds :", end - start)

在这里插入图片描述

2. 控制协程的并发度

可以使用信号量(semaphore)来控制并发度

实现方式1:

sem = asyncio.Semaphore(10)
# later
async with sem:
    # work with shared resource

实现方式2:

sem = asyncio.Semaphore(10)
# later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1660661.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows:管理用户账户,密码策略和安全配置

在Windows操作系统中&#xff0c;管理用户账户和密码策略是确保系统安全的关键步骤。本文将探讨如何通过PowerShell和其他Windows工具管理用户账户&#xff0c;包括查看和设置密码策略、检查用户状态&#xff0c;以及导出和导入安全策略。这些管理任务对于系统管理员尤其重要&a…

STM32学习和实践笔记(25):USART(通用同步、异步收发器)

一&#xff0c;STM32的USART简介 USART即通用同步、异步收发器&#xff0c;它能够灵活地与外部设备进行全双工数据交换&#xff0c;满足外部设备对工业标准 NRZ 异步串行数据格式的要求。 UART即通用异步收发器&#xff0c;它是在USART基础上裁剪掉了同步通信功能。 开发板上…

Star-CCM+分配零部件至区域2-根据零部件的特性分组分配零部件至区域

前言 前文已经讲解了将零部件分配至区域的方法。其中有一种方法是"将所有部件分配到一个区域"。在工程应用中&#xff0c;有时会把同一种类型的部件分配到一个区域&#xff0c;因此在一个项目中有可能需要多次进行"将所有部件分配到一个区域"。如在电机温…

主机通过带光发端和ops接收端控制屏串口调试记录

场景就是主机电脑使用cutecom通过光纤口再到ops接收端从而控制屏过程 光纤口有个发送端波特率&#xff0c;Ops有接收端波特率&#xff0c;屏有自己的波特率&#xff0c;主机电脑可以通过发串口指令去设置发送端波特率和ops接收端波特率。因为主机只有一个&#xff0c;屏有多种…

【机器学习300问】82、RMSprop梯度下降优化算法的原理是什么?

RMSprop&#xff0c;全称Root Mean Square Propagation&#xff0c;中文名称“均方根传播”算法。让我来举个例子给大家介绍一下它的原理&#xff01; 一、通过举例来感性认识 建议你第一次看下面的例子时忽略小括号里的内容&#xff0c;在看完本文当你对RMSprop有了一定理解时…

智能助手上线,大模型提供云服务专属顾问

业务背景 在使用云服务的时候&#xff0c;当您遇到复杂问题&#xff0c;如配置、关联或计费方式不明确时&#xff0c;可能需要向客服提交工单进行技术沟通。在漫长的工作过程中&#xff0c;耗费了宝贵的时间和精力。 2024 年 4 月&#xff0c;百度智能云正式推出了融合文心大…

嵌入式学习<1>:建立工程、GPIO和keil仿真

嵌入式学习_part1 本部分笔记用于学习记录&#xff0c;笔记源头 >>b站江科大_STM32入门教程_新建工程 建立工程、GPIO 开发环境&#xff1a;keil MDK、STM32F103C8T6 1 &#xff09;建立工程 &#xff08;1&#xff09;基于寄存器开发、基于标准库 或者 基于HAL库开…

学习网络需要认识的各种设备

网桥&#xff08;bridge&#xff09; 网桥工作在数据链路层&#xff0c;可以把多个局域网连接起来&#xff0c;组成一个更大的局域网 以太网中&#xff0c;数据链路层地址就是mac地址&#xff0c;网桥与集线器的区别就是&#xff0c;网桥会过滤mac&#xff0c;只有目的mac地址…

财务管理|基于SprinBoot+vue的财务管理系统(源码+数据库+文档)

财务管理系统 目录 基于SprinBootvue的财务管理系统 一、前言 二、系统设计 三、系统功能设计 系统功能实现 1管理员功能模块 2员工功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1…

Word设置代码块格式

前言 Word中无法像Markdown和LaTeX一样插入代码块&#xff0c;若要在Word中插入代码块可以手动设置代码块格式或自动粘贴代码块格式。若不追求完美高亮效果&#xff0c;可使用前者方案&#xff1b;若追求完美的高亮效果&#xff0c;可使用后者方案。下文介绍这2种方案。 手动…

同时安装多个nodejs版本可切换使用,或者用nvm管理、切换nodejs版本(两个详细方法)

目录 一.使用nvm的方法&#xff1a; 1.卸载nodejs 2.前往官网下载nvm 3.安装nvm 4.查看安装是否完成 5.配置路径和淘宝镜像 6.查看和安装各个版本的nodejs 7.nvm的常用命令 二.不使用nvm&#xff0c;安装多个版本&#xff1a; 1.安装不同版本的nodejs 2.解压到你想放…

生信技能45 - 基于docker容器运行生信软件

1. 获取docker镜像 以运行xhmm CNV分析软件为例。 # 搜索仓库镜像 sudo docker search xhmm# 拉取镜像 sudo docker pull ksarathbabu/xhmm_v1.0# 启动镜像,非后台 sudo docker run -it ksarathbabu/xhmm_v1.0 /bin/bash # -i: 交互式操作。 # -t: 终端。 # ksarathbabu/xhmm…

鲁棒控制:鲁棒性能

鲁棒控制&#xff08;Robust Control&#xff09;是一种控制系统设计方法&#xff0c;其目标是使控制系统在面临参数摄动、外部干扰、建模误差等不确定性因素时&#xff0c;仍能够保持其期望的性能特性。鲁棒性是指控制系统在一定&#xff08;结构&#xff0c;大小&#xff09;…

分布式光伏管理平台功能介绍

一、项目管理系统 1、关键信息&#xff1a;板块化展现项目关键信息&#xff0c;包含所在区域、屋面类型、未来25年发电量、累计收益等信息。 (1) 可迅速获取项目核心要点 (2) 及时跟进修改&#xff0c;凸显项目信息 (3) 项目信息清晰展现&#xff0c;了解整体项目流程 2、项…

QQ超大文件共享(别用,传进去后,压缩都显示不出来,LJ qq!)(共享文件)

文章目录 需要共享双方同时在线开启方法第一次会提示设置默认共享目录&#xff0c;默认是E:\QQFileShare\<qq号>\&#xff1a;然后新建共享会在其后创建共享目录&#xff0c;共享目录中只能共享文件。需要点击添加文件&#xff0c;直接把文件拷贝到目录里好像还不行&…

激光雷达在智能自动装车系统中的关键技术

智能自动装车系统是现代物流领域的重要发展方向之一&#xff0c;而激光雷达作为其中的关键技术之一&#xff0c;发挥着至关重要的作用。 一、激光雷达在智能自动装车系统中的关键技术 三维点云处理&#xff1a;激光雷达通过获取目标车辆的三维点云数据&#xff0c;可以构建出…

从零入门激光SLAM(十三)——LeGo-LOAM源码超详细解析4

大家好呀&#xff0c;我是一个SLAM方向的在读博士&#xff0c;深知SLAM学习过程一路走来的坎坷&#xff0c;也十分感谢各位大佬的优质文章和源码。随着知识的越来越多&#xff0c;越来越细&#xff0c;我准备整理一个自己的激光SLAM学习笔记专栏&#xff0c;从0带大家快速上手激…

六一儿童节活动方案策划怎么写?

六一儿童节活动方案策划不难&#xff0c;一般看前人策划的案例就可以仿写一篇充满创意的儿童节活动方案。 当然&#xff0c;你也可以照着下面的模版直接写&#xff1a; 成年人的时间是离弦的箭 向着目标,一往无前 孩子的时间是旋转木马 载着今天和明天转啊转啊圈圈 成年人…

互联网轻量级框架整合之SpringIoC概念详解

在之前的几篇文字中说道容器的概念&#xff0c;实际上Spring也是基于容器的理念&#xff0c;之所以如此成功并不是因为很先进的技术&#xff0c;而是因为理念&#xff0c;其中核心便是IoC(控制反转)&#xff0c;AOP(面向切面编程)&#xff0c;其中IoC是Spring的基础&#xff0c…

idea使用前的全局配置,一次配置,多次使用

前提&#xff1a;每次导入一个新的项目&#xff0c;就需要重新设置编码、maven、jdk、git版本等信息。实际每个项目所用到的配置信息是一致的&#xff0c;除非换一家公司&#xff0c;不然不会改动到这些内容。 idea版本&#xff1a;2024.1.1 1.1、全局Maven配置 IDEA启动页面…