Python 并发编程

news2024/12/26 22:11:07

一.Python 对并发编程的支持

  • 多线程:threading,利用CPU和IO可同时执行的原理,让CPU不会干巴巴等待IO完成,而是切换到其他Task(任务),进行多线程的执行。
  • 多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务。
  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行。

额外的辅助功能:

  • 使用Lock对共享资源加锁,防止冲突访问。
  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果。
  • 使用subprocess启动外部程序的进程,并进行输入输出交互。

二.怎样选择多进程多线程多协程

Python 并发编程有三种方式

  • 多线程Thread
  • 多进程Process
  • 多协程Coroutine

1.什么是CPU密集型计算、IO密集型计算?

  • CPU密集型(CPU bound):

    CPU Bound的意思是任务收到CPU的限制,CPU达到顶峰。

    CPU密集型也叫做计算密集型,是指 I/O可以在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高。

    例如:压缩、解压缩、加密解密、正则表达式搜索

  • IO密集型(I/O bound):

    IO密集型指的是系统运作大部分的状况是CPU在等待 I/O(硬盘/内存/网络)的读/写操作,CPU占用率仍然较低。

    例如:文件处理程序、网络爬虫程序、读写数据库程序。

2.多线程、多进程、多协程对比

  • 多进程 Process (multiprocessing)

    一个进程中可以启动N个线程。

    • 优点:可以利用多核CPU并行运算。

    • 缺点:占用资源多、可启动数目比线程少。

    • 适用于:CPU密集型计算。

  • 多线程 Tread (threading)

    一个线程中可以启动N个协程。

    • 优点:相比于进程,更轻量级、占用资源少(资源指的是变量的存储)。

    • 缺点:

      • 相比进程:多线程只能并发执行,不能利用CPU的多核(由于GIL锁)。
      • 相比协程:启动数目有限制,占用内存资源,有线程切换开销。
    • 适用于:I/O密集型、同时运行的数目要求不多。

  • 多协程 Coroutine (asyncio)

    • 优点:内存开销最少、启动协程数量最多。
    • 缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂。
    • 适用于:I/O 密集型计算、需要超多任务、但有现成库支持的场景。
CPU密集型
IO密集型
待执行任务
任务特点?
使用多进程 multiprocessing
1.需要超多任务量? 2.有现成协程库支持? 3.协程复杂度可接受?
使用多线程 threading
使用多协程asyncio

三.Python 全局解释器锁GIL

1.Python速度慢的两大原因

​ 相比于C/C++/JAVA,Python确实慢,在一些特殊场景下,Python比C++慢100~200倍。由于速度慢的原因,很多公司的基础架构代码使用C/C++开发,比如各大公司阿里/腾讯/快手的推荐引擎、搜索引擎、存储引擎等底层对性能要求高的模块。

Python速度慢的原因:

  1. 动态类型语言、边解释边执行。
  2. GIL全局解释器锁,无法利用多核CPU并发执行。

2.GIL是什么?

