【python高级】asyncio 并发编程

news2025/4/6 15:06:35

【大家好,我是爱干饭的猿,本文重点介绍python高级篇的事件循环,task取消和协程嵌套、call_soon、call_later、call_at、 call_soon_threadsafe、asyncio模拟http请求、asyncio同步和通信、aiohttp实现高并发实践。

后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】

上一篇文章:《【python高级】多线程、多进程和线程池编程》

1. 事件循环

  • 包含各种特定系统实现的模块化事件循环
  • 传输和协议抽象
  • 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
  • 模仿futures模块但适用于事件循环使用的Future类
  • 基于yield from的协议和任务,可以让你用顺序的方式编写并发代码
  • 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  • 模仿threading模块中的同步原语、可以用在单线程内的协程之间

1.1 开始一个协程

# 事件循环回调(驱动生成器)+epollIO多路复用)
# asyncio是python用于解决异io编程的一整套解决方案
# tornado, gevent, twisted (scrapy.django channels)
# tornado(实现web服务器) django+flask(uwsgi gunicorn+nginx)
# tornado可以直接部署,nginx+tornado

import asyncio
import time


async def get_html(url):
    print("start get url")
    # time.sleep(2)  执行此代码是顺序执行
    await asyncio.sleep(2)
    print("end get url")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [get_html("www.baidu.com") for i in range(10)]

    start_time = time.time()
    loop.run_until_complete(asyncio.wait(tasks))
    end_time = time.time()

    print("耗时:{}".format(end_time-start_time))

1.2 获取协程返回值和callback逻辑

import asyncio
import time
from functools import partial

# 获取协程返回值和callback逻辑
async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "body"

def callback(url, future):
    print("url:{}".format(url))
    print("send email to me")

if __name__ == '__main__':
    start_time = time.time()

    loop = asyncio.get_event_loop()

    # 方式1
    # get_future = asyncio.ensure_future(get_html("www.baidu.com"))
    # loop.run_until_complete(get_future)
    # print(get_future.result())
    # print("耗时:{}".format(time.time()-start_time))

    # 方式2
    # task = loop.create_task(get_html("www.baidu.com"))
    # loop.run_until_complete(task)
    # print(task.result())
    # print("耗时:{}".format(time.time()-start_time))

    # 加入callback
    task = loop.create_task(get_html("www.baidu.com"))
    # task.add_done_callback(callback) callback未传入参数
    task.add_done_callback(partial(callback, "www.baidu.com"))
    loop.run_until_complete(task)
    print(task.result())
    print("耗时:{}".format(time.time()-start_time))

1.3 await 和 gather

