Python并发编程简介

news2024/9/28 9:24:02

1、Python对并发编程的支持

  • 多线程: threading, 利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
  • 多进程: multiprocessing, 利用多核CPU的能力,真正的并行执行任务
  • 异步IO: asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
  • 使用Lock对资源加锁,防止冲突访问
  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
  • 使用subprocess启动外部程序的进程,并进行输入输出交互

2、 怎样选择多线程多进程多协程

Python并发编程有3三种方式:多线程Thread、多进程Process、多协程Coroutine

2.1 什么是CPU密集型计算、IO密集型计算?

CPU密集型(CPU-bound ) :
CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高
例如:压缩解压缩、加密解密、正则表达式搜索
IO密集型(I/0 bound):
IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存)的读/写操作,CPU占用率仍然较低。
例如:文件处理程序、网络爬虫程序、读写数据库程序(依赖大量的外部资源)

2.2 多线程、多进程、多协程的对比

一个进程中可以启动N个线程,一个线程中可以启动N个协程
多进程Process ( multiprocessing )

  • 优点:可以利用多核CPU并行运算
  • 缺点:占用资源最多、可启动数目比线程少
  • 适用于:CPU密集型计算

多线程Thread ( threading)

  • 优点:相比进程,更轻量级、占用资源少
  • 缺点:
    相比进程:多线程只能并发执行,不能利用多CPU ( GIL )
    相比协程:启动数目有限制,占用内存资源,有线程切换开销
  • 适用于: IO密集型计算、同时运行的任务数目要求不多

多协程Coroutine ( asyncio )

  • 优点:内存开销最少、启动协程数量最多
  • 缺点:支持的库有限制(aiohttp VS requests )、代码实现复杂
  • 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景

2.3 怎样根据任务选择对应技术?

待执行任务
任务特点
CPU密集型
使用多进程
multiprocessing
IO密集型
1、需要超多任务量?
2、有现成协程库支持?
3、协程实现复杂度可接受?
使用多线程
threading
使用多协程
asyncio
 flowchart LR
 A[待执行任务]-->B{任务特点}
 B --- C(CPU密集型)
