4.1 协程:协程基础

news2024/11/25 12:40:02

1.协程

协程,又称微线程。协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。 为啥说它是一个执行单元,因为它自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。

协程,也称为用户级线程,在不开辟线程的基础上完成多任务,也就是在单线程的情况下完成多任务,多个任务按照一定顺序交替执行 通俗理解只要在def里面只看到一个yield关键字表示就是协程。

在一个线程中如果遇到IO等待时间,线程不会傻傻等,利用空闲的时候再去干点其他事情;
在这里插入图片描述

(1)协程的发展历程

Python 对协程的支持经历了多个版本:

  • Python2.x 对协程的支持比较有限,通过 yield 关键字支持的生成器实现了一部分协程的功能但不完全。
  • 第三方库 gevent 对协程有更好的支持。
  • Python3.4 中提供了 asyncio 模块。
  • Python3.5 中引入了 async/await 关键字。
  • Python3.6 中 asyncio 模块更加完善和稳定。
  • Python3.7 中内置了 async/await 关键字。

(2)协程的特点

  • 单线程内切换,适用于IO密集型程序中,可以最大化IO多路复用的效果。
  • 无法利用多核。
  • 协程间完全同步,不会并行。不需要考虑数据安全。在单线程利用CPU和IO同时执行的原理,实现函数异步执行。

(3)协程是否需要加锁

https://zhuanlan.zhihu.com/p/169426477
同一个线程内的协程不需要加锁,不同线程内的协程就需要加锁。

2.协程和线程差异

在实现多任务时, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。

  • 一个进程至少有一个线程,进程里面可以有多个线程
  • 一个线程里面可以有多个协程

3. 协程知识点

https://zhuanlan.zhihu.com/p/169426477

(1)事件循环

理解成一个死循环,去检测并执行某些代码。

任务列表=[任务1,任务2,任务3 ..... ]
while True:
  可执行的人物列表,已完成的任务列表=去任务列表中检查所有任务,'可执行''已完成'的任务返回
  for 就绪任务 in 可执行任务列表:
    执行已就绪的任务
  for 已完成的任务 in 已完成的任务列表
    任务列表中移除 已完成
如果 任务列表 中的任务都已完成, 终止循环

① 旧的做法

import asyncio

# 去一个获取事件循环
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(say_after('hello', i)) for i in range(4)]
# 将任务放到'任务列表'
loop.run_until_complete(asyncio.wait(tasks))

② 新的做法

# coding=utf-8
import asyncio
import time
async def say_after(what, delay):
    await asyncio.sleep(delay)
    print(what)
async def main():
    # create_task封装了事件循环
    tasks = []
    task1 = asyncio.create_task(say_after('hello', 1))
    task2 = asyncio.create_task(say_after('world', 2))
    tasks.append(task1)
    tasks.append(task2)
    print(f"程序于 {time.strftime('%X')} 开始执行")
    await asyncio.wait(tasks)
    print(f"程序于 {time.strftime('%X')} 执行结束")

asyncio.run(main())

(2)协程函数与协程对象

协程函数:定义函数时候 async def 函数名 称为协程函数
协程对象:执行 协程函数() 得到线程对象.

async def func(): # 协程方法
  pass

result=func() # 协程对象,内部代码不会执行的

如果想要运行携程函数内部代码,必须要将协程对象交给事件循环来处理。

import asyncio
async def func(): # 协程方法
  print('函数哈')

# 旧的写法-----------------------
# 获取事件循环
# loop=asyncio.get_event_loop()
# 协程对象,内部代码不会执行的
# tasks = [asyncio.ensure_future(func()) for i in range(4)]
# 将任务放到'任务列表'
# loop.run_until_complete(asyncio.wait(tasks))


# python3.7及以上可以使用最新写法:------------------
async def main():
    tasks = []
    task1 = asyncio.create_task(func()))
    tasks.append(task1)
    await asyncio.wait(tasks)


asyncio.run(main())

(3)await 【重要,新版本写法】