import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    start_time = time.time()
    # gather和wait的区别
    # gather更加high-leveL
    # gather可以进行分组

    # # 方法1
    # group1 = [get_html("www.baidu.com") for i in range(2)]
    # group2 = [get_html("www.baidu.com") for i in range(2)]
    # loop.run_until_complete(asyncio.gather(*group1, *group2))

    # 方法2
    group1 = [get_html("www.baidu.com") for i in range(2)]
    group2 = [get_html("www.baidu.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    loop.run_until_complete(asyncio.gather(group1, group2))

    print("耗时:{}".format(time.time()-start_time))

2. task取消和协程嵌套

2.1 task 取消

import asyncio
import time

async def get_html(sleep_time):
    print("waiting")
    await asyncio.sleep(sleep_time)
    print("done after {}s".format(sleep_time))


if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)
    tasks = [task1, task2, task3]

    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # 此报错可以在Linux中运行时按 ctrl+c 复现
        all_tasks = asyncio.all_tasks()
        for task in all_tasks:
            print("task cancel")
            print(task.cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

2.2 协程嵌套

import asyncio


async def compute(x,y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

子协程调用原理:
在这里插入图片描述

3. call_soon、call_later、call_at、 call_soon_threadsafe

import asyncio


def callback(sleep_times):
    print("sleep {} success".format(sleep_times))


def stoploop(loop):
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # 1. call_soon立即执行
    # loop.call_soon(callback, 2)
    # loop.call_soon(callback, 1)

    # 2. call_later 后于call_soon执行,多个call_later按照delay顺序执行
    # loop.call_later(2, callback, 2)
    # loop.call_later(1, callback, 1)
    # loop.call_later(3, callback, 3)

    # 3. call_at 指定时间执行
    # cur_time = loop.time()
    # loop.call_at(cur_time+2, callback, 2)
    # loop.call_at(cur_time+1, callback, 1)
    # loop.call_at(cur_time+3, callback, 3)

    # 4. call_soon_threadsafe和call_soon 使用方法一致,但是是线程安全的
    loop.call_soon_threadsafe(callback, 1)


    loop.call_soon(stoploop, loop)
    loop.run_forever()

4. ThreadPoolExecutor + asyncio

import asyncio
from concurrent.futures import ThreadPoolExecutor


def get_url(url):
    pass


if __name__ == "__main__ ":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = "http://shop.dd.com/goods/{}/".format(url)
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{".format(time.time() - start_time))

5. asyncio模拟http请求

#requests -> urlib ->socket
import socket
import asyncio
from urllib.parse import urlparse

async def get_url(url ):
    # 通过socket请求htmL
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    # client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {HTTP/1.1r\nHost:{}r\nConnection:close\r\n\r\n".format(path, host).encode( "utf-8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "ln".join(all_lines)
    return html

if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = []
    for url in range(20):
        url = "http: // shop.projectsedu.com/goods/{0}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    loop.run_until_complete(asyncio.wait(tasks))
    print('last time:{}'.format(time.time()))

6. asyncio同步和通信

import asyncio
from asyncio import Lock, Queue

import aiohttp

cache = {}
lock = Lock()
# queue = Queue() 和 queue = [], 如果需要流量限制就需要使用Queue()

async def get_stuff(url) :
    async with lock:
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request('GET', url)
        cache[url] = stuff
        return stuff

async def parse_stuff():
    stuff = await get_stuff()
    #do some parsing

async def use_stuff():
    stuff = await get_stuff()
    #use stuff to do something interesting

7. aiohttp实现高并发实践

import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuery

start_url = 'https://developer.aliyun.com/article/698731'

waitting_urls = []
seen_urls = set()
stopping = False


async def fetch(url, session):
    async with session.get(url) as response:
        if response.status != 200:
            return None

        try:
            return await response.text(encoding='ISO-8859-1')
        except UnicodeDecodeError:
            return await response.read()


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


def extract_urls(html):
    if html is None or not isinstance(html, str):
        return
    if not html.strip():  # 如果html内容为空或者只包含空白字符
        return

    try:
        urls = []
        pq = PyQuery(html)
        # 在这里继续处理pq对象
        pq = PyQuery(html)
        for link in pq.items("a"):
            url = link.attr("href")
            if url and url.startswith("http") and url not in seen_urls:
                urls.append(url)
                waitting_urls.append(url)
        return urls
    except Exception as e:
        print(f"Failed to parse HTML: {e}")


async def article_handler(url, session, pool):
    # 获取文章详情并解析入库
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("article-title").text()
    if len(title) == 0:
        print("No valid title found for article: {}".format(url))
    else:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 42;")
                insert_sql = "insert into article_test(title) values('{}')".format(title)
                await cur.execute(insert_sql)


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if len(waitting_urls) == 0:
                await asyncio.sleep(1)
                continue

            url = waitting_urls.pop()
            print("start get url: {}".format(url))
            if re.match('https://world.taobao.com', url):
                continue
            if re.match('http://.*?developer.aliyun.com/article/\d+/', url):
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
                    await asyncio.sleep(2)  # 适当的等待时间
            else:
                if url not in seen_urls:
                    await init_urls(url, session)


async def main(loop):
    # 等待MySQL建立连接
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='*****',
                                      db='aiomysql_test', loop=loop,
                                      charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        stopping = True
        loop.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1195293.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

完整版付费进群带定位源码

看到别人发那些不是挂羊头卖狗肉,要么就是发的缺少文件引流的。恶心的一P 这源码是我付费花钱买的分享给大家,功能完整。 搭建教程 nginx1.2 php5.6--7.2均可 最好是7.2 第一步上传文件程序到网站根目录解压 第二步导入数据库(shujuk…

滤波器实现

滤波器实现 卷积和滤波 滤波的数学基础是卷积。对于有限冲激响应 (FIR) 滤波器,滤波运算的输出 y(k) 是输入信号 x(k) 与冲激响应 h(k) 的卷积: y(k)∞∑l−∞h(l) x(k−l). 如果输入信号也是有限长度的,您可以使用 MATLAB conv 函数来执行…

java项目之高校奖学金管理系统(ssm框架+源码+文档)

风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的高校奖学金管理系统。项目源码以及部署相关请联系风歌,文末附上联系信息 。 项目简介: 管理员:首…

es6过滤对象里面指定的不要的值filter过滤

//过滤出需要的值this.dataItemTypeSelectOption response.data.filter(ele > ele.dictValue tree||ele.dictValue float4);//过滤不需要的值this.dataItemTypeSelectOption response.data.filter((item) > {return item.dictValue ! "float4"&&it…

Hbase 迁移小结:从实践中总结出的最佳迁移策略

在数据存储和处理领域,HBase作为一种分布式、可扩展的NoSQL数据库,被广泛应用于大规模数据的存储和分析。然而,随着业务需求的变化和技术发展的进步,有时候我们需要将现有的HBase数据迁移到其他环境或存储系统。HBase数据迁移是一…

观点|周鸿祎:大模型真正的竞争在于使其与用户场景相结合

【网易科技11月9日报道】目前,人工智能技术尚未达到向手机一样的刚性、高频需求,各国和企业都在加大研发和应用力度,探索不同的技术路线和商业模式。 360集团创始人、董事长周鸿祎在2023世界互联网大会乌镇峰会上表示,目前人工智能…

文件扩展名批量修改:txt文件扩展名批量修改为doc文档,高效办公的方法

在我们的日常工作中,经常需要处理大量的文本文件,这些文件可能以.txt为扩展名,而我们需要将其修改为.doc扩展名以方便进一步的操作。这种情况下,我们引用云炫文件管理器来将扩展名批量修改,提升办公的效率。在进行文件…

【数据结构】堆详解!(图解+源码)

🎥 屿小夏 : 个人主页 🔥个人专栏 : 数据结构解析 🌄 莫道桑榆晚,为霞尚满天! 文章目录 🌤️前言🌤️堆的理论☁️二叉树的顺序存储☁️堆的概念 🌤️堆的实现…

Android 进阶——Binder IPC之学习Binder IPC架构及原理概述(十二)

文章大纲 引言一、Binder IPC 基础架构1、Binder IPC核心角色2、Binder IPC的数据流 二、Binder IPC 协议通信流程三、Binder IPC 核心角色详解1、Server 进程及Server 组件2、Client进程及Client组件3、Service Manager 与实名 Binder4、Binder 驱动 四、Binder 通信过程五、开…

解锁潜在商机的钥匙——客户管理系统公海池

在竞争激烈的市场环境下,企业需要更智能、高效的方式管理客户,从而挖掘潜在商机。客户管理系统的公海池,就是为此而生的利器,让你轻松解锁商机,提升客户管理效能。 公海池,打破信息孤岛,释放潜在…

树之二叉排序树(二叉搜索树)

什么是排序树 说一下普通二叉树可不是左小右大的 插入的新节点是以叶子形式进行插入的 二叉排序树的中序遍历结果是一个升序的序列 下面是两个典型的二叉排序树 二叉排序树的操作 构造树的过程即是对无序序列进行排序的过程。 存储结构 通常采用二叉链表作为存储结构 不能 …

一致性算法介绍(二)

1.4. NWR N :在分布式存储系统中,有 多少份备份数据 W :代表一次成功的更新操作要求至少有 w 份数据写入成功 R : 代表一次成功的读数据操作要求至少有 R 份数据成功读取 NWR值的不同组合会产生不同的一致性效果,当WR…

【LeetCode刷题笔记】堆和优先级队列

358. K 距离间隔重排字符串 解题思路: 大根堆 + 队列 , 1)首先 计数数组 统计 每个字符出现的次数 ,然后将 计数 > 0 的 字符 和 次数 一起放入 大根堆 ,大根堆中

docker创建并访问本地前端

docker创建并访问本地前端,直接上命令: 安装nginx镜像: docker pull nginx 查看已安装的nginx: docker images 创建DockerFile文件,直接在当前文件夹种创建 touch Dockerfile 在Dockerfile写入内容: F…

HHDESK端口转发监控服务

端口转发是一种网络技术,用于将外部网络请求转发到内部网络中的特定设备或服务。它允许通过公共网络访问内部网络中的资源,提供了灵活性和便利性。 传统的端口转发方式是通过配置路由器的端口映射,但这需要具备网络知识和一定的技术操作&…

操作系统 | 编写内核

🌈个人主页:Sarapines Programmer🔥 系列专栏:《操作系统实验室》🔖少年有梦不应止于心动,更要付诸行动。 目录结构 1. 操作系统实验之编写内核 1.1 实验目的 1.2 实验内容 1.3 实验步骤 1.4 实验过程 …

Git系列之Git集成开发工具及git扩展使用

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《Git实战开发》。🎯🎯 &a…

MATLAB中deconvwnr函数用法

目录 语法 说明 示例 使用 Wiener 滤波对图像进行去模糊处理 deconvwnr函数的功能是使用 Wiener 滤波对图像进行去模糊处理。 语法 J deconvwnr(I,psf,nsr) J deconvwnr(I,psf,ncorr,icorr) J deconvwnr(I,psf) 说明 J deconvwnr(I,psf,nsr) 使用 Wiener 滤波算法对…

Clickhouse SQL

insert insert操作和mysql一致 标准语法:insert into [table_name] values(…),(….)从表到表的插入:insert into [table_name] select a,b,c from [table_name_2] update 和 delete ClickHouse 提供了 Delete 和 Update 的能力,这类操作…

深入理解 TCP;场景复现,掌握鲜为人知的细节

握手失败 第一次握手丢失了,会发生什么? 当客户端想和服务端建立 TCP 连接的时候,首先第一个发的就是 SYN 报文,然后进入到 SYN_SENT 状态。 在这之后,如果客户端迟迟收不到服务端的 SYN-ACK 报文(第二次…