C-->E(["使用多进程
			multiprocessing"])
 B ---D(IO密集型)

D-->F{"1、需要超多任务量?
			2、有现成协程库支持?
			3、协程实现复杂度可接受?"}
F -- 否---> G(["使用多线程
			threading"])
F -- 是---> H(["使用多协程
			asyncio"])

%% style选项为每个节点设置了颜色和边框样式
style A fill:#333399,color:#fff
style B fill:#fff,stroke:#CC6600
style C fill:#FFFFCC
style D fill:#FFFFCC
style E fill:#FF9999
style F fill:#fff,stroke:#CC6600
style G fill:#FF9999
style H fill:#FF9999

%% linkStyle选项为连接线设置颜色和样式
linkStyle 1 stroke:blue,stroke-width:1px;
linkStyle 3 stroke:blue,stroke-width:1px;

3、 Python速度慢的罪魁祸首——全局解释器锁GIL

3.1 Python速度慢的两大原因

相比C/C+ +/JAVA, Python确实慢,在一些特殊场景 下,Python比C+ +慢100~ 200倍
由于速度慢的原因,很多公司的基础架构代码依然用C/C+ +开发
比如各大公司阿里/腾讯/快手的推荐引擎、搜索引擎、存储引擎等底层对性能要求高的模块
Python速度慢的原因1
动态类型语言,边解释边执行
Python速度慢的原因2
GIL ,无法利用多核CP并发执行

3.2 GIL是什么?

全局解释器锁( 英语: Global Interpreter Lock,缩写GIL)
是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。
即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程。
在这里插入图片描述
由于GIL的存在,即使电脑有多核CPU,单个时刻也只能使用1个,相比并发加速的C+ +/JAVA所以慢

3.3 为什么有GIL .这个东西?

简而言之: Python设计初期,为了规避并发问题引入了GIL,现在想去除却去不掉了!
为了解决多线程之间数据完整性和状态同步问题
Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象
开始:线程A和线程B都引用了对象obj,obj.ref_ num = 2,线程A和B都想撤销对obj的引用
在这里插入图片描述
在这里插入图片描述
GIL确实有好处:简化了Python对共享资源的管理;

3.4 怎样规避GIL带来的限制?

多线程threading机制依然是有用的,用于IO密集型计算
因为在I/O (read,write ,send,recv,etc. )期间,线程会释放GIL,实现CPU和IO的并行
因此多线程用于IO密集型计算依然可以大幅提升速度
但是多线程用于CPU密集型计算时,只会更加拖慢速度
使用multiprocessing的多进程机制实现并行计算、利用多核CPU优势
为了应对GIL的问题,Python提供了multiprocessing

4、使用多线程,爬虫被加速

4.1 Python创建多线程的方法

1、准备一个函数

def my_func(a,b):
	do_craw(a,b)

2、创建一个线程

import threading
t = threading.Thread(target=my_func,args=(100,200))

3、启动线程

t.start()

4、等待结束

t.join()

4.2 改写爬虫程序,变成多线程爬取

blog_spider.py程序

import requests

urls = [
    f"https://www.cnblogs.com/sitehome/p/{page}"
    for page in range(1, 50 + 1)
]

def craw(url):
    r = requests.get(url)
    print(url, len(r.text))
import blog_spider
import threading
import time


def single_thread():
    print("single_thread begin...")
    for url in blog_spider.urls:
        blog_spider.craw(url)
    print("single_thread end...")


def multi_thread():
    print("multi_thread begin...")
    threads = []
    for url in blog_spider.urls:
        threads.append(threading.Thread(target=blog_spider.craw,args=(url,)))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("multi_thread end...")


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end=time.time()
    print("single thread cost:",end-start,"seconds")

    begin = time.time()
    multi_thread()
    finish = time.time()
    print("multi thread cost:", finish - begin, "seconds")

4.3 速度对比:单线程爬虫VS多线程爬虫

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5 Python实现生产者消费者爬虫

5.1 多组件的Pipeline技术架构

复杂的事情一般都不会一下子做完, 而是会分很多中间步骤一步步完成。
在这里插入图片描述

消费者
生产者
输入数据
中间数据
中间数据
输出数据
处理器N
处理器1
处理器X
很多个
graph LR
O[ ] --> |输入数据|A(处理器1)-->|中间数据|B(处理器X<br>很多个)-->|中间数据|C(处理器N)-->|输出数据|D[ ]

5.2 生产者消费者爬虫的架构

5.3 多线程数据通信的queue.Queue

5.4 代码编写实现生产者消费者爬虫

import requests
from bs4 import BeautifulSoup


urls = [
    # f"https://www.cnblogs.com/#p{page}"
    f"https://www.cnblogs.com/sitehome/p/{page}"
    for page in range(1, 3 + 1)
]



def craw(url):
    r = requests.get(url)
    return r.text


def parse(html):
    soup=BeautifulSoup(html,'html.parser')
    links = soup.find_all('a',class_="post-item-title")
    return [(link["href"],link.get_text()) for link in links]


if __name__ == '__main__':
    for result in parse(craw(urls[0])):
        print(result)
import blog_spider
import threading
import time
import queue
import random


def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        html_queue.put(html)
        print(threading.current_thread().name, f"craw {url}",
              "url_queue.size=", url_queue.qsize())
        # time.sleep(random.randint(1, 2))


def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        results = blog_spider.parse(html)
        for result in results:
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, f"results.size {len(results)}",
              "html_queue.size=", html_queue.qsize())
        # time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in blog_spider.urls:
        url_queue.put(url)

    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
        t.start()


    fout = open("02.data.txt", 'w', encoding='utf-8')

    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
        t.start()

6、Python线程安全问题以及解决方案

import threading
import time

lock = threading.Lock()


class Accout():
    def __init__(self,balance):
        self.balance=balance

def draw(accout,amount):
    with lock:
        if accout.balance>=amount:
            time.sleep(0.1)
            print(threading.current_thread().name,"取钱成功")
            accout.balance-=amount
            print(threading.current_thread().name, "余额",accout.balance)
        else:
            print(threading.current_thread().name, "取钱失败,余额不足")

if __name__ == '__main__':
    accout=Accout(1000)
    ta = threading.Thread(target=draw,args=(accout,600))
    tb = threading.Thread(target=draw, args=(accout, 600))

    ta.start()
    tb.start()

    ta.join()
    tb.join()

7、Python好用的线程池ThreadPoolExecutor

import concurrent.futures
import blog_spider

with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(blog_spider.craw, blog_spider.urls)
    htmls = list(zip(blog_spider.urls, htmls))
    for url, html in htmls:
        print(url, len(html))

print("爬虫结束..")

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        future = pool.submit(blog_spider.parse, html)
        futures[future] = url

    # for future,url in futures.items():
    #     print(url,future.result())

    for future in concurrent.futures.as_completed(futures):
        # url = futures[future]
        futures[future] = url
        print(url, future.result())

8、Python使用线程池在Web服务中实现加速

import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor
app = flask.Flask(__name__)
pool = ThreadPoolExecutor()

def read_db():
    time.sleep(0.2)
    return "db result"


def read_file():
    time.sleep(0.1)
    return "file result"


def read_api():
    time.sleep(0.3)
    return "api result"


@app.route("/")
def index():
    result_file=pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)

    return json.dumps({
        "result_file":result_file.result(),
        "result_db": result_db.result(),
        "result_db": result_api.result(),
    })


if __name__ == '__main__':
    app.run()

9、使用多进程multiprocessing模块加速程序的运行

import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

PRIMES = [112272535095293] * 10

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def single_thread():
    for num in PRIMES:
        is_prime(num)


def multi_thread():
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


def multi_process():
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    print("single thread cost:", end - start, "secend")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi thread cost:", end - start, "secend")

    start = time.time()
    multi_process()
    end = time.time()
    print("multi process cost:", end - start, "secend")

在这里插入图片描述

10、Python在Flask服务中使用多进程池加速程序运行

import flask
import math
import json
from concurrent.futures import ProcessPoolExecutor

app = flask.Flask(__name__)


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
    number_list = [int(x) for x in numbers.split(",")]
    results = process_pool.map(is_prime, number_list)
    return json.dumps(dict(zip(number_list, results)))


if __name__ == '__main__':
    process_pool = ProcessPoolExecutor()
    app.run()

11、Python异步IO实现并发爬虫

import asyncio
import aiohttp
import blog_spider
import time


async def async_craw(url):
    print("爬虫开始:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url:{url},{len(result)}")


loop = asyncio.get_event_loop()

tasks = [loop.create_task(async_craw(url)) for url in blog_spider.urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("asyncio cost:", end - start, "second")

12、在异步IO中使用信号量控制爬虫并发度

import asyncio
import aiohttp
import blog_spider
import time


async def async_craw(url):
    print("爬虫开始:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url:{url},{len(result)}")


loop = asyncio.get_event_loop()

tasks = [loop.create_task(async_craw(url)) for url in blog_spider.urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("asyncio cost:", end - start, "second")

参考:【2021最新版】Python 并发编程实战,用多线程、多进程、多协程加速程序运行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1078265.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++程序员必修第一课【C++基础课程】00:课程介绍

1 课程目标&#xff1a; 搭建 VC 2019 开发环境全面系统学习 C 语法和开发基础学会用代码思维解决实际工作中的问题拥有一定的程序设计能力&#xff0c;能够开发一个完整软件 2 适用人群&#xff1a; 【零基础&#xff0c;想入行 C 程序员&#xff0c;必修第一课程】学生&am…

公众号留言板小程序哪个好用?一一列举

为什么公众号没有留言功能&#xff1f;2018年2月12日之后直到现在&#xff0c;新注册公众号的运营者会发现一个问题&#xff1a;无论是个人还是企业的公众号&#xff0c;在后台都找不到留言功能了。这对公众号来说绝对是一个极差的体验&#xff0c;少了一个这么重要的功能&…

【java学习】对象的产生(18)

文章目录 1. 初始化赋值2. 匿名对象3. 练习3.1. 习题一3.2. 习题二 4. 总结 1. 初始化赋值 当一个对象被创建时&#xff0c;会对其中各种类型的成员变量自动进行初始化赋值。除了基本数据类型之外的变量类型都是引用类型&#xff0c;如上节的 Person 和前面讲过的数组。 成员…

C# 搭建一个简单的WebApi项目23.10.10

一、创建Web API 1、创建一个新的web API项目 启动VS 2019&#xff0c;并在“开始页”选择“创建新项目”。或从“文件”菜单选择“新建”&#xff0c;然后选择“项目”。 选择ASP.NET Web应用程序(.NET Framework) 2.点击下一步&#xff0c;到这个页面时选择Web API。 3.选中…

类加载器、双亲委派机制

目录 1 JVM是什么2 类加载系统2.1 类的加载过程2.2 类加载器 3 双亲委派机制3.1 双亲委派机制介绍3.2 双亲委派机制的优缺点3.3 自定义类加载器实现双亲委派机制 1 JVM是什么 Java Virtual Machine&#xff08;Java虚拟机&#xff09;是java程序实现跨平台的⼀个重要的⼯具&am…

python查找替换:查找空行,空行前后添加```,```中间添加 # + 空格 + 空行后遇到的第1行文字?

初始代码 查找空行空行前后添加 中间添加 # 空行后遇到的第1行文字txt 36 96 159 8 72可以使用Python的字符串处理函数来查找并修改文本中的空行。以下是一个示例代码&#xff0c;演示如何在文本中查找空行&#xff0c;并在每个空行前后添加和一个注释&#xff1a; # 原始文本…

销售活动管理必备工具——CRM系统软件

在企业业务中&#xff0c;销售活动是实现企业业绩目标的基本单元&#xff0c;起着奠基石的作用。CRM销售管理系统是销售活动管理的必备工具&#xff0c;帮助企业更好地开展销售活动。下面来说说CRM系统如何找到并输出关键销售活动&#xff1f; 在能顺利找到并输出关键销售活动…

选刊CFP | 中科院1区TOP,IF18.6,Elsevier出版社,仅3个月录用!

【SciencePub学术】 本期推荐 部分学者论文完成后&#xff0c;选刊上犯难&#xff0c;面对纷繁复杂的期刊信息及流程&#xff0c;很难有时间和精力一一调研查看&#xff0c;小编在也经常收到此类信息&#xff0c;希望我们帮助查询期刊信息。为此&#xff0c;小编开设此专栏【选…

leetcode:455. 分发饼干(python3解法)

难度&#xff1a;简单 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#xff…

超详细!主流大语言模型的技术原理细节汇总!

1.比较 LLaMA、ChatGLM、Falcon 等大语言模型的细节&#xff1a;tokenizer、位置编码、Layer Normalization、激活函数等。 2. 大语言模型的分布式训练技术&#xff1a;数据并行、张量模型并行、流水线并行、3D 并行、零冗余优化器 ZeRO、CPU 卸载技术 ZeRo-offload、混合精度训…

吃透底层:从路由到前缀树

前言 今天学到关于路由相关文章&#xff0c;发现动态路由中有一个很常见的实现方式是前缀树&#xff0c;很感兴趣这个算法&#xff0c;故进行记录。 前缀树 Trie&#xff08;又被叫做字典树&#xff09;可以看作是一个确定有限状态自动机&#xff0c;尽管边上的符号一般是隐含…

Prometheus和grafana安装配置手册

1.简介 本文档为prometheus和grafana安装配置手册&#xff0c;prometheus和grafana的内容、和操作过程&#xff0c;详细介绍了服务监控配置、dashboard配置、告警配置等操作。 2.部署说明 Prometheus基于Golang编写&#xff08;需要安装&#xff09;&#xff0c;编译后的软件…

433/315无线接收芯片XL710,适合各种低功耗要求的设备等

XL710是一款高集成度、低功耗的单片ASK/OOK射频接收芯片。高频信号接收功能全部集成于片内以达到用最少的外围器件和最低的成本获得最可靠的接收效果。因此它是真正意义.上的“无线高频调制信号输入&#xff0c;数字解调信号输出”的单片接收器件。 XL710为SOP8封装&#xff0…

Python 中最常用的 4种股票价格移动平均方法(二)

一、简介 在本文中&#xff0c;我们重点关注一些小众但值得注意的移动平均方法。这些利基工具通常来自专门研究或开发用于解决非常特殊的交易场景。虽然不太主流&#xff0c;但它们提供了对市场动态的极其细致入微的见解。完整列表如下&#xff1a; 第 1 部分 — 基本技术&…

大模型部署手记(11)LLaMa2+Chinese-LLaMA-Plus-2-7B+Windows+llama.cpp+中文对话

1.简介&#xff1a; 组织机构&#xff1a;Meta&#xff08;Facebook&#xff09; 代码仓&#xff1a;GitHub - facebookresearch/llama: Inference code for LLaMA models 模型&#xff1a;LIama-2-7b-hf、Chinese-LLaMA-Plus-2-7B 下载&#xff1a;使用huggingface.co和百…

微软放大招!Bing支持DALL-E3,免费AI绘画等你来体验!

最近 OpenAI 发布了DALL-E3模型&#xff0c;出图效果和Midjourney不相上下&#xff0c;不过要使用它有些门槛&#xff0c;必须是 ChatGPT Plus 账户&#xff0c;而且还要排队&#xff0c;怎么等都等不到&#xff0c;搞得大家都比较焦虑。 不过现在微软在Bing上也支持 DALL-E3 …

Excel恢复科学技术法显示的数据

Excel中输入位数较大的数据时&#xff0c;软件会自动使用科学计数法显示。很多时候并不需要这样的计数格式&#xff0c;所以需要把它转变为普通的数字格式 操作方法 选中单元格/列/行》右键》设置单元格式 在打开的窗口中&#xff0c;切换到“数字”选项卡&#xff0c;点击“自…

第四章 图表样式美化

第四章 图表样式美化 1.图表样式概述 1.1.默认图表样式 ​ matplotlib在绘图的过程中会读取存储在本地的配置文件matplotlibrc&#xff0c;通过matplotlibrc文件中的缺省配置信息指定图表元素的默认样式&#xff0c;完成图表元素样式的初始设置。 ​ matplotlib文件包含众多…

springboot单独在指定地方输出sql

一般线上项目都是将日志进行关闭&#xff0c;因为mybatis日志打印&#xff0c;时间长了&#xff0c;会占用大量的内存&#xff0c;如果我想在我指定的地方进行打印sql情况&#xff0c;怎么玩呢&#xff01; 下面这个场景&#xff1a; 某天线上的项目出bug了&#xff0c;日志打印…

Keil软件仿真的方法: μVision2调试器

目录 1. μVision2调试器2. 调试工具3. 单步调试4. 使用断点调试5. 使用监视窗口调试6. 调试按钮的功能1&#xff0e;“单步”按钮2&#xff0e;“跟踪”按钮3&#xff0e;“运行到退出”按钮4&#xff0e;“运行到光标行”按钮 参考资料 软件仿真是利用PC的CPU来模拟单片机的运…