await+可等待的对象(协程对象,Future,Task对象) ~=(相当于) IO等待;await 就是等待对应的值得到结果后再继续向下走。

  • await 一个对象;
  • await asyncio.wait(多个可等待对象的列表);
import asyncio
async def others():
  print('start')
  await asyncio.sleep(2)
  print('end')
  return  '返回值'

async def func():
  print('执行协程函数内部代码')
  respo1=await others()
  print('IO请求结束结果为1',respo1)
  respo2=await others()
  print('IO请求结束结果为2',respo2)

asyncio.run(func())
执行协程函数内部代码
start
end
IO请求结束结果为1 返回值
start
end
IO请求结束结果为2 返回值

(4)Task对象【重要,新版本写法】

在事件循环中添加多个任务的。Task用于并发调度协程,通过asyncio.create_task(协程对象) 的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行.除了使用asyncio.create_task()函数以外,还可以用底层级的loop.create_task()或asyncio.ensure_future()函数.不建议手动实例化Task对象。

注意:asyncio.create_task()函数在python3.7中被加入,在python3.7之前可以改用底层级的asyncio.ensure_future()函数。

import asyncio

async def func():
  print(1)
  await asyncio.sleep(2)
  print(2)
  return 'func111'
async def main():
  print('start main')
  # 创建task对象,将当前执行func函数任务添加到事件循环中.
  task_list=[
    asyncio.create_task(func(),name='n1'),
    asyncio.create_task(func(),name='n2')
  ]
  print('main end')
  don,pending=await asyncio.wait(task_list,timeout=None) 
  # 最多等待1秒,如果任务没有完成,那么返回结果就是空的
  print(don)
  print(pending)
asyncio.run(main())

(5)asyncio.Future对象

Task继承Future,Task对象内部await结果的处理基于Future对象来的。Task和Future的区别:如果Future没有设置返回值那么await就会一直等待,Task函数执行完成后默认会调用set_result(‘666’)来结束。

下面是新旧写法的混合:

import asyncio
async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')

async def main():
    #获取当前事件循环
    loop=asyncio.get_running_loop()
    # 创建一个任务(Future对象),这个任务什么也没有干
    fut=loop.create_future()
    await loop.create_task(set_after(fut))
    # 等待任务最终结果(Future对象),没有结果则会一直等待下去
    data=await asyncio.wait(fut)
    print(data)

asyncio.run(main())

(6)uvloop 事件循环【旧】

是asyncio的事件循环的代替方案,事件循环>默认asyncio的事件循环。

pip install uvloop
import uvloop
import asyncio
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码 与之前一样
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)

注意事项:一个asgi->uvicorn 内部使用的就是uvloop。

4.简述实现协程的几种方式

(1)协程 - yield关键字

import time

def work1():
    while True:
        print("---work1---")
        yield
        time.sleep(0.5)
        
        
def work2():
    while True:
        print("---work2---")
        yield
        time.sleep(0.5)
         
def main():
    w1 = work1()
    w2 = work2()
    while True:
        next(w1)
        next(w2)
        
if __name__ == '__main__':
    main()

(2)协程 - greenlet

为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单
安装:

pip install greenlet
import time
from greenlet import greenlet


def test1():
    while True:
        print("---A---")
        gr2.switch()  # 切换到gr2中运行
        time.sleep(0.5)
        
        
def test2():
    while True:
        print("---B---")
        gr1.switch()
        time.sleep(0.5)
         
gr1 = greenlet(test1)
gr2 = greenlet(test2)


gr1.switch()


(3)协程 - gevent

greenlet 已经实现了协程,但是这个还的人工切换,太麻烦了。python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent。
其原理是当一个 greenlet 遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO

安装方法:

pip install gevent
import gevent

def f(n):
    for i in range(n):
        print(gevent.getcurrent(),i)
        #用来模拟一个耗时操作,注意不是time模块中的sleep,注意和time.sleep(1)的结果区别
        gevent.sleep(1)

g1 = gevent.spawn(f,5)
g2 = gevent.spawn(f,5)
g3 = gevent.spawn(f,5)