全局解释器锁(Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。

即使在多核心CPU上,使用GIL的解释器也只允许同一时间执行一个线程。
在这里插入图片描述

由于GIL的存在,即使电脑有多核CPU,单个时刻也只能使用1个核心,相比并发加速的C++/JAVA所以慢。

3.为什么有GIL?

简而言之:Python设计初期,为了规避并发问题引入了GIL,现在想去除却去不掉!

为了解决多线程之间数据完整性和状态同步问题

Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象。

开始:线程A和线程B都引用了对象obj,obj.ref_num = 2,线程A和B都想撤销对obj的引用。

线程A 线程B obj.ref_num -- 变成1 1 loop 此时发生多线程调度切换 2 obj.ref_num -- 变成0 3 if obj.ref_num == 0: free obj 4 loop 此时发生多线程调度切换 5 if obj.ref_num == 0(如果刚好执行到这一步会报错,): free obj(如果执行到这一步会释放其他对象的内存) 6 错误:obj已经不存在了,这两行代码可能破坏内存。 7 loop 线程A 线程B

GIL确实有好处:简化了Python对共享资源的管理。

四.怎样规避GIL带来的限制?

  1. 多线程threading 机制依然是有用的,用于IO密集型计算

    因为 I/O(read、write、send、recv、etc.)期间,线程会释放GIL,实现CPU和IO的并行,因此多线程用于IO密集型计算依然可以大幅提高速度,但是多线程用于CPU密集型计算时,只会更加拖慢速度。

  2. 使用multiprocessing的多进程机制实现并行计算、利用多核CPU优势,为了应对GIL的问题,Python提供了multiprocessing。

五.利用多线程,Pyhton爬虫被加速10倍数

1.Python创建多线程的方法

  1. 准备一个函数

    def my_func(a, b):
    	do_craw(a, b)
    
  2. 怎样创建一个线程

    import threading
    
    t = threading.Thread(target=my_func, args=(100, 200))
    
  3. 启动线程

    t.start()
    
  4. 等待结束

    t.join()
    

2.改写爬虫程序,变成多线程爬取

import requests
import threading

urls = [f'https://www.cnblogs.com/#p{page}' for page in range(51)]

def craw(url):
    res = requests.get(url)
    print('url: {}, len: {}'.format(url, len(res.text)))
    

def multi_thread():
    
    print('multi_thread begin')
    
    threads = []
    
    for url in urls:
        threads.append(threading.Thread(target=craw, args=(url, )))
        
    for thread in threads:
        # 启动线程
        thread.start()
        
    for thread in threads:
        # 等待结束
        thread.join()
    
    print('multi_thread end')
    

%%time
multi_thread()

3.速度对比:单线程爬虫VS多线程爬虫

import requests
import threading

urls = [f'https://www.cnblogs.com/#p{page}' for page in range(51)]


def craw(url):
    res = requests.get(url)
    print('url: {}, len: {}'.format(url, len(res.text)))
    

def single_thread():
    print('single_thread begin')
    
    for url in urls:
        craw(url)
    
    print('single_thread end')


def multi_thread():
    
    print('multi_thread begin')
    
    threads = []
    
    for url in urls:
        threads.append(threading.Thread(target=craw, args=(url, )))
        
    for thread in threads:
        # 启动线程
        thread.start()
        
    for thread in threads:
        # 等待结束
        thread.join()
    
    print('multi_thread end')
    

%%time
single_thread()

%%time
multi_thread()

六.Python实现生产者消费者爬虫

1.多组件的Pipeline技术架构

​ 复杂的事情一般不会一下子做完,而是分很多中间步骤一步步完成。
在这里插入图片描述

把很多事情分很多模块来处理的这种架构叫做Pipeline,每个处理器叫做Processor。其实生产者-消费者就是一个典型的Pipeline,第一个就是生产者,最后一个就是消费者。生产者生产的结果会通过中间数据传给消费者进行消费。生产者使用输入数据作为原料,消费者输出数据。

2.生产者消费者爬虫的架构

在这里插入图片描述

生产者-消费者爬虫架构就是说里面有两个Processor。第一个process获取待爬取的URL列表进行网页的下载,下载的内容放在下载好的网页队列中。消费者消费中间的数据,进行网页的解析并且把结果进行存储。

这样做的好处是:生产者和消费者可以由两拨人开发,并且配置不同的资源(如线程数)。

那么两个线程组之间的交互数据是怎么进行的?

3.多线程数据通信的queue.Queue

queue.Queue可以用于多线程之间的、线程安全的数据通信。

线程安全是指多个线程并发访问数据不会出现冲突。

  1. 导入quque库

    import queue
    
  2. 创建Queue对象

    q = queue.Queue()
    
  3. 添加元素

    # put 当队列满了之后,会阻塞,直到队列中有了存放位置才能put进去
    q.put(item)
    
  4. 获取元素

    # get 当队列中没有元素是,会阻塞,直到队列中有了数据
    item = q.get()
    
  5. 查询状态

    # 查询队列元素数量
    q.qsize()
    
    # 判断是否为空
    q.empty()
    
    # 判断是否已满
    q.full()
    

4.代码编写二实现生产者消费者爬虫

import time
import random
import requests
import threading
import queue
from bs4 import BeautifulSoup

def craw(url_queue, html_queue):
    
    while True:
        
        # 从队列中取出一个URL
        url = url_queue.get()
        
        # 请求获取页面
        html = requests.get(url).text
        
        # 将页面内容加入到队列中
        html_queue.put(html)
        
        # 打印信息
        print('线程名:{} URL:{} URL队列剩余数:{}'.format(threading.current_thread().name, url, url_queue.qsize()))        
        
        # 随机休眠
        time.sleep(random.randint(1, 2))
        

def parse(html_queue, data_queue):
    
    while True:
        
        # 从队列中取出信息
        html = html_queue.get()
        
        # 从网页中提取信息
        soup = BeautifulSoup(html, 'html.parser')
        links = soup.find_all('a', class_='post-item-title')
        
        for link in links:
            data_queue.put((link['href'], link.get_text()))
        
        print('线程名:{} 数据的数量:{}'.format(threading.current_thread().name, data_queue.qsize()))        

# URL队列
url_queue = queue.Queue()

# 网页队列
html_queue = queue.Queue()

# 数据队列
data_queue = queue.Queue()
urls = [f'https://www.cnblogs.com/#p{page}' for page in range(1, 51)]

for url in urls:
    url_queue.put(url)
    
# 创建三个线程去完成爬取
for idx in range(3):
    t = threading.Thread(target=craw, args=(url_queue, html_queue), name=f'craw_{idx}')
    t.start()


# 创建三个线程去完成解析
for idx in range(3):
    t = threading.Thread(target=parse, args=(html_queue, data_queue, ), name=f'parse_{idx}')
    t.start()
# 查看数据队列
list(data_queue.queue)

七.线程安全问题以及Lock解决方案

1. 线程安全概念介绍

线程安全是指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能能正确完成。

由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。

def draw(account, amount):
    """银行取钱"""
	if account.balance >= amount:
		account.balance -= amount

这样的代码看起来没有问题,但是在多线程环境下,就会出问题。因为多线程环境下,线程在不由自主的切换。

假设银行卡余额1000,两个线程同时取出800。余额1000大于800,进入if判断取钱

线程A 线程B if 余额1000大于800,进入if 1 loop 余额1000大于800,进入if 2 loop 余额1000减去800变成200 3 loop 余额200减去800变成-600 4 loop 线程A 线程B

2.Lock 用于解决线程安全问题

用法一:try_finally模式

import threading

lock = threading.lock()

lock.acquire()

try:
	# do something

finally:
	lock.release()

用法二:with模式

import threading

lock = threading.Lock()

with lock:
	# do something

3.示例代码解决问题以及解决方案

未加GIL锁:

import threading
import time

class Account:
    
    def __init__(self, balance):
        self.balance = balance

def draw(account, amount):
    """取钱"""
    
    if account.balance >= amount:
        # sleep 一定会导致线程阻塞和线程切换
        time.sleep(0.1)
        print(threading.current_thread().name, '取钱成功')
        
        account.balance -= amount
        
        print(threading.current_thread().name, '余额', account.balance)
    
    else:
        print(threading.current_thread().name, '余额不足!')


account = Account(1000)

theading_a = threading.Thread(name='theading_a', target=draw, args=(account, 800))
theading_b = threading.Thread(name='theading_b', target=draw, args=(account, 800))

theading_a.start()
theading_b.start()

执行结果:

theading_a 取钱成功
theading_a 余额 200
theading_b 取钱成功
theading_b 余额 -600

如果代码不做任何处理,该bug有时候出现,有时候不出现。如果代码中刚好进行了远程调用,或者sleep的话,那么该bug一定会出现。

加GIL锁:

import threading
import time

lock = threading.Lock()

class Account:
    
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    """取钱"""
    
    with lock:
        if account.balance >= amount:
            # sleep 一定会导致线程阻塞和线程切换

            print(threading.current_thread().name, '取钱成功')

            account.balance -= amount

            print(threading.current_thread().name, '余额', account.balance)

        else:
            print(threading.current_thread().name, '余额不足!')


account = Account(1000)

theading_a = threading.Thread(name='theading_a', target=draw, args=(account, 800))
theading_b = threading.Thread(name='theading_b', target=draw, args=(account, 800))

theading_a.start()
theading_b.start()

执行结果:

theading_a 取钱成功
theading_a 余额 200
theading_b 余额不足!

八.好用的线程池 ThreadPoolExecutor

1.线程池的原理

调用start
获取CPU资源
失去CPU资源
sleep/IO
sleep/IO结束
run方法执行完成
线程池的生命周期
新建线程
就绪
运行
阻塞
终止

新建线程系统需要分配资源、终止线程系统需要回收资源。

当系统中有大量的线程需要使用的时候,就会频繁的新建和终止线程,就会有很多的时间开销和线程的开销。

如果可以重用线程,则可以减去新建线程/终止线程的开销。

线程池的流转:

  1. 线程池里面是提前预先建好的线程。这些线程会被重复的使用。
  2. 任务队列,当一个新任务来的时候,并不是直接创建一个线程,而是放入任务队列中。
  3. 线程池里面的线程已经空闲的线程会依次取出任务进行执行,执行任务完成之后,会取下一个任务进行执行,如果没有任务线程会回到线程池但是并不会销毁,在线程池中等待下一个任务的到来。.
  4. 通过可重用的线程和任务队列实现了线程池。

在这里插入图片描述

2.使用线程池的好处

  1. 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源。
  2. 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理实际较短。
  3. 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题。
  4. 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁。

3.使用线程池改造爬虫程序

TreadPoolExecutor的使用语法

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

方法一:map函数,很简单,注意map的结果和入参是顺序对应的

with ThreadPoolExecutor() as pool:
	results = pool.map(craw, urls)
	
	for resutl in results:
		print(result)

方法二:future模式,更强大。注意如果用as_completed顺序是不定的

with ThreadPoolExecutor() as pool:
	futures = [pool.submit(craw, url) for url in urls]
	
	for future in futures:
		print(future.result())
	
	for future in as_completed(futures):
		print(future.result())

线程池爬虫完整代码:

import requests
import threading

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

from bs4 import BeautifulSoup

urls = [f'https://www.cnblogs.com/#p{page}' for page in range(1, 51)]

def craw(url):
    res = requests.get(url)
    
    return res.text
    

def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    
    links = soup.find_all('a', class_='post-item-title')
    
    return [(link["href"], link.get_text()) for link in links]


with ThreadPoolExecutor() as pool:
    htmls = pool.map(craw, urls)
    
    htmls = list(zip(urls, htmls))
    
    for url, html in htmls:
        print(url, len(html))


with ThreadPoolExecutor() as pool:
    futures = {}
    
    for url, html in htmls:
        future = pool.submit(parse, html)
        
        futures[future] = url
        
    for future, url in futures.items():
        print(url, future.result())

九.在Web服务中,使用线程池加速

1. Web服务架构以及特点

在这里插入图片描述

Web后台服务的特点:

  1. Web服务对响应时间要求非常高,比如要求200ms返回响应。
  2. Web服务有大量的以来IO操作的调用,比如磁盘文件、数据库、远程API。
  3. Web服务经常需要处理几万、几百万的同时请求。

2.使用线程池ThreadPoolExecutor加速

面对大量的请求,不能够无限制的创建线程,因为线程会消耗资源。

使用线程池的ThreadPoolExecutor的好处:

  1. 方便的将磁盘文件、数据库、远程API的IO调用并发执行
  2. 线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能。

3.代码用Flask实现Web服务并实现加速

原始版本:

import flask
import json
import time

app = flask.Flask(__name__)

def read_file():
    time.sleep(0.1)
    return 'file result'


def read_db():
    time.sleep(0.2)
    return 'db result'


def read_api():
    time.sleep(0.3)
    return 'api result'


@app.route('/')
def index():
    result_file = read_file()
    result_db = read_db()
    result_api = read_api()
    
    return json.dumps(
        {
            'result_file': result_file,
            'result_db': result_db,
            'result_api': result_api,
        }
    )

if __name__ == '__main__':
    app.run()

改造版本:

import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor


app = flask.Flask(__name__)

pool = ThreadPoolExecutor()


def read_file():
    time.sleep(0.1)
    return 'file result'


def read_db():
    time.sleep(0.2)
    return 'db result'


def read_api():
    time.sleep(0.3)
    return 'api result'


@app.route('/')
def index():
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)
    
    return json.dumps(
        {
            'result_file': result_file.result(),
            'result_db': result_db.result(),
            'result_api': result_api.result(),
        }
    )


