python并发编程实战

news2025/1/4 19:28:56

python并发编程有三种

  • 多线程Thread
  • 多进程Process
  • 多协程Coroutine

cpu密集型计算

cpu密集型也叫计算密集型,是指I/O在很短的时间就可以完成,cpu需要大量的计算处理,特点是cpu占用率相当高

例如:压缩解压缩、加密解密、正则表达式搜索

IO密集型

IO密集型指的是系统运作大部分的状态是CPU在等I/O(硬盘/内存)的读/写操作,cpu占用率仍然较低

例如:文件处理程序、网络爬虫程序、读写数据库程序

多线程、多进程、多协程的对比

多进程

  • 优点:可以利用多核CPU并行计算
  • 缺点:占用资源最多、可启动数目比线程少
  • 适用于:CPU密集型计算

多线程

  • 优点:相比进程,更轻量级、占用资源少
  • 缺点:
    • 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
    • 相比协程:启动数目有限,占用内存资源,有线程切换开销
  • 适用于:IO密集型计算、同时运行的任务数目要求不多

多协程

  • 优点:内存开销最少、启动协程数量最多
  • 缺点:支持的库有限制(aiohttp vs request)、代码实现复杂
  • 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景

python慢的头号嫌疑犯——全局解释器锁GIL

python速度慢的原因一:动态类型语言,边解释边执行

python速度慢的原因二:GIL无法利用多核CPU并发执行

GIL是什么

全局解释器(Global Interpreter Lock)

是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行

即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程

怎么规避GIL带来的限制

1.多线程threading机制依然是有用的,用于IO密集型计算

因为在I/O期间,线程会释放GIL,实现CPU和IO的并行

因此多线程用于IO密集型计算依然可以大幅提升速度

但是多线程用于CPU密集型计算时,只会更加拖慢速度

2.使用multiprocessing的多进程机制实现并行计算、利用多核cpu优势

为了应对GIL的问题,python提供了multiprocessing

python利用多线程加速爬虫

blog_spider.py

import requests

urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 51)]


def craw(url):
    r = requests.get(url)
    print(url, len(r.text))


if __name__ == '__main__':
    craw(urls[0])

multi_thread_craw.py

import threading
import time

import blog_spider


# 单线程爬取
def single_thread():
    print("单线程爬取开始")
    for url in blog_spider.urls:
        blog_spider.craw(url)
    print("单线程爬取结束")


# 多线程爬取
def multi_thread():
    print("多线程爬取开始")
    threads = []
    for url in blog_spider.urls:
        threads.append(threading.Thread(target=blog_spider.craw, args=(url,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("多线程爬取结束")


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    single_time = end - start
    print("单线程爬取时间:", single_time, "秒")
    print("------------------分割线---------------------")
    start = time.time()
    multi_thread()
    end = time.time()
    multi_time = end - start
    print("多线程爬取时间:", multi_time, "秒")
    print("时间倍数:", single_time / multi_time)

单线程爬取时间:

image-20240929143010006

多线程爬取时间:

image-20240929143040682

python实现生产者消费者爬虫

多组件的Pipeline技术架构

复杂的事情一般都不会一下子做完,而是会分很多中间步骤一步一步完成

生产者消费者爬虫的架构

image-20240929144204508

多线程数据通信的queue.Queue

queue.Queue可以用于多线程之间的、线程安全的数据通信

blog_spider.py

import requests
from bs4 import BeautifulSoup
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 51)]


def craw(url):
    r = requests.get(url)
    return r.text


def parse(html):
    # class="post-item-title"
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]


if __name__ == '__main__':
    for result in parse(craw(urls[3])):
        print(result)

producer_customer_spider.py

import queue
import random
import threading
import time

import blog_spider


# 生产者
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        html_queue.put(html)
        print(threading.current_thread().name, f"爬取:{url}", "url_queue队列大小:", url_queue.qsize())
        time.sleep(random.randint(1, 2))


# 消费者
# 输出对象fout
def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        results = blog_spider.parse(html)
        for result in results:
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, f"results大小:", len(results), "html_queue队列大小:", html_queue.qsize())
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in blog_spider.urls:
        url_queue.put(url)
    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"生产者{idx}")
        t.start()
    fout = open("data.txt", "w")
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"消费者{idx}")
        t.start()

线程安全问题以及Lock解决方案

线程安全

线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。

由于线程的执行随时会发生切换,就造就了不可预料的结果,出现线程不安全

Lock用于解决线程安全问题

用法一:try-finally模式

import threading
lock = threading.Lock()
lock.acquire()
try:
    #do something
finally:
    lock.release()

用法2:with模式

import threading
lock = threading.Lock()
with lock:
    #do something
线程不安全案例代码

unlock_thread.py

import threading
import time


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account: Account, amount):
   if account.balance>=amount:
       # 这里加阻塞为了保证线程不安全现象发生
       time.sleep(0.1)
       print(threading.current_thread().name,"取钱成功")
       account.balance -= amount
       print(threading.current_thread().name,"余额:",account.balance)
   else:
       print(threading.current_thread().name,"取钱失败")