g1.join()
g2.join()
g3.join()

或者给程序打补丁代替gevent.sleep(1)。

from gevent import monkey
import gevent
import random
import time

# 有耗时操作时需要
monkey.patch_all()  # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块

def coroutine_work(coroutine_name):
    for i in range(5):
        print(coroutine_name, i)
        time.sleep(random.random())

gevent.joinall([
        gevent.spawn(coroutine_work, "work1"),
        gevent.spawn(coroutine_work, "work2")
])

(4)协程 - asyncio装饰器【旧写法】

要实现异步并行,需要将协程函数打包成一个任务(Task)。注意:遇到IO等耗时操作会自动切换.

import asyncio
import time
@asyncio.coroutine
def func1():
    print(1)
    yield  from asyncio.sleep(3) # 遇到耗时后会自动切换到其他函数中执行
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    yield  from asyncio.sleep(2)
    print(4)

@asyncio.coroutine
def func3():
    print(5)
    yield  from asyncio.sleep(2)
    print(6)

tasks=[
    asyncio.ensure_future( func1() ),
    asyncio.ensure_future( func2() ),
    asyncio.ensure_future( func3() )
]

# 协程函数使用 func1()这种方式是执行不了的
start=time.time()
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(func1()) 执行一个函数
end=time.time()
print(end-start) # 只会等待3秒

(5)协程 - asyc&await 关键字

① 旧的写法

import asyncio
import time

async def func1():
    print(1)
    await asyncio.sleep(3) # 遇到耗时后会自动切换到其他函数中执行
    print(2)


async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)


async def func3():
    print(5)
    await asyncio.sleep(2)
    print(6)

tasks=[
    asyncio.ensure_future( func1() ),
    asyncio.ensure_future( func2() ),
    asyncio.ensure_future( func3() )
]

# 协程函数使用 func1()这种方式是执行不了的
start=time.time()
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(func1()) 执行一个函数
end=time.time()
print(end-start) # 只会等待3秒

② 新的写法

import asyncio
import time

async def func1():
    print(1)
    await asyncio.sleep(3) # 遇到耗时后会自动切换到其他函数中执行
    print(2)


async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)


async def func3():
    print(5)
    await asyncio.sleep(2)
    print(6)


async def main():
    tasks = [
        asyncio.create_task(func1()),
        asyncio.create_task(func2()),
        asyncio.create_task(func3()),
    ]
    await asyncio.wait(tasks)

# 协程函数使用 func1()这种方式是执行不了的
start=time.time()
asyncio.run(main())
end=time.time()
print(end-start) # 只会等待3秒

(6)注意事项

其实 asyncio 和 asyc 几乎是互相关联的。

(7)最新的实现方式

在这里插入图片描述

5.进程、线程、协程对比

https://zhuanlan.zhihu.com/p/169426477

  • 进程是资源分配的单位;
  • 线程是操作系统调度的单位;
  • 进程切换需要的资源很最大,效率很低;
  • 线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下);
  • 协程切换任务资源很小,效率高;
  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发。

协程是一种比线程更加轻量级的存在,协程不是被操作系统内核所管理,而完全是由用户态程序所控制。 协程与线程以及进程的关系如下图所示。可见,协程自身无法利用多核,需要配合进程来使用才可以在多核平台上发挥作用。
在这里插入图片描述

  • 协程之间的切换不需要涉及任何 System Call(系统调用)或任何阻塞调用。
  • 协程只在一个线程中执行,切换由用户态控制,而线程的阻塞状态是由操作系统内核来完成的,因此协程相比线程节省线程创建和切换的开销。
  • 协程中不存在同时写变量的冲突,因此,也就不需要用来守卫关键区块的同步性原语,比如:互斥锁、信号量等,并且不需要来自操作系统的支持。

(1)协程的真实本质

