Python 全栈系列249 IO并发:异步、线程与协程

news2025/1/16 8:02:17

说明

很久没有关注这方面的问题了,平时大部分时候还是做批量操作 。在这种情况下(CPU密集),异步、协程这些意义就不大了,甚至可能进一步拖慢处理时间。

但是在IO这一块的零碎处理是比较重要的,可以更快,且更省资源。很早的时候,曾经在执行规则引擎之前要分布的从mysql取数,结果处理时间特别慢;后来改用了asyncio和aiomysql,速度大幅提升,这给我了很深的印象:什么资源都没加,速度就是快了。

后来我主要还是集中在批次处理数据下,每次都是万条的密集操作,这时候主要就用数据库本身的功能;而面对大量零散需求的时候用了redis,这样看起来也不存在瓶颈。

事实上,主要是由于一些零碎的请求实在是太少了,完全没有达到redis-py处理的上线,所以我也忽略了,其实在应对大量请求上,还是可以使用aioredis的。
在这里插入图片描述

内容

1 基本概念

进程、线程与协程。chatgpt说的又快又好

进程、线程与协程是三种常见的计算机并发执行模型。它们各自有不同的特点和应用场景。下面对它们进行详细解释:

进程(Process)
定义
进程是操作系统分配资源的基本单位,每个进程都有独立的内存空间、数据段、代码段和系统资源。
特点
隔离性:进程之间是相互独立的,一个进程崩溃不会影响其他进程。
开销大:由于进程间资源独立,创建和切换进程的开销较大。
并行性:在多核系统上,不同进程可以真正并行运行。
应用场景
适用于需要高隔离性和稳定性的场景,如不同服务模块的隔离运行。
使用多进程来充分利用多核 CPU 的性能。

import multiprocessing

def worker(num):
    print(f'Worker: {num}')

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()
        p.join()

线程(Thread)
定义
线程是进程的子单元,是 CPU 调度和执行的基本单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和系统资源。
特点
共享资源:同一进程内的线程共享内存和文件等资源,线程之间通信和切换的开销较小。
并发执行:在单核 CPU 上,线程通过时间片轮转并发执行;在多核 CPU 上,线程可以并行执行。
同步问题:由于线程共享资源,需要考虑线程同步和互斥问题,避免竞态条件。
应用场景
适用于需要高效并发处理和共享资源的场景,如 Web 服务器处理并发请求、后台任务调度等。

import threading

def worker(num):
    print(f'Worker: {num}')

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