if __name__ == '__main__':
    app.run()

十.使用多进程multiprocessing加速程序的运行

1.有了多线程threading,为什么还要用多进程multiprocessing

虽然有全局解释器锁GIL,但是因为有IO的存在,多线程依然可以加速运行。
在这里插入图片描述

CPU密集型计算线程的自动切换反而变成了负担,多线程甚至减慢了运行速度。
在这里插入图片描述

multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是多进程在多CPU上并行执行。

2.多进程multiprocessing知识梳理(对比多线程threading)

语法条目多线程多进程
引入模块from threading import Threadfrom multiprocessing import Process
新建t = Thread(target=func, args=(100, ))p = Process(target=f, args=(‘bob’, ))
启动t.start()p.start()
等待结束t.join()p.join()
数据通信import queue
q = queue.Queue()
q.put(item)
item = q.get()
from multiprocessing import Queue
q = Queue()
q.put(item)
item = q.get()
线程安全加锁from threading import Lock
lock = Lock()
with lock:
# do something
from multiprocessing import Lock
lock = Lock()
with lock:
# do something
池化技术from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
# 方法一
results = executor.map(func, [1,2, 3])

# 方法二
result = future.result()
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
# 方法一
results = executor.map(func, [1, 2, 3])

# 方法二
results = future.result()