协程通过 “挂起点” 来主动 yield(让出)CPU,并保存自身的状态,等候恢复。例如:首先在 funcA 函数中执行,运行一段时间后调用协程,协程开始执行,直到第一个挂起点,此后就像普通函数一样返回 funcA 函数。 funcA 函数执行一些代码后再次调用该协程,注意,协程这时就和普通函数不一样了。协程并不是从第一条指令开始执行而是从上一次的挂起点开始执行,执行一段时间后遇到第二个挂起点,这时协程再次像普通函数一样返回 funcA 函数,funcA 函数执行一段时间后整个程序结束。
在这里插入图片描述

(2)抢占式调度:多线程、多进程

在 I/O 密集型场景中,抢占式调度的解决方案是 “异步 + 回调” 机制。
在这里插入图片描述
其存在的问题是,在某些场景中会使得整个程序的可读性非常差。以图片下载为例,图片服务中台提供了异步接口,发起者请求之后立即返回,图片服务此时给了发起者一个唯一标识 ID,等图片服务完成下载后把结果放到一个消息队列,此时需要发起者不断消费这个 MQ 才能拿到下载是否完成的结果。
在这里插入图片描述
在这里插入图片描述

可见,整体的逻辑被拆分为了好几个部分,各个子部分都会存在状态的迁移,日后必然是 BUG 的高发地。

(3)用户态协同式调度:协程

而随着网络技术的发展和高并发要求,协程所能够提供的用户态协同调度机制的优势,在网络操作、文件操作、数据库操作、消息队列操作等重 I/O 操作场景中逐渐被挖掘。

协程将 I/O 的处理权从内核态的操作系统交还给用户态的程序自身。用户态程序在执行 I/O 时,主动的通过 yield(让出)CPU 的执行权给其他协程,多个协程之间处于平等、对称、合作的关系。
在这里插入图片描述

(4)协程的运行原理

当程序运行时,操作系统会为每个程序分配一块同等大小的虚拟内存空间,并将程序的代码和所有静态数据加载到其中。然后,创建和初始化 Stack 存储,用于储存程序的局部变量,函数参数和返回地址;创建和初始化 Heap 内存;创建和初始化 I/O 相关的任务。当前期准备工作完成后,操作系统将 CPU 的控制权移交给新创建的进程,进程开始运行。

一个进程可以有一个或多个线程,同一进程中的多个线程将共享该进程中的全部系统资源,如:虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈和线程本地存储。

在这里插入图片描述
协程通过 “挂起点” 来主动 yield(让出)CPU,并保存自身的状态,等候恢复。例如:首先在 funcA 函数中执行,运行一段时间后调用协程,协程开始执行,直到第一个挂起点,此后就像普通函数一样返回 funcA 函数。 funcA 函数执行一些代码后再次调用该协程,注意,协程这时就和普通函数不一样了。协程并不是从第一条指令开始执行而是从上一次的挂起点开始执行,执行一段时间后遇到第二个挂起点,这时协程再次像普通函数一样返回 funcA 函数,funcA 函数执行一段时间后整个程序结束。
在这里插入图片描述
协程之所可以能够 “主动让出” 和 “被恢复”,是解析器在函数运行时堆栈中保存了其运行的 Context(上下文)。
在这里插入图片描述

6. 案例

(1)geven并发下载

能够看到是先发送的获取baidu的相关信息,然后依次是itcast、itheima,但是收到数据的先后顺序不一定与发送顺序相同,这也就体现出了异步,即不确定什么时候会收到数据,顺序不一定。

from gevent import monkey
import gevent
import urllib.request

# 有耗时操作时需要
monkey.patch_all()

def my_downLoad(url):
    print('GET: %s' % url)
    resp = urllib.request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(my_downLoad, 'http://www.baidu.com/'),
        gevent.spawn(my_downLoad, 'http://www.itcast.cn/'),
        gevent.spawn(my_downLoad, 'http://www.itheima.com/'),
])

(2)geven实现多个视频下载

from gevent import monkey
import gevent
import urllib.request

#有IO才做时需要这一句
monkey.patch_all()