协程(Coroutine
定义
协程是比线程更加轻量级的执行单元。协程在用户态进行调度和切换,不依赖于操作系统提供的线程调度机制。
特点
轻量级:协程创建和切换的开销极小,因为它们不需要进行系统调用。
协作式调度:协程通过显式的让出控制权来实现调度,不同于线程的抢占式调度。即,协程在执行过程中需要显式地调用挂起操作,才能切换到其他协程。
不适合多核并行:由于协程是单线程运行的,无法利用多核 CPU 的并行能力。
应用场景
适用于大量 I/O 密集型任务,如网络编程、异步 I/O 操作等。
使用协程可以避免线程同步问题,简化并发编程模型。
比较与总结
进程:独立性强,隔离性好,但创建和切换开销大,适合多核并行。
线程:共享资源,创建和切换开销小,但需要解决同步问题,适合高并发和资源共享的场景。
协程:轻量级,无需系统调用,调度开销极小,但无法利用多核并行,适合大量 I/O 密集型任务。

import asyncio

async def worker(num):
    print(f'Worker: {num}')
    await asyncio.sleep(1)

async def main():
    tasks = [worker(i) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

关于进程,我的理解和体会。进程就是CPU的核,就是一个jupyter服务,就是一个容器,虽然这么说不严密,但是挺容易记的。在这个级别并行的方法太多了,multiprocessing没啥大用。

  • 1 服务级别,采用nginx发挥多核作用。
  • 2 单服务,tornado之类的可以直接发挥多核
  • 3 程序级,pandas的apply可以发挥多核作用(对于可向量化的操作)

还有就是采用GPU那种根本性的并行器件。

关于线程,刚好有个实际的体会。我有一个tornado,里面允许临时给一个参数字典加参数,然后我就发现调用过程失灵时不灵。原因是我启动了多核,这个参数字典其实给了某一个线程,在python里,线程也就是进程。然后进程间是隔离的,所以对于很多进程,根本没有参数。

所以从整体性能上,在核/线程基本我还算利用的可以,底下的IO密集并发还做的很不够。现在虽然有了celery,不过那种是偏异步的利用。

最后,协程在IO并发上的性价比应该是远高于线程的,所以这点我看到java的多线程就感觉太浪费了。

2 简单梳理

我把chatpt给我的一些有用的示例记一下,其实也就是这些写的比价有用,才快速攒这篇文章。

首先,我用了大量的微服务,特别是很多的agent: MongoAgent, RedisAgent, MysqlAgent… 这些服务都采用了同步的包,因为我原来处理的核心就是大批量数据:在CPU已经密集的情况下,IO并发也就没有意义了

考虑到现在越来越多的轻处理(sniffer),所以突然间感觉异步就变得越来越重要了。

2.1 在服务端异步

这个可以参考这篇文章

用sleep模拟了耗时操作,实测是蛮好用的。tornado本身也是基于asyncio做的。

import time
from concurrent.futures.thread import ThreadPoolExecutor

from tornado import web, ioloop
from tornado.concurrent import run_on_executor


class SyncToAsyncThreadHandler(web.RequestHandler):

    executor = ThreadPoolExecutor(max_workers=2)

    @run_on_executor
    def sleep(self):
        print("休息1...start")
        time.sleep(5)
        print("休息1...end")
        return 'ok'

    async def get(self):
        res = await self.sleep()
        self.write(res)

url_map = [
    ("/?", SyncToAsyncThreadHandler)
]

if __name__ == '__main__':
    app = web.Application(url_map, debug=True)
    app.listen(8888)
    print('started...')
    ioloop.IOLoop.current().start()

2.2 在客户端请求

用线程池发起并发,虽然效率不那么搞,但看着是同步方式,比较简单。

import requests
from concurrent.futures import ThreadPoolExecutor

def make_request():
    response = requests.get('http://localhost:8888')
    print(response.text)

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(make_request) for _ in range(5)]
    for future in futures:
        future.result()

这是另一个变体,同时发起多个url的请求。

import time
import logging
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO)

def fetch_url(url):
    logging.info(f"Fetching {url}...")
    response = requests.get(url)
    logging.info(f"Completed {url}")
    return response.text

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/uuid",
    "https://httpbin.org/headers",
]

def main():
    max_workers = 10  # 可以根据需要调整 max_workers 的数量
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
                logging.info(f"Result from {url}: {data[:60]}...")
            except Exception as exc:
                logging.error(f"{url} generated an exception: {exc}")

if __name__ == "__main__":
    main()

线程与协程

在我问这个问题的时候,chat又了我例子
在这里插入图片描述

线程

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"Task {n} start")
    time.sleep(2)
    print(f"Task {n} end")
    return n

def main():
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            print(f"Result: {future.result()}")

if __name__ == "__main__":
    main()

协程

import asyncio

async def task(n):
    print(f"Task {n} start")
    await asyncio.sleep(2)
    print(f"Task {n} end")
    return n

async def main():
    tasks = [task(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

然后还给了一个混合版,我就不知道是不是它有点幻觉+过敏了。

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    print(f"Blocking IO {n} start")
    time.sleep(2)
    print(f"Blocking IO {n} end")
    return n

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io, 1),
            loop.run_in_executor(pool, blocking_io, 2),
            loop.run_in_executor(pool, blocking_io, 3),
        )
    for result in results:
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

最后再附一个我自己的协程版,在eventloop方面我有点没搞明白,不过反正不是get loop就是new loop,是在不行再叠一个nest_asyncio,反正只要有那么一个协调组织者在就行(loop)。

import nest_asyncio 
nest_asyncio.apply()