if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(name="线程a", target=draw, args=(account, 800))
    tb = threading.Thread(name="线程b", target=draw, args=(account, 800))

    ta.start()
    tb.start()

image-20240929170656638

线程安全案例代码

lock_thread.py

import threading
import time

# 获取lock对象
lock = threading.Lock()


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account: Account, amount):
    with lock:
        if account.balance >= amount:
            time.sleep(0.1)
            print(threading.current_thread().name, "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name, "余额:", account.balance)
        else:
            print(threading.current_thread().name, "取钱失败")


if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(name="线程a", target=draw, args=(account, 800))
    tb = threading.Thread(name="线程b", target=draw, args=(account, 800))

    ta.start()
    tb.start()

image-20240929171145373

好用的线程池ThreadPoolExecutor

线程池的原理

新建线程系统需要分配资源、终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销

线程池的执行过程可以分为以下几个步骤‌:

  1. 核心线程数检查‌:当提交任务后,线程池首先会检查当前线程数。如果此时线程数小于核心线程数,则新建线程并执行任务。
  2. 任务队列处理‌:随着任务的不断增加,线程数会逐渐增加并达到核心线程数。此时,如果仍有任务被不断提交,这些任务会被放入阻塞队列中等待执行。
  3. 非核心线程创建‌:如果任务特别多,达到了任务队列的容量上限,线程池就会继续创建非核心线程来执行任务,直到达到最大线程数。
  4. 拒绝策略‌:当线程数达到最大线程数时,如果仍有任务提交,线程池会执行拒绝策略,如抛出异常、丢弃最旧的任务等。

image-20240929171834940

使用线程池的好处

  1. 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
  2. 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
  3. 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
  4. 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

ThreadPoolExecutor的使用语法

from concurrent.futures import ThreadPoolExecutor, as_completed

用法1:map函数,简单,注意map的结果和入参时顺序对应的

with ThreadPoolExecutor() as pool:
    results = pool.map(craw,urls)
    for result in results:
        print(result)

用法2:future模式,更强大。注意如果用as_completed顺序是不定的

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(craw,url) for url in urls]
    for future in futures:
        print(future.result())
    for future in as_completed(futures):
        print(future.result())
代码演示
import concurrent.futures
import blog_spider

#爬取
with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(blog_spider.craw,blog_spider.urls)
    htmls = list(zip(blog_spider.urls,htmls))
    for url, html in htmls:
        print(url, len(html))
print("爬取结束")

# 解析
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        future = pool.submit(blog_spider.parse,html)
        futures[future] = url
    #有序打印
    for future, url in futures.items():
        print(url, future.result())

    print("开始无序打印")
    #无序打印
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        print(url, future.result())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2179650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Qt】开发环境与下载

这里写目录标题 1 Qt的开发工具概述2 Qt的下载2.1 下载Qt SDK 3. 认识SDK中的重要工具 1 Qt的开发工具概述 Qt支持持多种开发工具,其中⽐较常⽤的开发工具有:Qt Creator、Visual Studio、Eclipse. (1) QtCreator Qt Creator 是⼀个轻量级的跨平台集成…

iot网关是什么?iot网关在工业领域的应用-天拓四方

一、IoT网关的定义 IoT网关,即物联网网关,是物联网(IoT)系统中的重要组成部分。它主要实现感知网络与通信网络,以及不同类型感知网络之间的协议转换,既能够支持广域互联,也能满足局域互联的需求…

windows系统下Telnet工具的安装步骤

通过控制面板启用Telnet客户端 点击“确定”按钮,按照系统提示完成安装。 打开cmd,输入telnet就可以了

APISIX 联动雷池 WAF 实现 Web 安全防护

Apache APISIX 是一个动态、实时、高性能的云原生 API 网关,提供了负载均衡、动态上游、灰度发布、服务熔断、身份认证、可观测性等丰富的流量管理功能。 雷池是由长亭科技开发的 WAF 系统,提供对 HTTP 请求的安全请求,提供完整的 API 管理和…

【盘一盘】加密软件有哪些?10款电脑文件加密软件超好用推荐!让您的数据更安全!

在信息洪流中,数据安全如古战场上的坚固堡垒,至关重要。 古人云:"机密深藏,方能安身立命。" 为此,我特意搜罗了10款电脑文件加密软件,它们如同现代版的"八卦阵",既能保护…

华为/海思 Hi3516CV610 4K@20,6M@30 分辨率,1T 算力 NPU

总体介绍 Hi3516CV610 是一颗应用在安防市场的 IPC SoC 。在开放操作系统、新一代视频编解码标准、 网络安全和隐私保护、人工智能方面引领行业发 展,主要面向室内外场景下的枪机、球机、半球 机、海螺机、枪球一体机、双目长短焦机等产品 形态,打…