def my_downLoad(file_name, url):
    print('GET: %s' % url)
    resp = urllib.request.urlopen(url)
    data = resp.read()

    with open(file_name, "wb") as f:
        f.write(data)

    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(my_downLoad, "1.mp4", 'http://oo52bgdsl.bkt.clouddn.com/05day-08-%E3%80%90%E7%90%86%E8%A7%A3%E3%80%91%E5%87%BD%E6%95%B0%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93%EF%BC%88%E4%B8%80%EF%BC%89.mp4'),
        gevent.spawn(my_downLoad, "2.mp4", 'http://oo52bgdsl.bkt.clouddn.com/05day-03-%E3%80%90%E6%8E%8C%E6%8F%A1%E3%80%91%E6%97%A0%E5%8F%82%E6%95%B0%E6%97%A0%E8%BF%94%E5%9B%9E%E5%80%BC%E5%87%BD%E6%95%B0%E7%9A%84%E5%AE%9A%E4%B9%89%E3%80%81%E8%B0%83%E7%94%A8%28%E4%B8%8B%29.mp4'),
])

(3)asyncio协程对比

① 同步的情况下

import requests
import time
def downlod_image(url):
    print('开始下载')
    res=requests.get(url).content
    print('下载完成')
    # 图片保存到本地文件
    fileName=url.rsplit('-')[-1]
    with open(fileName,'wb') as wf:
        wf.write(res)

url_list=[
    'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
    'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
    'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
]

if __name__=='__main__':
    start=time.time()
    for item in url_list:
        downlod_image(item)
    end=time.time()
    print('本次耗时',end-start) # 本次耗时 1.5057339668273926
本次耗时 0.6898803710937

② 协程方式情况下【新写法】

import asyncio

import requests
import time
import aiohttp
async def downlod_image(session,url):
    print('开始下载')
    async with session.get(url,verify_ssl=False) as res:
        content=await res.content.read()
        print('下载完成')
        # 图片保存到本地文件
        fileName=url.rsplit('-')[-1]
        with open(fileName,'wb') as wf:
            wf.write(content)

url_list=[
    'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
    'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
    'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
]
async def main():
    async with aiohttp.ClientSession() as session:
        tasks=[asyncio.create_task(downlod_image(session,url)) for url in url_list]
        await asyncio.wait(tasks)
if __name__=='__main__':
    start=time.time()
    asyncio.run(main())
    end=time.time()
    print("本次耗时",end-start) # 本次耗时 0.3856923580169678
本次耗时 0.4509727954864502

(4)asyncio+不支持异步的模块

① 旧写法

# 案例:asyncio+不支持异步的模块
import asyncio
import requests
import time
import aiohttp
async def downlod_image(url):
    print('开始下载')
    loop=asyncio.get_event_loop()
    # 开启线程进行请求后转换成协程进行等待
    future=loop.run_in_executor(None,requests.get,url)
    res=await future
    print('下载完成')
    # 图片保存到本地文件
    fileName=url.rsplit('-')[-1]
    with open(fileName,'wb') as wf:
        wf.write(res.content)

url_list=[
    'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
    'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
    'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
]
def main():
    tasks=[downlod_image(url) for url in url_list]
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

if __name__=='__main__':
    start=time.time()
    main()
    end=time.time()
    print("本次耗时",end-start) # 本次耗时 0.4465653896331787
本次耗时 1.613558292388916

② 新写法

# 案例:asyncio+不支持异步的模块
import asyncio
import requests
import time
import aiohttp
async def downlod_image(url):
    print('开始下载')
    loop=asyncio.get_event_loop()
    # 开启线程进行请求后转换成协程进行等待
    future=loop.run_in_executor(None,requests.get,url)
    res=await future
    print('下载完成')
    # 图片保存到本地文件
    fileName=url.rsplit('-')[-1]
    with open(fileName,'wb') as wf:
        wf.write(res.content)


async def main():
    url_list = [
        'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
        'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
        'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
    ]
    tasks=[downlod_image(url) for url in url_list]
    # tasks=[asyncio.create_task(downlod_image(url)) for url in url_list]
    await asyncio.wait(tasks)