import json 
import asyncio, aiohttp
async def json_query_worker(task_id = None , url = None , json_params = None,time_out = 60, semaphore = None):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json = {**json_params},timeout=aiohttp.ClientTimeout(total=time_out)) as response:
                res = await response.text()
                return {task_id: json.loads(res)}
async def json_player(task_list , concurrent = 3):
    semaphore = asyncio.Semaphore(concurrent) # 并发限制
    tasks = [asyncio.ensure_future(json_query_worker(**x,  semaphore = semaphore)) for x in task_list]
    return await asyncio.gather(*tasks)
loop = asyncio.new_event_loop()
# loop = asyncio.get_event_loop()
tick1 = time.time()
res = loop.run_until_complete(json_player(para_dict['task_rec_list'], concurrent=10))
tick2 = time.time()
print(tick2- tick1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1790992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

css动画案例练习之会展开的魔方和交错的小块

这里写目录标题 一级目录二级目录三级目录 下面开始案例的练习,建议第一个动手操作好了再进行下一个一、交错的小块效果展示1.大致思路1.基本结构2.实现动态移动 2.最终版代码 二、会展开的魔方1.大致思路1.基本结构;2.静态魔方的构建3.让静态的魔方动起来 2.最终版…

使用python绘制华夫饼图

使用python绘制华夫饼图 华夫饼图效果代码 华夫饼图 华夫饼图(Waffle Chart)是一种数据可视化图表,用于显示数据在一个网格中的分布情况。它类似于饼图,通过将数据划分为等大小的方块来表示不同类别的比例。华夫饼图的优势在于它…

图解Mysql索引原理

概述 是什么 索引像是一本书的目录列表,能根据目录快速的找到具体的书本内容,也就是加快了数据库的查询速度索引本质是一个数据结构索引是在存储引擎层,而不是服务器层实现的,所以,并没有统一的索引标准,…

bootstrap5-学习笔记2-模态框+弹窗+tooltip+popover+信息提示框

参考: Bootstrap5 教程 | 菜鸟教程 https://www.runoob.com/bootstrap5/bootstrap5-tutorial.html Bootstrap 入门 Bootstrap v5 中文文档 v5.3 | Bootstrap 中文网 https://v5.bootcss.com/docs/getting-started/introduction/ 之前用bootstrap2和3比较多&#x…

音频信号分析与实践

音频信号分析与实践课程,方便理解音频信号原理和过程 1.音频信号采集与播放 两种采样模式和标准的采样流程 人说话的声音一般在2kHz一下: 采样频率的影响:采样率要大于等于信号特征频率的2倍;一般保证信号完整,需要使用10倍以上的…

Git权限管理

Git权限管理 简介:大家好,我是程序员枫哥,🌟一线互联网的IT民工、📝资深面试官、🌹Java跳槽网创始人。拥有多年一线研发经验,曾就职过科大讯飞、美团网、平安等公司。在上海有自己小伙伴组建的副…

纯血鸿蒙APP开发实战:如何添加TabBar

1.tabbar组件 tabbar组件是移动端开发经常使用的一个组件,底部固定工具栏,顶部tab工具栏等。 2.示例 EntryComponentstruct MainPage {State private selectedIndex: number 0;private controller: TabsController new TabsController()build() {Col…

PHP序列化、反序列化

目录 一、PHP序列化:serialize() 1.对象序列化 2.pop链序列化 3.数组序列化 二、反序列化:unserialize() 三、魔术方法 ​四、NSSCTF相关简单题目 1.[SWPUCTF 2021 新生赛]ez_unserialize 2.[SWPUCTF 2021 新生赛]no_wakeup 学习参考&#xff1…

Python学习从0开始——Kaggle机器学习004总结2

Python学习从0开始——Kaggle机器学习004总结2 一、缺失值二、分类变量2.1介绍2.2实现1.获取训练数据中所有分类变量的列表。2.比较每种方法方法1(删除分类变量)方法2(序数编码)方法3独热编码 三、管道3.1介绍3.2实现步骤1:定义预处理步骤步骤2:定义模型步骤3:创建和评估管道 四…

数据仓库核心:维度表设计的艺术与实践

文章目录 1. 引言1.1基本概念1.2 维度表定义 2. 设计方法2.1 选择或新建维度2.2 确定维度主维表2.3 确定相关维表2.14 确定维度属性 3. 维度的层次结构3.1 举个例子3.2 什么是数据钻取?3.3 常见的维度层次结构 4. 高级维度策略4.1 维度整合维度整合:构建…

HCIP的学习(28)

第九章,链路聚合和VRRP 链路聚合 ​ 目的:备份链路以及提高链路带宽。 ​ 链路聚合技术(Eth-Trunk):将多个物理接口捆绑成一个逻辑接口,将N条物理链路逻辑上聚合为一条逻辑链路。 正常情况下,…

Android开机动画的结束过程BootAnimation(基于Android10.0.0-r41)

文章目录 Android 开机动画的结束过程BootAnimation(基于Android10.0.0-r41) Android 开机动画的结束过程BootAnimation(基于Android10.0.0-r41) 路径frameworks/base/cmds/bootanimation/bootanimation_main.cpp init进程把我们的BootAnimation的二进制文件拉起来了&#xf…

STM32作业实现(五)温湿度传感器dht11

目录 STM32作业设计 STM32作业实现(一)串口通信 STM32作业实现(二)串口控制led STM32作业实现(三)串口控制有源蜂鸣器 STM32作业实现(四)光敏传感器 STM32作业实现(五)温湿度传感器dht11 STM32作业实现(六)闪存保存数据 STM32作业实现(七)OLED显示数据 STM32作业实现(八)触摸按…

6. MySQL 查询、去重、别名

文章目录 【 1. 数据表查询 SELECT 】1.1 查询表中所有字段使用 * 查询表的所有字段列出表的所有字段 1.2 查询表中指定的字段 【 2. 去重 DISTINCT 】【 3. 设置别名 AS 】3.1 为表指定别名3.2 为字段指定别名 【 5. 限制查询结果的条数 LIMIT 】5.1 指定初始位置5.2 不指定初…

利用短视频平台,轻松引流获客:自动私信评论策略全解析

在数字化时代,短视频已成为互联网流量的新蓝海,其独特的视觉吸引力和高度的用户粘性为各行各业提供了前所未有的营销机遇。无论是初创企业还是成熟品牌,都能通过短视频平台有效触达目标客户,实现高效引流与获客。本文将深入探讨如…

Three.js加入到可视化大屏,看看能否惊艳到你?

three.js 在可视化大屏上可以实现各种三维场景和动画效果,可以根据具体需求进行定制化开发,并结合其他技术,如数据可视化、交互设计等,实现更加丰富的可视化效果。 three.js 是一个基于 WebGL 的 JavaScript 3D 库,可…

LLM的基础模型5:Embedding模型

大模型技术论文不断,每个月总会新增上千篇。本专栏精选论文重点解读,主题还是围绕着行业实践和工程量产。若在某个环节出现卡点,可以回到大模型必备腔调或者LLM背后的基础模型新阅读。而最新科技(Mamba,xLSTM,KAN)则提…

dirfuzz-web敏感目录文件扫描工具

dirfuzz介绍 dirfuzz是一款基于Python3的敏感目录文件扫描工具,借鉴了dirsearch的思路,扬长避短。在根据自身实战经验的基础上而编写的一款工具,经过断断续续几个月的测试、修改和完善。 项目地址:https://github.com/ssrc-c/di…

C++240605

设计一个 Per类&#xff0c;类中包含**私有**成员:姓名、年龄、**指针成员**身高、体重&#xff0c; 再设计一个Stu类&#xff0c;类中包含**私有**成员:成绩、 Per类对象p1&#xff0c; 设计这 两个类 的 **构造函数、析构函数**。 #include <iostream>using namespace…

这才是大模型价格战背后的真相

想必大家今天肯定被各家大模型厂商的降价新闻刷圈了&#xff0c;如果说 Meta Llama 3 的开源是国外大模型市场的搅局者&#xff0c;那 DeepSeek-V2 就是国内大模型市场的鲶鱼&#xff0c;但是价格战背后是大模型基础设施优化带来的物美价廉&#xff0c;还是浑水摸鱼的噱头&…