05:49

3.代码实战:单线程、多线程、多进程对比CPU密集计算速度

CPU密集型计算:100次"判断大数字是否是素数"的计算
在这里插入图片描述

由于GIL的存在,多线程比单线程计算的还慢,而多进程可以明显加快执行速度。

import math

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

# 定义一个重复的素数列表
PRIMES = [112272535095293] * 100

def is_prime(n):
    """一个数字除了1和自身整除不能被其他数字整除"""
    if n < 2:
        return False
    elif n == 2:
        return True
    elif n % 2 == 0:
        return False
    
    sqrt_n = int(math.floor(math.sqrt(n)))
    
    for i in range(3, sqrt_n+1, 2):
        if n % i == 0:
            return False
    
    else:
        return True

def single_thread():
	"""单线程"""
    for number in PRIMES:
        is_prime(number)
        

%%time
single_thread()
# CPU times: total: 54.4 s
# Wall time: 1min 8s
def multi_thread():
	"""多线程"""
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


%%time
multi_thread()
# CPU times: total: 43.8 s
# Wall time: 1min 8s
def multi_process():
	"""多进程"""
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)
        

%%time
multi_process()
# CPU times: total: 15.6 ms
# Wall time: 119 ms

十一、在Flask服务中使用进程池加速

在前面我们提到多线程应用于IO型的应用,而多进程可以加速CPU密集型的计算。Flask Web服务是一种特殊的场景,在这种场景中,我们大部分情况下使用多线程加速就可以了。但有些应用,也会遇到CPU密集型的计算,那么怎么在Flask Web服务中使用进程池来加速是一个问题。