if __name__=='__main__':
    start=time.time()
    asyncio.run(main())
    end=time.time()
    print("本次耗时",end-start) # 本次耗时 0.4465653896331787
本次耗时 0.7298567295074463

(5)协程下载图片

① 旧写法

import asyncio
from pathlib import Path
import logging
from urllib.request import urlopen, Request
import os
from time import time
import aiohttp
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png',
                        'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg',
                        'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png']
async def download_image_async(session, dir, img_url):
    download_path = dir / os.path.basename(img_url)
    async with session.get(img_url) as response:
        with download_path.open('wb') as f:
            while True:
                # 在 async 函数中使用 await 关键字表示等待 task 执行完成,也就是等待 yeild 让出控制权。
                # 同时,asyncio 使用事件循环 event_loop 来实现整个过程。
                chunk = await response.content.read(512)
                if not chunk:
                    break
                f.write(chunk)
    logger.info('Downloaded: ' + img_url)
# 使用 async 关键字声明一个异步/协程函数。
# 调用该函数时,并不会立即运行,而是返回一个协程对象,后续在 event_loop 中执行。
async def main():
    images_dir = Path("codeflex_images")
    Path("codeflex_images").mkdir(parents=False, exist_ok=True)
    async with aiohttp.ClientSession() as session:
        tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS]
        await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
    start = time()
    # event_loop 事件循环充当管理者的角色,将控制权在几个协程函数之间切换。
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
    logger.info('Download time: %s seconds', time() - start)

2022-11-24 10:04:54,108 - __main__ - INFO - Download time: 33.83906531333923 seconds

② 新写法

import asyncio
from pathlib import Path
import logging
from urllib.request import urlopen, Request
import os
from time import time
import aiohttp
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png',
                        'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg',
                        'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png']
async def download_image_async(session, dir, img_url):
    download_path = dir / os.path.basename(img_url)
    async with session.get(img_url) as response:
        with download_path.open('wb') as f:
            while True:
                # 在 async 函数中使用 await 关键字表示等待 task 执行完成,也就是等待 yeild 让出控制权。
                # 同时,asyncio 使用事件循环 event_loop 来实现整个过程。
                chunk = await response.content.read(512)
                if not chunk:
                    break
                f.write(chunk)
    logger.info('Downloaded: ' + img_url)
# 使用 async 关键字声明一个异步/协程函数。
# 调用该函数时,并不会立即运行,而是返回一个协程对象,后续在 event_loop 中执行。
async def main():
    images_dir = Path("codeflex_images")
    Path("codeflex_images").mkdir(parents=False, exist_ok=True)
    async with aiohttp.ClientSession() as session:
        tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS]
        # tasks = [asyncio.create_task((download_image_async(session, images_dir, img_url))) for img_url in CODEFLEX_IMAGES_URLS]
        await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
    start = time()
    # event_loop 事件循环充当管理者的角色,将控制权在几个协程函数之间切换。
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
    logger.info('Download time: %s seconds', time() - start)

7. 异步操作数据

(1)异步redis

在使用python代码操作redis时,链接/操作/断开都是网络IO。

pip install aioredis
import aioredis
import asyncio
async def execute(address,password):
    print('start-',address)
    # 网络IO操作:先去链接47.93.4.189:6666,遇到IO自动切换任务,去连接47.93.4.198.2712
    redis=await aioredis.create_redis_pool(address,password=password)
    # 网络IO操作:遇到IO会自动切换任务
    await redis.hmset_dict('car',key=1,key2=2,key3=3)
    # 网络IO操作:遇到IO会自动切换任务
    result=await redis.hgetall('car',encoding='utf-8')
    print(result)
    redis.close()
    # 网络IO操作:遇到IO会自动切换任务
    await redis.wait_closed()
    print('end-',address)
task_list=[
    execute('redis://47.93.4.189:6666','123456'),
    execute('redis://47.93.4.198.2712','123456')
]

asyncio.run(asyncio.wait(task_list))

(2)异步MySQL