Spring - @Import注解

文章目录 基本用法源码分析ConfigurationClassPostProcessorConfigurationClass SourceClassgetImportsprocessImports处理 ImportSelectorImportSelector 接口DeferredImportSelector 处理 ImportBeanDefinitionRegistrarImportBeanDefinitionRegistrar 接口 处理Configuratio…

全同态加密算法概览

我们前面有谈到《Paillier半同态加密算法》,半同态加密算法除了支持密文加法运算的 Paillier 算法,还有支持密文乘法计算的 RSA 算法,早期的PSI(隐私求交)和PIR(匿踪查询)都有使用基于RSA盲签名技术来实现。今天我们来谈谈能够有效支持任意函…

【Git原理与使用】分支管理

分支管理 1.理解分支2.创建分支2.1创建分支2.2切换分支2.3合并分支 3.删除分支4.合并冲突4.分支管理策略5.分支策略6.bug分支7.删除临时分支8.小结 点赞👍👍收藏🌟🌟关注💖💖 你的支持是对我最大的鼓励&…

美客多自养号测评的常见问题与解决方案,从零开始的技术指南

美客多(MercadoLibre)主要专注于拉丁美洲市场。涵盖了多个国家,包括阿根廷、巴西、墨西哥、智利、哥伦比亚等,在这些国家占据了重要份额。对于卖家来说,要充分了解平台的特点和市场需求,制定合理的营销策略,不断提升自…

vue使用高德地图,点标记+轨迹

<template><!-- 轨迹--><divv-if"visible"ref"pageTotal"v-loading"loading"class"page-total"><div><divref"pageHead"class"page-head"><div class"head-title" /&…

《最高人民法院关于审理民间借贷案件适用法律若干问题的规定》(最新)民间借贷司法解释全文

原文地址 编辑于&#xff1a; 贵格律师事务所 2024年06月11日 16:04 上海 “ 为正确审理民间借贷纠纷案件&#xff0c;根据《中华人民共和国民法典》《中华人民共和国民事诉讼法》《中华人民共和国刑事诉讼法》等相关法律之规定&#xff0c;结合审判实践&#xff0c;制定本规定…

AI生成垃圾内容对互联网的冲击与应对:一场持续扩展的危机

引言 随着生成式AI技术的迅猛发展&#xff0c;人工智能在内容生产上的应用已深刻改变了互联网的内容生态。越来越多由AI生成的低质量内容正在淹没搜索引擎、内容社区、甚至学术领域。通过自动化工具&#xff0c;创建大规模虚假账号矩阵、批量生成无价值信息、甚至操纵搜索引擎…

叉车防撞系统方案,引领安全作业新时代

在现代工业的舞台上&#xff0c;叉车如同忙碌的“搬运工”&#xff0c;在仓储和制造环境中发挥着不可或缺的作用。然而&#xff0c;随着叉车使用频率的不断攀升&#xff0c;安全事故也如影随形&#xff0c;给企业带来经济损失的同时&#xff0c;更严重威胁着操作人员的生命安全…

大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

详解DNS工作原理及实例分析

DNS概述 互联网上的主机能够互相识别、访问&#xff0c;通过分配IP地址的方式进行的。全球主机数量众多&#xff0c;人们要记住像202.12.23.203这样的IP地址&#xff0c;不仅记不住&#xff0c;而且容易出错&#xff0c;于是&#xff0c;人们采用域名的形式&#xff0c;如网易…

详解 Spring Boot 的 RedisAutoConfiguration 配置

引言 带大家分析 Spring Boot 内置的有关 Redis 的自动配置类【RedisAutoConfiguration】。 1. Spring Data Redis Spring Data Redis 是 Spring Data 家族的一部分&#xff0c;它提供了从 Spring 应用程序中轻松配置和访问 Redis 的功能。 我们来看看官方介绍的特性&#xff…

速通数据结构与算法第七站 排序

系列文章目录 速通数据结构与算法系列 1 速通数据结构与算法第一站 复杂度 http://t.csdnimg.cn/sxEGF 2 速通数据结构与算法第二站 顺序表 http://t.csdnimg.cn/WVyDb 3 速通数据结构与算法第三站 单链表 http://t.csdnimg.cn/cDpcC 4 速通…

基于springboot+小程序的医院核酸检测服务管理系统(医院2)(源码+sql脚本+视频导入教程+文档)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 基于springboot小程序的医院核酸检测服务管理系统实现了管理员、用户管理、普通管理员、医护人员。 1、管理员实现了首页、用户管理、医护人员管理、普通管理员、通知公告管理、疫苗接种…

【Postgresql】安装新手教程

在以下postgresql官网下载软件 https://www.enterprisedb.com/downloads/postgres-postgresql-downloads下载完成后安装&#xff0c;找个记事本记录下安装过程中填写的数据库管理原的password和port 在所有程序目录中打开pgadmin 输入刚才的数据库管理员密码 自动跳转到以下…