import math
import json
from concurrent.futures import ProcessPoolExecutor

import flask

app = flask.Flask(__name__)


def is_prime(n):
    """一个数字除了1和自身整除不能被其他数字整除"""
    if n < 2:
        return False
    elif n == 2:
        return True
    elif n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))

    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False

    else:
        return True


@app.route('/is_prime/<numbers>')
def api_is_prime(numbers):
    print(numbers)

    number_list = [int(x) for x in numbers.split(',')]

    results = process_pool.map(is_prime, number_list)

    return json.dumps(dict(zip(number_list, results)))


if __name__ == '__main__':
    process_pool = ProcessPoolExecutor()
    app.run()

多进程和多线程的一个区别:多进程它们的环境之间都是相互完全 隔离的。就是当定义这个pool的时候,它所依赖的函数必须都已经声明完成了。

process_pool必须放在所有申明函数的最下面才能正常使用,还需要将process_pool的定义放到main函数里面。

​ 在以上的尝试和对比中,我们也看到对于多线程的使用其实非常灵活,定义在哪里都没有问题,因为它们共享当前进程的所有的环境但是多进程这里在使用过程中就遇到了一些问题,有时候需要查资料进行解决。所以说大部分情况下只使用多线程就ok了,真的遇到了CPU密集型的计算,想办法引入多进程解决问题。在flask程序中使用多进程的方式就在main函数里面,在app.run()之前初始化进程池。然后在所有的函数里面就可以使用这个进程池。