pip install aiomysql
import asyncio
import aiomysql
async def execute():
    # 网络IO操作:链接mysql
    conn=await aiomysql.connect(host='127.0.0.1',port=3306,user='root',password='123',db='mysql')
    # 网络IO操作,创建CURSOR
    cur=await conn.cursor()
    # 网络IO操作,执行SQL
    await cur.execute('select * from user')
    # 网络IO操作,执行SQL
    result=await cur.fetchall()
    print(result)
    # 网络IO操作,执行SQL
    await cur.close()
    conn.close()

asyncio.run(execute())
import asyncio
import aiomysql
async def execute(host,password):
    # 网络IO操作:链接mysql
    conn=await aiomysql.connect(host=host,port=3306,user='root',password=password,db='mysql')
    # 网络IO操作,创建CURSOR
    cur=await conn.cursor()
    # 网络IO操作,执行SQL
    await cur.execute('select * from user')
    # 网络IO操作,执行SQL
    result=await cur.fetchall()
    print(result)
    # 网络IO操作,执行SQL
    await cur.close()
    conn.close()
task_list=[
    execute('127.0.0.1','123456'),
    execute('192.168.3.112','1a23456')
]

asyncio.run(asyncio.wait(task_list))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/114447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++类和对象概念及实现详解(上篇)