十二、Python异步IO实现并发爬虫

1.单线程爬虫的执行路径

在这里插入图片描述

2.协程:单线程内实现并发

核心原理:用一个超级循环(其实就是while true)循环,这个超级循环是可以自己控制的。

核心原理:配合IO多路复用原理(IO时CPU可以干其他事情)
在这里插入图片描述

《The one loop》

至尊循环驭众生

至尊循环寻众生

至尊循环引众生

普照众生欣欣荣

3.Python异步IO库介绍:asyncio

async:异步

io:输入输出

import asyncio

# RuntimeError: This event loop is already running 出现该问题时导入nest_asyncio解决
# import nest_asyncio
# nest_asyncio.apply()

urls = [f'https://www.cnblogs.com/#p{page}' for page in range(1, 51)]

# 获取事件循环(里面就是一个while true)
loop = asyncio.get_event_loop()


async def get_url(url):
    await asyncio.sleep(1)
    print(url)


# 定义协程 async说明这个函数是一个协程
async def myfunc(url):
    # await 非常重要 对应IO 进行到这个一步骤时,不进行阻塞,而是让超级循环进入下一个程序的执行
    await get_url(url)
   

# 创建task列表
tasks = [loop.create_task(myfunc(url)) for url in urls]

# 执行爬虫事件列表
loop.run_until_complete(asyncio.wait(tasks))

注意:

  1. 要用在异步IO编程中,依赖的库必须支持异步IO特性
  2. 爬虫应用中:requests不支持异步,需要用aiohttp
import asyncio
import time

import aiohttp

urls = [f'https://www.cnblogs.com/#p{page}' for page in range(1, 31)]


async def async_craw(url):

    async with aiohttp.ClientSession() as session:

        async with session.get(url) as res:

            result = await res.text()

            print('craw url: {} {}'.format(url, len(result)))


loop = asyncio.get_event_loop()

tasks = [loop.create_task(async_craw(url)) for url in urls]


start_time = time.time()
loop.run_until_complete(asyncio.wait(tasks))
print(time.time() - start_time)

十三、在异步IO中使用信号量控制爬虫并发度

信号量(Semaphore)

信号量(Semaphore)又称为信号、旗语,是一个同步对象,用于保持0至指定最大值之间的一个计数值。

  • 当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一
  • 当线程完成一次对semaphore对象的释放(release)时,计数值加一
  • 当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象编程signaled状态
  • semaphore对象的计数值大于0,为signaled状态,计数值等于0,为nosignaled状态。

使用方式一:

sem = asyncio.Semaphore(10)

# ...later
# 可用保证并发度处于指定的数量之内
async with sem:
	# work with shared resoure

使用方式二:

sem = asyncio.Semaphore(10)

# ...later
await sem.acquire()
try:
	# work with shared resoure
finally:
	sem.release()

实例:

import asyncio
import time

import aiohttp

urls = [f'https://www.cnblogs.com/#p{page}' for page in range(1, 31)]

# 设置并发度为10
semaphore = asyncio.Semaphore(10)


async def async_craw(url):

    async with semaphore:

        print('craw url: ', url)

        async with aiohttp.ClientSession() as session:

            async with session.get(url) as res:

                result = await res.text()

                # 这里休眠看执行情况 会看到这里是10个执行完成之后,接着又执行10个
                await asyncio.sleep(5)

                print('craw url: {} {}'.format(url, len(result)))


loop = asyncio.get_event_loop()

tasks = [loop.create_task(async_craw(url)) for url in urls]


start_time = time.time()
loop.run_until_complete(asyncio.wait(tasks))
print(time.time() - start_time)

十四、使用subprocess启动电脑任意程序,听歌、解压缩、自动下载等

1.使用subprocess启动电脑的子进程

subproces模块:

  • 允许生成新的进程
  • 连接它们的输入、输出、错误管道
  • 并且获取它们的返回码

应用场景:

  • 每天定时08:00自动打开酷狗音乐播放歌曲
  • 调用7z.exe自动解压.7z文件
  • 通过Python远程提交一个torrent种子文件,用电脑启动下载

2.subprocess的实例

用默认的应用程序打开歌曲文件

注:windows下是start、macOs是open、Linux是see

# windows环境需要加shell=True
proc = subprocess.Popen(['start', 'xxx.mp3'], shell=True)
 
proc.communicate()

用7z.exe解压7z压缩文件

proc = subprocess.Popen([r'C:\Program Files\7-Zip\7z.exe', 'x', './data/7z_test.7z', '-o ./datas/exetract_7z_test', '-aoa'], shell=True)

proc.communicate()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/75834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微前端总结

微前端概述 微前端概念是从微服务概念扩展而来的&#xff0c;摒弃大型单体方式&#xff0c;将前端整体分解为小而简单的块&#xff0c;这些块可以独立开发、测试和部署&#xff0c;同时仍然聚合为一个产品出现在客户面前。可以理解微前端是一种将多个可独立交付的小型前端应用…

使用极限学习机进行股市预测(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 极限学习机&#xff08;Extreme Learning Machine,ELM&#xff09;作为前馈神经网络学习中一种全新的训练框架,在行为识别、情…

EPICS -- 使用asynPortDriver类编写示一个示例程序

本示例展示了如何使用asynPortDriver类编写一个EPICS端口驱动程序的示例。 这个驱动程序参数库中一个有5个参数&#xff0c;分别支持5个EPICS记录。 如下是具体步骤&#xff1a; 1&#xff09; 用makeBaseApp.pl脚本建立这个IOC应用程序的框架&#xff1a; [blctrlmain-mach…

IDEA中如何实现git的cherry-pick可视化操作?

目录 问题现象&#xff1a; 问题分析&#xff1a; 解决方法&#xff1a; 拓展&#xff1a;如何回退提交记录&#xff1f; 问题现象&#xff1a; 今天在学习了git的cherry-pick功能&#xff0c;于是引出了一个问题&#xff1a; IDEA中如何实现git的cherry-pick可视化操作&am…

【Docker学习教程系列】7-如何将本地的Docker镜像发布到阿里云

在上一篇中&#xff0c;我们使用docker commit 命令&#xff0c;创建了一个带有vim的Ubuntu镜像。那么怎么将这个镜像分享出去呢&#xff1f;本文就来讲解如何将本地的docker镜像发布到阿里云上。 本文主要内容&#xff1a; 1&#xff1a;本地镜像发布到阿里云流程 2&#xf…

Linux网络原理与编程(4)——第十四节 传输层协议

目录 前置知识 再谈端口号 几个函数 netstat telnet UDP报文 UDP协议端格式 UDP首部&#xff1a; UDP的特点 UDP的缓冲区 TCP报文详解 Tcp报头详解 传输层是在应用层的下面一层&#xff0c;我们在讲解传输层协议之前&#xff0c;先来说一说一些前置知识即命令函数等…

看完就会flink基础API

文章目录一、执行环境&#xff08;Execution Environment&#xff09;1、创建执行环境2、执行模式(Execution Mode)3、触发程序执行二、源算子&#xff08;Source&#xff09;1、数据源类准备2、从集合中读取数据3、从文件中读取数据4、从Socket中读取数据5、从Kafka中读取数据…

ISCSLP 2022 Program|希尔贝壳邀您参加第十三届中文口语语言处理国际会议

第十三届中文口语语言处理国际会议将于2022年12月11-14日&#xff08;本周日~下周三&#xff09;正式开启&#xff0c;本次会议中的部分Session将通过语音之家视频号进行线上直播&#xff0c;欢迎大家参加&#xff01; 官网&#xff1a;www.iscslp2022.org 大会简介 中文口语…

ABAP学习笔记之——第十二章:SALV

一、概要 在Net Weaver 2004 平台上集合了利用函数和 GRID 的功能发布的 SALV 包程序 (SALV也称为 New ALV)。SALV 可以像利用函数生成 ALV 的不用创建屏幕就可以调用的全屏式ALV&#xff0c;还可以利用控制器在屏幕的控制器中显示 ALV。利用GRID 的ALV 虽然不能注册成 Batch …

速卖通知识产权规则介绍,如何才能规避侵权的问题?

最近有商家咨询到我&#xff0c;关于速卖通对商品知识产权是怎么评判的&#xff0c;今天就来给大家科普一下平台关于知识产权的规则介绍 速卖通平台严禁用户未经授权发布、销售涉嫌侵犯第三方知识产权的商品或发布涉嫌侵犯第三方知识产权的信息。 若卖家发布涉嫌侵犯第三方知…

从股票市场选择配对的股票:距离计算方法

我们来看看如何定义距离的计算方法。 回想一下&#xff0c;在共同趋势模型中&#xff0c;协整的必要条件是从共同趋势得到的新息序列必须完全相关。APT中的共同因子回报就理解成是从共同趋势得到的新息序列&#xff0c;因此&#xff0c;新息序列之间的相关系数就是共同因子回报…

网关性能大PK,Spring Cloud Gateway让人大失所望

现在的架构基本都是使用微服务的&#xff0c;而网关作为微服务的统一门户在架构模式中用得越来越多&#xff0c;API网关是所有客户端的单一入口点。 API网关模式是微服务体系结构的一个很好的起点&#xff0c;因为它能够将特定的请求路由到我们从整体上分离的不同服务。事实上…

嘿嘿嘿,10个我经常逛的“小网站”,不骗人

反思一下&#xff0c;为什么你会点进来~ 咳咳&#xff0c;步入正题。 工欲善其事必先利其器&#xff0c;你们的收藏夹里收藏了多少“小网站”&#xff1f;今天和大家分享10个我压箱底的效率工具/平台&#xff0c;现在用不上不要紧&#xff0c;赶紧收藏最关键&#xff01; 1.G…

【产品设计】APP常见的6种图片浏览模式

六种是指比较常见的图片浏览模式&#xff0c;是在这四种的基础上加上手势来实现另一种读图方式&#xff0c;这里不绝对说就这么几种&#xff0c;那么就谈谈个人对每种图片浏览模式说说自己在实际使用和工作中的一些理解&#xff0c;以及这些模式在实际设计中&#xff0c;哪一种…

智能聊天机器人––ChatGPT初体验

最近几天&#xff0c;由OpenAI公司发布的ChatGPT聊天机器人火了&#xff0c;小杨也怀着对新鲜事物的好奇&#xff0c;亲自体验了一下这个火爆全网&#xff0c;让人沉迷其中无法自拔的ChatGPT聊天机器人&#xff0c;经过体验&#xff0c;我只想用一个字来表达我的感受&#xff0…

【LeetCode每日一题】——142.环形链表 II

文章目录一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【解题思路】七【题目提示】八【题目进阶】九【时间频度】十【代码实现】十一【提交结果】一【题目类别】 链表 二【题目难度】 中等 三【题目编号】 142.环形链表 II 四【题目描述】 …

Java百度地图全套教程(地图找房、轻骑小程序、金运物流等)

课程简介&#xff1a; 本课程基于百度地图技术&#xff0c;将企业项目中地图相关常见应用场景的落地实战&#xff0c;包括有地图找房、轻骑小程序、金运物流等。同时还讲了基于Netty实现高性能的web服务&#xff0c;来处理高并发的问题。 学完本课程能够收获&#xff1a;百度…

支持本地挂载的网盘文件列表工具AList

什么是 Alist&#xff1f; AList 是一个支持多存储的文件列表程序&#xff0c;使用 Gin 框架和 Solidjs 库。可以将常见的 18 种网盘整合在一起&#xff0c;并支持 WebDAV 客户端访问。 之前老苏写过一篇 Alist&#xff0c; 但此 Alist 非彼 Alist&#xff0c;之前的 A 应该是 …

React基础知识(React基本使用、JSX语法、React模块化与组件化)(一)

系列文章目录 文章目录系列文章目录一、React简介1.1 React的特点1.2 React高效的原因二、React基本使用2.1 引入react相关js库2.2 创建虚拟DOM的两种方法2.3 虚拟DOM和真实DOM三、JSX基本使用3.1 jsx使用四、React模块化与组件化4.1 函数式组件4.2 类式组件一、React简介 1.英…

Kafka消费者组消费进度监控

针对Kafka消费者&#xff0c;重要的就是监控消费进度或者是消费的滞后程度&#xff0c;有个专业名词 消费者 Lag 或 Consumer Lag。 滞后程度&#xff08;Consumer Lag&#xff09;是指消费者当前落后生产者的程度。比如Kafka 生产者向某主题成功生产了 100 万条消息&#xff0…