文章目录 一、什么是类和对象呢? 1、类的引入 2、类的定义 3、类的访问限定符 4、类对象的储存方式 5、this指针的特性 二、类的六个默认成员函数详解 1、构造函数 2、析构函数 3、未完待续…… 标题:类和对象概念及实现详解(上篇&#xff0…

vue3 antd table表格——自定义单元格样式(二)利用rowClassName给table添加行样式

vue3 antd项目实战——修改ant design vue组件中table表格的默认样式(二)知识调用场景复现修改table表格的行样式一、rowClassName添加行样式二、表格的不可控操作写在最后知识调用 文章中可能会用到的知识链接vue3ant design vuets实战【ant-design-vu…

从头开始用树莓派做一个NAS【最新超详细教程】

一、概述 众所周知在办公的时候两台电脑之间经常倒数据资料非常麻烦,而NAS可以很好的解决这个问题。树莓派搭建NAS方法有很多,我们之前也拍过直接用Samba、FTP这些来实现NAS功能,但是这些需要你会在命令行进行配置,而且对于新手用…

【Linux】Linux权限管理

目录一.Linux用户权限1.权限的概念2.用户分类3.切换用户4.sudo提权二.Linux文件权限1.文件属性2.文件类型3.文件角色划分4.基本权限三.文件访问权限的相关设置方法1.chmod2.chown3.charp4.file5.权限拒绝四.默认权限umask五.目录的权限六.粘滞位1.背景2.准备3.情况4.粘滞位一.L…

初识Docker:(1)什么是docker

初识Docker:(1)什么是docker项目部署的问题Docker总结项目部署的问题 大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题: 依赖关系复杂,容易出现兼容性问题开发、测试、生产环境有差…

git revert以及revert的恢复

一:背景与方案 在工作中遇见的这样的场景: 场景一: 已经merge到待发布的版本分支中的功能需要移除当前的分支,改在后续版本发布,示意图如下,展示的是commit序列, 这里想要移除的功能是commi…

[python库] base64库的基本使用

1. base64是什么 base64是一种二进制到文本格式的编码方式。具体来说就是将byte数组编码为字符串的方法,而编码出来的字符串只包含ASCII基础字符。 虽然说base64是一种编码方式,但是它并不推荐作为常规的加密算法使用,因为该算法的加解密算法…

Android开发进阶——binder通讯学习

什么是binder 通常意义下,binder指的是一种通信机制对Server端来说,Binder指的是Binder本地对象,对于Client端来说,Binder指的是Binder代理对象对于传输过程而言,binder是可以跨进程传输的对象 Binder的基本原理 Bi…

【工作流Activiti7】7、Activiti7+SpringBoot

1. 版本问题 1.1. Activiti版本 7.1.0-M6是最后一个支持JDK1.8的版本,此后的版本都要求JDK11以上 目前,Activiti最新版本是7.6.0,它是用JDK11编译的,因此要想使用最新版7.6.0必须升级JDK版本,不能再用1.8 同时&…

【数组中数字出现的次数-有限状态自动机】

数组中数字出现的次数一,有限状态自动机解法二,一般解法想必大家对数组中数字出现的次数的这种题并不少见, 主要有三种: 1,找出数组中只出现一次的数字(其他数字出现两次) 2,找出数组…

渗透测试指操作系统漏洞发现与防御概述

今天继续给大家介绍渗透测试相关知识,本文主要内容是渗透测试指操作系统漏洞发现与防御概述。 免责声明: 本文所介绍的内容仅做学习交流使用,严禁利用文中技术进行非法行为,否则造成一切严重后果自负! 再次强调&#x…

Android四大组件之Service

文章目录Android四大组件之默默劳动的Service什么是ServiceAndroid多线程编程线程的基本用法在子线程中更新UI解析异步消息处理机制MessageHandlerMessageQueueLooper异步消息的整个流程使用AsyncTaskService的基本用法定义一个Service启动和停止ServiceActivity和Service进行通…

【4】axi协议学习

1、axi背景介绍: Advanced extensible Interface(AXI)是为了满足高性能系统设计而定义的一套独立通道协议,首次是在2003年发布的AMBA3标准中出现,经历AMBA4,目前已经到达AMBA5版本。 2、axi 特性: AXI满足如下的特性: 适合于高带宽,低延迟的设计 不需要通过复杂的桥…

去耦电容和旁路的概念说明与应用说明

回想当初第一眼看觉得,这啥玩意自己又菜了! 电容:本质就是充放电,实际应该我们围着这个来转 一、解释 旁路电容从英文角度看,就是抄小路的意思,意思当有一个干扰来临时候,可以通过这个电容抄…

LeetCode 1739. 放置盒子:数学 思维

【LetMeFly】1739.放置盒子 力扣题目链接:https://leetcode.cn/problems/building-boxes/ 有一个立方体房间,其长度、宽度和高度都等于 n 个单位。请你在房间里放置 n 个盒子,每个盒子都是一个单位边长的立方体。放置规则如下: …

世界杯竞猜项目Dapp-第五章(合约升级)

目前主流有三种合约升级方法 transparent 方式;(通用,业务逻辑和代理逻辑解耦合,比较贵)uups 方式;(代理逻辑集成到了业务逻辑,通过继承来实现,便宜)beacon 方式&#x…

408 考研《操作系统》第三章第二节:基本分页存储管理、两级页表、基本分段存储管理方式、段页式管理方式

文章目录教程1. 基本分页存储管理的基本概念1.1 连续分配方式的缺点1.2 把“固定分区分配”改造为“非连续分配版本”1.3 什么是分页存储1.4 如何实现地址的转换?1.5 逻辑地址结构1.6 重要的数据结构——页表1.7 知识回顾与重要考点2. 基本地址变换机构2.1 例题2.2 …

node.js+uni计算机毕设项目购物小程序(程序+小程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置: Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术: Express框架 Node.js Vue 等等组成,B/S模式 Vscode管理前后端分离等…

不写一行代码(一):实现安卓基于GPIO的LED设备驱动

文章目录系列文章一、前言二、准备工作2.1 内核版本2.2 内核文档:bindings->leds2.3 文档解析: leds-gpio.txt三、编写DTS3.1 查原理图,挑选GPIO3.2 编写DTS文件四、编译测试4.1 编译dt.img4.2 烧录dt.img五、基于fs的测试5.1 测试命令5.2…

第二十五章 数论——约数

第二十五章 数论——约数一、什么是约数二、约数的求解——试除法1、问题2、思路分析3、代码实现三、约数个数1、问题描述2、算法思路3、代码实现四、约数之和1、问题描述2、算法思路3、代码实现五、最大公约数——欧几里德算法1、问题描述2、算法思路(1&#xff09…