一、函数的使用方式
-
将函数视为“一等公民”
- 函数可以赋值给变量
- 函数可以作为函数的参数
- 函数可以作为函数的返回值
-
高阶函数的用法(
filter
、map
以及它们的替代品)items1 = list(map(lambda x: x ** 2, filter(lambda x: x % 2, range(1, 10)))) # filter()是一个过滤函数,字如其名 - 在一定范围内过滤出符合lambda的元素,并返回其迭代器形式 items2 = [x ** 2 for x in range(1, 10) if x % 2]
-
位置参数、可变参数、关键字参数、命名关键字参数
-
参数的元信息(代码可读性问题)
-
匿名函数和内联函数的用法(
lambda
函数) -
闭包和作用域问题
-
Python搜索变量的LEGB顺序(Local >>> Embedded >>> Global >>> Built-in)
-
global
和nonlocal
关键字的作用global
:声明或定义全局变量(要么直接使用现有的全局作用域的变量,要么定义一个变量放到全局作用域)。nonlocal
:声明使用嵌套作用域的变量(嵌套作用域必须存在该变量,否则报错)。
-
-
装饰器函数(使用装饰器和取消装饰器)
既然提到了装饰器函数,我们就从头了解一下什么是装饰器函数?
首先,装饰器本质上是一个接收函数作为参数并返回一个新函数的函数。 举一个简单例子:
def my_decorator(func):
def wrapper():
print("Something is happening before the function is called.")
func()
print("Something is happening after the function is called.")
return wrapper
@my_decorator
def say_hello():
print("Hello!")
say_hello()
执行say_hello()的结果将会是:
Something is happening before the function is called.
Hello!
Something is happening after the function is called.
所以,装饰器函数的参数为应为函数,装饰器函数中需要定义wrapper()函数。 而wrapper()函数被称为“包装器”或“包裹函数”,主要作用是扩展或修改被装饰函数的行为(wrapper函数可以传入参数)。functools.wraps装饰器可以来保留原始函数的名称和文档字符串。
例子:输出函数执行时间的装饰器。
def record_time(func):
"""自定义装饰函数的装饰器"""
@wrap(func)
def wrapper(*args, **kwargs):
start = time()
result = func(*args, **kwargs)
print(f'{func.__name__}: {time() - start}秒')
return result
return wrapper
如果装饰器不希望跟print
函数耦合,可以编写可以参数化的装饰器(带有参数的装饰器就是使用装饰器装饰函数的时候可以传入指定参数)。
from functools import wraps
from time import time
def record(output):
"""可以参数化的装饰器"""
def decorate(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time()
result = func(*args, **kwargs)
output(func.__name__, time() - start)
return result
return wrapper
return decorate
from functools import wraps
from time import time
class Record():
"""通过定义类的方式定义装饰器"""
def __init__(self, output):
self.output = output
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time()
result = func(*args, **kwargs)
self.output(func.__name__, time() - start)
return result
return wrapper
说明:由于对带装饰功能的函数添加了@wraps装饰器,可以通过func.__wrapped__
方式获得被装饰之前的函数或类来取消装饰器的作用。
例子:用装饰器来实现单例模式。既然谈到了单例模式,我们就来粗略了解一下到底什么是单例模式:
这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建。这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要实例化该类的对象。
单例模式是一种创建型设计模式,它确保一个类只有一个实例,并提供了一个全局访问点来访问该实例。
说白了也就是单例类只能有一个实例、单例类必须自己创建自己的唯一实例、单例类必须给所有其他对象提供这一实例。
例子:用装饰器来实现单例模式。
from functools import wraps
def singleton(cls):
"""装饰类的装饰器"""
instances = {}
@wraps(cls)
def wrapper(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return wrapper
@wraps
class President:
"""总统(单例类)"""
pass
提示:上面的代码中用到了闭包closure(闭包就是直接调用函数里面的函数,这样变量声明可以在外函数内部,第一次向上找到变量,第二次变量已存在,不用继续找。 如何实现呢?内函数作为外函数的返回值,再将外函数赋值给一个变量,再使用这个变量调用,这样就是直接调用的内函数而没有经过外函数),不知道你是否已经意识到了。还没有一个小问题就是,上面的代码并没有实现线程安全的单例,如果要实现线程安全的单例应该怎么做呢?
线程安全的单例装饰器。
from functools import wraps
from threading import RLock
def singleton(cls):
"""线程安全的单例装饰器"""
instances = {}
locker = RLock()
@wraps(cls)
def wrapper(*args, **kwargs):
if cls not in instances:
with locker:
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return wrapper
我们讲讲上面提到的RLock,RLock其实底层维护了一个互斥锁和一个计数器,我们借助源码分析一下:当一个线程通过acquire()获取一个锁时,首先会判断拥有锁的线程和调用acquire()的线程是否是同一个线程,如果是同一个线程,那么计数器+1,函数直接返回(return 1),如果两个线程不一致时,那么会通过调用底层锁(_allocate_lock())进行阻塞自己(也可能是获得锁)。
[threading] class _RLock: def __init__(self): self._block = _allocate_lock() # _thread模块中定义一个锁对象的方法 self._owner = None # 用来标记哪个线程获取了锁 self._count = 0 # 计数器 def acquire(self, blocking=True, timeout=-1): me = get_ident() if self._owner == me: self._count += 1 return 1 rc = self._block.acquire(blocking, timeout) if rc: self._owner = me self._count = 1 return rc def release(self): if self._owner != get_ident(): raise RuntimeError("cannot release un-acquired lock") self._count = count = self._count - 1 if not count: self._owner = None self._block.release()
提示:上面的代码用到了with
上下文语法来进行锁操作,因为锁对象本身就是上下文管理器对象(支持__enter__
和__exit__
魔术方法)。在wrapper
函数中,我们先做了一次不带锁的检查,然后再做带锁的检查,这样做比直接加锁检查性能要更好,如果对象已经创建就没有必须再去加锁而是直接返回该对象就可以了。
上下文管理器的主要原理是你的代码会放到 with
语句块中执行。 当出现 with
语句的时候,对象的 __enter__()
方法被触发, 它返回的值(如果有的话)会被赋值给 as
声明的变量。然后,with
语句块里面的代码开始执行。 最后,__exit__()
方法被触发进行清理工作。
__enter__(self)定义上下文管理器在 with 语句创建的块的开头应该做什么。__enter__的返回值 绑定到 with 语句的目标,即 as 语句后面的名称。
__exit__(self, exc_type, exc_value, exc_tb)此方法在执行流离开 with 上下文时调用。如果发生异常,则exc_type、exc_value和 分别exc_tb保存异常类型、值和回溯信息。
二、面向对象相关知识
2.1、三大支柱:封装、继承、多态
例子:工资结算系统。
"""
月薪结算系统 - 部门经理每月15000 程序员每小时200 销售员1800底薪加销售额5%提成
"""
from abc import ABCMeta, abstractmethod
class Employee(metaclass=ABCMeta):
"""员工(抽象类)"""
def __init__(self, name):
self.name = name
@abstractmethod
def get_salary(self):
"""结算月薪(抽象方法)"""
pass
class Manager(Employee):
"""部门经理"""
def get_salary(self):
return 15000
class Programmer(Employee):
"""程序员"""
def __init__(self, name, work_hour=0):
super().__init__(name)
self.work_hout = work_hour
def get_salary(self):
return self.work_hout * 200.0
class Salesman(Employee):
"""销售员"""
def __init__(self, name, sales=0.0):
self.sales = sales
super().__init__(name)
def get_salary(self):
return 1800.0 + self.sales * 0.05
class EmployeeFactory:
"""创建员工的工厂(工厂模式 - 通过工厂实现对象使用者和对象之间的解耦合)"""
@staticmethod
def create(emp_type, *args, **kwargs):
"""创建员工"""
all_emp_types = {'M':Manager, 'S':Salesman, 'P':Programmer}
cls = all_emp_types[emp_type]
return cls(*args, **kwargs) if cls else None
def main():
"""主函数"""
emps = [
EmployeeFactory.create('M', '曹操'),
EmployeeFactory.create('P', '荀彧', 120),
EmployeeFactory.create('P', '郭嘉', 85),
EmployeeFactory.create('S', '典韦', 123000),
]
for emp in emps:
print(f'{emp.name}: {emp.get_salary():.2f}元')
我们对上述代码的一些细节做一点讲解:
①from abc import ABCMeta, abstractmethod:这里面的ABCMeta是一个抽象基类的元类,顾名思义,它是用来定义抽象基类的,元类是创建类的类,它允许我们自定义类的创建过程。ABCMeta元类提供了更高级的抽象基类定义方式,它允许我们在类定义中使用特殊的语法来声明抽象方法。既然谈到了,我们不得不回去再说一下'ABC',ABC是Python中的一个装饰器,它用于定义一个抽象基类。抽象基类是一个接口或协议,它定义了子类必须实现的方法。通过使用ABC,我们可以创建一个抽象基类,并在其中定义抽象方法。子类必须实现这些抽象方法才能被认为是合法的。
②@staticmethod:静态方法,与类和实例都没有所谓的绑定关系,它只不过是碰巧存在类中的一个函数而已。不论是通过类还是实例都可以引用该方法。不实例化类的情况下直接访问该方法。
③@classmethod:类方法,可以自动绑定传入的类的对象,使得我们在调用它时使用任何方法。
2.2、类与类之间的关系
- is-a关系:继承
- has-a关系:关联 / 聚合 / 合成
- use-a关系:依赖
例子:扑克游戏。
"""
经验:符号常量总是优于字面常量,枚举类型是定义符号常量的最佳选择
"""
from enum import Enum, unique
import random
@unique
class Suite(Enum):
"""花色"""
SPADE, HEART, CLUB, DIAMOND = range(4)
def __lt__(self, other):
return self.value < other.value
class Card:
"""牌"""
def __init__(self, suite, face):
"""初始化方法"""
self.suite = suite
self.face = face
def show(self):
"""显示牌面"""
suites = ['♠︎', '♥︎', '♣︎', '♦︎']
faces = ['', 'A', '2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K']
return f'{suites[self.suite.value]}{faces[self.face]}'
# suite.value:调用枚举类元素的值
def __repr__(self):
return self.show()
class Poker:
"""扑克"""
def __init__(self):
self.index = 0
self.cards = [Card(suite, face)
for suite in Suite
for face in range(1, 14)]
def shuffle(self):
"""洗牌(随机乱序)"""
random.shuffle(self.cards)
self.index = 0
def deal(self):
"""发牌"""
card = self.cards[self.index]
self.index += 1
return card
@property
def has_more(self):
return self.index < len(self.cards)
class Player:
"""玩家"""
def __init__(self, name):
self.name = name
self.cards = []
def get_one(self, card):
"""摸一张牌"""
self.cards.append(card)
def sort(self, comp=lambda card: (card.suite, card.face)):
"""整理手上的牌"""
self.cards.sort(key=comp)
def main():
"""主函数"""
poker = Poker()
poker.shuffle()
players = [Player('东邪'), Player('西毒'), Player('南帝'), Player('北丐')]
while poker.has_more:
for player in players:
player.get_one(poker.deal())
for player in players:
player.sort()
print(player.name, end=': ')
print(player.cards)
if __name__ == '__main__':
main()
说明:上面的代码中使用了Emoji字符来表示扑克牌的四种花色,在某些不支持Emoji字符的系统上可能无法显示。
富比较方法 | 使用 | 释义 | 释义 |
---|---|---|---|
object.__lt__(self, other) | x.__lt__(y) | x<y | less than |
object.__le__(self, other) | x.__le__(y) | x<=y | less and equal |
object.__eq__(self, other) | x.__eq__(y) | x==y | equal |
object.__ne__(self, other) | x.__ne__(y) | x!=y | not equal |
object.__gt__(self, other) | x.__gt__(y) | x>y | greater than |
object.__ge__(self, other) | x.__ge__(y) | x>=y | greater and equal |
①Python中两个字符串进行比较时候会是按照两个字符串的 Unicode 码位级别进行比较,而不是按照它们的长度来进行比较。如果想要根据它们的长度来进行比较,则重写上述方法即可。
②类可以通过定义 __repr__()
方法来控制此函数为它的实例所返回的内容。同样是重写此方法。在打印一个类或者类的对象情况下(创建类对象三),就会打印上述重写方法的内容。
③在定义枚举类时,通过@unique装饰器,可以确保枚举成员的值是唯一的。如果有相同值的成员,则会抛出ValueEror异常。
-
对象的复制(深复制/深拷贝/深度克隆和浅复制/浅拷贝/影子克隆)
-
垃圾回收、循环引用和弱引用
Python使用了自动化内存管理,这种管理机制以引用计数(引用计数是Python内存管理的核心机制之一。每个对象在创建时都会有一个引用计数,表示有多少个引用指向该对象。当引用计数变为零时,说明没有任何引用指向该对象,垃圾回收器就会释放该对象的内存。)为基础,同时也引入了标记-清除和分代收集两种机制为辅的策略。
typedef struct _object { /* 引用计数 */ int ob_refcnt; /* 对象指针 */ struct _typeobject *ob_type; } PyObject; /* 增加引用计数的宏定义 */ #define Py_INCREF(op) ((op)->ob_refcnt++) /* 减少引用计数的宏定义 */ #define Py_DECREF(op) \ //减少计数 if (--(op)->ob_refcnt != 0) \ ; \ else \ __Py_Dealloc((PyObject *)(op))
导致引用计数+1的情况:
- 对象被创建,例如
a = 23
- 对象被引用,例如
b = a
- 对象被作为参数,传入到一个函数中,例如
f(a)
- 对象作为一个元素,存储在容器中,例如
list1 = [a, a]
导致引用计数-1的情况:
- 对象的别名被显式销毁,例如
del a
- 对象的别名被赋予新的对象,例如
a = 24
- 一个对象离开它的作用域,例如f函数执行完毕时,f函数中的局部变量(全局变量不会)
- 对象所在的容器被销毁,或从容器中删除对象
引用计数可能会导致循环引用问题,而循环引用会导致内存泄露,如下面的代码所示。为了解决这个问题,Python中引入了“标记-清除”和“分代收集”。在创建一个对象的时候,对象被放在第一代中,如果在第一代的垃圾检查中对象存活了下来,该对象就会被放到第二代中,同理在第二代的垃圾检查中对象存活下来,该对象就会被放到第三代中。
# 循环引用会导致内存泄露 - Python除了引用技术还引入了标记清理和分代回收
# 在Python 3.6以前如果重写__del__魔术方法会导致循环引用处理失效
# 如果不想造成循环引用可以使用弱引用
list1 = []
list2 = []
list1.append(list2)
list2.append(list1)
循环引用:顾名思义就是一个对象里引用了另一个对象,创建二者时会互相循环引用。在Python中,每个对象都有一个与之关联的引用计数器。当对象被创建时,其引用计数初始化为1。每当对象被引用时,引用计数增加;每当对象的引用被删除时,引用计数减少。当引用计数降到0时,对象被标记为可回收。循环引用发生在两个或多个对象相互引用,形成一个闭环,导致它们的引用计数永远不会归零。这种情况下,即使这些对象不再被使用,它们也无法被垃圾回收器回收。循环引用会导致内存泄漏,因为这些无法回收的对象会一直占用内存空间。随着程序的运行,内存泄漏可能会导致程序消耗越来越多的内存,最终影响程序的性能甚至导致崩溃。
魔术方法__del__():在编写程序时,如果之前创建的类实例化对象后续不再使用,最好在适当位置手动将其销毁,释放其占用的内存空间,整个过程称为垃圾回收。大多数情况下,Python 开发者不需要手动进行垃圾回收,因为 Python 有自动的垃圾回收机制,能自动将不需要使用的实例对象进行销毁。无论是手动销毁还是 Python 自动销毁,都会调用__del__()
方法。删除一个对象时 python 解释器也会默认调用__del__
方法。
弱引用:强引用是最常见的引用方式,当我们有一个变量引用一个对象时,只要该变量存在,对象就不会被垃圾回收,直到所有强引用消失。而弱引用则不同,它不会阻止对象被垃圾回收,只有当没有其他强引用时,弱引用的对象才会被释放。弱引用不同于强引用,它并不阻止对象被垃圾回收。简单来说,如果一个对象只有弱引用,那么即使所有强引用都消失了,这个对象也不会立即被销毁,直到没有任何其他引用存在时,才会被垃圾回收器清理。弱引用通常用于实现缓存、数据持久化等场景,避免因为循环引用导致的对象无法被回收的问题。
Python的弱引用由weakref
模块提供支持。弱引用并不直接改变对象的状态,而是提供了一种间接的方式来访问对象,即使这个对象已经被垃圾回收,弱引用仍然可以获取到它,但不能阻止其被回收。这是通过一种特殊的引用计数机制实现的,对于强引用,每次引用都会增加对象的引用计数,而对于弱引用,不会增加。Python
标准库中的weakref
模块提供了弱引用的功能。weakref
模块有两个主要类:WeakRef
和WeakKeyDictionary
。
weakref
模块允许开发者创建对对象的弱引用,这意味着即使存在弱引用,对象的引用计数也不会增加。当对象的引用计数降到0时,即使存在弱引用,对象也会被垃圾回收。
-
以下情况会导致垃圾回收:
- 调用
gc.collect()
gc
模块的计数器达到阀值- 程序退出
如果循环引用中两个对象都定义了
__del__
方法,gc
模块不会销毁这些不可达对象,因为gc模块不知道应该先调用哪个对象的__del__
方法,这个问题在Python 3.6中得到了解决。也可以通过
weakref
模块构造弱引用的方式来解决循环引用的问题。 - 调用
2.3、混入(Mixin)
例子:自定义字典限制只有在指定的key不存在时才能在字典中设置键值对。
class SetOnceMappingMixin:
"""自定义混入类"""
__slots__ = ()
def __setitem__(self, key, value):
if key in self:
raise KeyError(str(key) + ' already set')
return super().__setitem__(key, value)
class SetOnceDict(SetOnceMappingMixin, dict):
"""自定义字典"""
pass
my_dict= SetOnceDict()
try:
my_dict['username'] = 'jackfrued'
my_dict['username'] = 'hellokitty'
except KeyError:
pass
print(my_dict)
①def __setitem__():在python中__setitem__(self,value,key)方法时python魔法方法的一种,这个方法会让类按照一定的方法存储和key映射的value。该值可以使用另一种魔法方法__getitem__(self,key)来获取。当期望定义的类具备按照键存储值时,即类能够执行data['key']=value。
②super().__setitem__(key, value):继承类链中,首先继承了SetOnceMappingMixin类的__setitem__方法,SetOnceMappingMixin类的__setitem__方法中,'return super().setitem(key, value)',此时在继承类链中往上找,找到了dict的__setitem__方法。进而实现对dict类中方法的调用。
③except KeyError:pass:遇到错误执行except代码块,pass忽略错误并继续往下运行。
2.4、元编程和元类
对象是通过类创建的,类是通过元类创建的,元类提供了创建类的元信息。所有的类都直接或间接的继承自object
,所有的元类都直接或间接的继承自type
。
例子:用元类实现单例模式。
import threading
class SingletonMeta(type):
"""自定义元类"""
def __init__(cls, *args, **kwargs):
cls.__instance = None
cls.__lock = threading.RLock()
super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
if cls.__instance is None:
with cls.__lock:
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance
class President(metaclass=SingletonMeta):
"""总统(单例类)"""
pass
①cls.__instance = super().__call__(*args, **kwargs):__call__()
方法的作用其实是把一个类的实例化对象变成了可调用对象,也就是说把一个类的实例化对象变成了可调用对象,只要类里实现了__call__()
方法就行。在类中实现这一方法可以使该类的实例(对象)像函数一样被调用。默认情况下该方法在类中是没有被实现的。如果类定义了__call__
方法,那么它的实例可以变为可调用对象。
②class President(metaclass=SingletonMeta):metaclass的英文直译过来就是元类,这既是一个概念也可以认为是Python当中的一个关键字,不管怎么理解,对它的内核含义并没有什么影响。我们可以不必纠结,就认为它是类的类的意思即可。在这个用法当中,支持我们自己定义一个类,使得它是后面某一个类的元类。
-
面向对象设计原则
- 单一职责原则 (SRP)- 一个类只做该做的事情(类的设计要高内聚)
- 开闭原则 (OCP)- 软件实体应该对扩展开发对修改关闭
- 依赖倒转原则(DIP)- 面向抽象编程(在弱类型语言中已经被弱化)
- 里氏替换原则(LSP) - 任何时候可以用子类对象替换掉父类对象
- 接口隔离原则(ISP)- 接口要小而专不要大而全(Python中没有接口的概念)
- 合成聚合复用原则(CARP) - 优先使用强关联关系而不是继承关系复用代码
- 最少知识原则(迪米特法则,LoD)- 不要给没有必然联系的对象发消息
说明:上面加粗的字母放在一起称为面向对象的SOLID原则。
-
GoF设计模式
- 创建型模式:单例、工厂、建造者、原型
- 结构型模式:适配器、门面(外观)、代理
- 行为型模式:迭代器、观察者、状态、策略
例子:可插拔的哈希算法(策略模式)。策略模式(
Strategy Pattern
):定义一系列算法类,将每一个算法封装起来,并让它们可以相互替换,策略模式让算法独立于使用它的客户而变化,也称为政策模式(Policy
)。策略模式是一种对象行为型模式。在策略模式结构图中包含如下几个角色:
Context
(环境类):环境类是使用算法的角色,它在解决某个问题(即实现某个方法)时可以采用多种策略。在环境类中维持一个对抽象策略类的引用实例,用于定义所采用的策略。Strategy
(抽象策略类):它为所支持的算法声明了抽象方法,是所有策略类的父类,它可以是抽象类或具体类,也可以是接口。环境类通过抽象策略类中声明的方法在运行时调用具体策略类中实现的算法。ConcreteStrategy
(具体策略类):它实现了在抽象策略类中声明的算法,在运行时,具体策略类将覆盖在环境类中定义的抽象策略类对象,使用一种具体的算法实现某个业务处理。class StreamHasher: """哈希摘要生成器""" def __init__(self, alg='md5', size=4096): self.size = size alg = alg.lower() self.hasher = getattr(__import__('hashlib'), alg.lower())() def __call__(self, stream): return self.to_digest(stream) def to_digest(self, stream): """生成十六进制形式的摘要""" for buf in iter(lambda: stream.read(self.size), b''): self.hasher.update(buf) return self.hasher.hexdigest() def main(): """主函数""" hasher1 = StreamHasher() with open('Python-3.7.6.tgz', 'rb') as stream: print(hasher1.to_digest(stream)) hasher2 = StreamHasher('sha1') with open('Python-3.7.6.tgz', 'rb') as stream: print(hasher2(stream)) if __name__ == '__main__': main()
①alg='md5':这个神秘的'md5'是什么意思呢?在python3的标准库中,已经移除了md5,而关于hash加密算法都放在hashlib这个标准库中,如SHA1、SHA224、SHA256、SHA384、SHA512和MD5算法等。MD5即Message-Digest Algorithm 5(信息-摘要算法5),用于确保信息传输完整一致。是计算机广泛使用的杂凑算法之一(又译摘要算法、哈希算法)。MD5的作用是让大容量信息在用数字签名软件签署私人密钥前被"压缩"成一种保密的格式(就是把一个任意长度的字节串变换成一定长的16进制数字串)。
②alg.lower():将字符串中的所有大写字母转换为小写字母。alg.lower()
将这个字符串转换为小写。这是因为在Python的hashlib
模块中,所有的哈希算法名称都是小写字母。
③getattr(__import__('hashlib'), alg.lower())():importlib 是一个模块,使用时需要首先导入该模块。__import__
是Python内置的一个函数,用于动态导入模块。通常,我们会使用import hashlib
来导入模块,但__import__
允许我们在运行时动态地指定要导入的模块名。getattr 是内置函数,用于返回一个对象的属性值。getattr
函数用于获取对象的属性。这里,它首先通过__import__('hashlib')
获取hashlib
模块对象,然后通过alg.lower()
获取算法名称的小写字符串,最终从hashlib
模块中获取对应的算法函数或类。例如,如果alg
是'SHA256'
,那么alg.lower()
就是'sha256'
,getattr
就会从hashlib
模块中获取sha256
这个函数。通过在getattr
调用的结果后面加上一对圆括号()
,我们立即调用了获取到的函数或可调用对象。这意味着如果alg
是'SHA256'
,这行代码就会动态地从hashlib
模块中获取sha256
函数,并立即调用它。
④iter(lambda: stream.read(self.size), b''):iter(function, sentinel)
是Python中的一个迭代函数,它接收两个参数:一个函数function
和一个哨兵值sentinel
。在这个例子中,function
是一个匿名函数(lambda函数),lambda: stream.read(self.size)
。这个函数的作用是调用stream.read(self.size)
,从数据流stream
中读取self.size
字节的数据。sentinel
是b''
,一个空字节串。当function
的返回值等于sentinel
时,迭代停止。这个迭代会不断地从stream
中读取self.size
字节的数据,直到读取到数据的末尾(stream.read()
在到达数据流末尾时会返回空字节串b''
)。
⑤self.hasher.update(buf):在每次迭代中,从数据流中读取的数据块(buf
)会被传递给self.hasher.update(buf)。
self.hasher
是一个哈希对象,可能是如hashlib.sha256()
或hashlib.md5()
等创建的。update
方法用于将新数据添加到当前哈希计算中。
⑥self.hasher.hexdigest():当数据流中的所有数据都被读取并用于更新哈希对象后,迭代结束。return self.hasher.hexdigest()
返回当前哈希对象的十六进制摘要字符串。这个字符串是数据的唯一标识符(或至少是非常可能唯一的),基于所使用的哈希算法。
三、迭代器和生成器
3.1、迭代器是实现了迭代器协议的对象。
- Python中没有像
protocol
或interface
这样的定义协议的关键字。协议是一种抽象类型,定义了一组方法,但不提供这些方法的实现;而接口是一种具体的类型,它定义了一组方法并提供了这些方法的实现。 - Python中用魔术方法表示协议。
__iter__
和__next__
魔术方法就是迭代器协议。__next__
方法用于获取迭代器中的下一个元素,并在没有更多元素可供迭代时引发StopIteration
异常。class Fib(object): """迭代器""" def __init__(self, num): self.num = num self.a, self.b = 0, 1 self.idx = 0 def __iter__(self): return self def __next__(self): if self.idx < self.num: self.a, self.b = self.b, self.a + self.b self.idx += 1 return self.a raise StopIteration()
3.2、生成器是语法简化版的迭代器。
def fib(num):
"""生成器"""
a, b = 0, 1
for _ in range(num):
a, b = b, a + b
yield a
3.3、生成器进化为协程。
生成器对象可以使用send()
方法发送数据,发送的数据会成为生成器函数中通过yield
表达式获得的值。这样,生成器就可以作为协程使用,协程简单的说就是可以相互协作的子程序。
当生成器函数需要接收一个可变的动态的参数来改变函数输出结果时,我们可以考虑使用send方法。
- send方法和next方法非常像,他比next方法多了一个传入参数的功能。
- send方法的参数就是给生成器注入的数据。
- send方法的返回值是生成器输出的下一个值。
- 调用Generator函数返回一个遍历器对象,代表Generator函数的内部指针。以后每次调用遍历器对象的next()方法,就会返回一个有着value和done属性的对象,value表示yield后的表达式的值,done属性是一个布尔值,表示函数是否遍历结束。
-
在使用send方法获取生成器值的时候,传入的参数会成为生成器上一条yield表达式的值,生成器拿到这个值后会运行到下一条yield表达式的地方。因此刚开始运行生成器的时候,他是从头执行的,此时生成器内部还没有
yield
表达式等待接收值(在生成器启动之前,还没有 yield 语句,无法接收真实的值)。也就是说生成器还没有走到第一个 yield 语句,如果我们发生一个真实的值,这时是没有人去“接收”它的。所以首次调用send方法时,只能传入None,要是传入其他值,就会抛出异常。
def calc_avg():
"""流式计算平均值"""
total, counter = 0, 0
avg_value = None
while True:
value = yield avg_value
total, counter = total + value, counter + 1
avg_value = total / counter
gen = calc_avg()
next(gen) # 使用next函数来激活生成器,等同于 gen.send(None)
print(gen.send(10)) # 在每次迭代时,我们使用yield语句暂停生成器的执行,等待接收一个值。
# 当我们调用生成器的send方法时,该值将作为yield语句的返回值,并且生成器将从暂停的地方恢复执行。
print(gen.send(20))
print(gen.send(30))
# ①yield会将程序暂停在此处等待send()/next()出现才能往下运行,且每次执行都是在遇到下一个yield时停止
# ②执行send(10)时,先将10赋值给左侧value,之后将这个暂停的yield激活,运行代码直至重回此yield
# ③重回此yield后,就会触发yield的功能,将得到的avg_value的值通过send()的调用返回,最后再将程序暂停在此处
# send会引发一次generator的next,让generator继续执行,传过来的value会赋值给yield关键字左边的变量,而yield关键字右边的值就会返还给send。
四、并发编程
Python中实现并发编程的三种方案:多线程、多进程和异步I/O。并发编程的好处在于可以提升程序的执行效率以及改善用户体验;坏处在于并发的程序不容易开发和调试,同时对其他程序来说它并不友好。
-
多线程:Python中提供了
Thread
类并辅以Lock
、Condition
、Event
、Semaphore
和Barrier
。Python中有GIL来防止多个线程同时执行本地字节码,这个锁对于CPython是必须的,因为CPython的内存管理并不是线程安全的,因为GIL的存在多线程并不能发挥CPU的多核特性。
既然谈到了GIL,我们就来稍微了解一下什么是GIL?GIL是Python解释器中的一种机制,它是一把全局锁,用于保护解释器免受多线程并发访问的影响。这意味着Python在同一时刻只允许一个线程执行Python字节码。GIL实际上是一个互斥锁,在Python解释器层面上实现。由于GIL的存在,同一时刻只有一个线程能够获得解释器的控制权,其他线程被阻塞,无法执行Python字节码。这意味着在多核CPU上,Python的多线程程序可能无法充分利用多核性能。
"""
面试题:进程和线程的区别和联系?
进程 - 操作系统分配内存的基本单位 - 一个进程可以包含一个或多个线程
线程 - 操作系统分配CPU的基本单位
并发编程(concurrent programming)
1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行
2. 改善用户体验 - 让耗时间的操作不会造成程序的假死
"""
import glob
import os
import threading
from PIL import Image
PREFIX = 'thumbnails'
def generate_thumbnail(infile, size, format='PNG'):
"""生成指定图片文件的缩略图"""
file, ext = os.path.splitext(infile)
file = file[file.rfind('/') + 1:] # rfind() 返回字符串最后一次出现的位置,如果没有匹配项则返回 -1。在这里是找到文件名
outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}'
img = Image.open(infile)
img.thumbnail(size, Image.ANTIALIAS) # 制作当前图片的缩略图。
img.save(outfile, format)
def main():
"""主函数"""
if not os.path.exists(PREFIX):
os.mkdir(PREFIX)
for infile in glob.glob('images/*.png'):
for size in (32, 64, 128):
# 创建并启动线程
threading.Thread(
target=generate_thumbnail,
args=(infile, (size, size))
).start()
if __name__ == '__main__':
main()
①import glob:glob是python自己带的一个文件操作相关模块,用它可以查找符合自己目的的文件,类似于Windows下的文件搜索,支持通配符操作,,?,[]这三个通配符,代表0个或多个字符,?代表一个字符,[]匹配指定范围内的字符,如[0-9]匹配数字。
②from PIL import Image:PIL 是 Python Imaging Library 的缩写,它是 Python 中用于图像处理的一个强大的库。而 Image 模块则是 PIL 库中的一个子模块,提供了处理图像的各种功能。
③os.path.splitext(path):分割路径,返回路径名和文件扩展名的元组。在这里,ext代表扩展,并具有指定路径的扩展部分,而root是除ext部分以外的所有内容。如果指定的路径没有任何扩展名,则Ext为空。如果指定的路径有前导句号(' . '),它将被忽略。
path:表示文件系统路径的类路径对象。类路径对象是表示路径的str或bytes对象。
返回类型:该方法返回一个元组,表示指定路径名的根和ext部分。
例如,考虑以下路径名称:
path name root ext
/home/User/Desktop/file.txt /home/User/Desktop/file .txt
/home/User/Desktop /home/User/Desktop {empty}
file.py file .py
.txt .txt {empty}
④(size, size):size: 缩略图尺寸,是一个二元元组,形如(width, height),注意这里的width和height都不应超过源图片尺寸。其中一个值可以是None,这时会根据不为None的那个值等比缩放图片。
多个线程竞争资源的情况。
"""
多线程程序如果没有竞争资源处理起来通常也比较简单
当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱
说明:临界资源就是被多个线程竞争的资源
"""
import time
import threading
from concurrent.futures import ThreadPoolExecutor
class Account(object):
"""银行账户"""
def __init__(self):
self.balance = 0.0
self.lock = threading.Lock()
def deposit(self, money):
# 通过锁保护临界资源
with self.lock:
new_balance = self.balance + money
time.sleep(0.001)
self.balance = new_balance
def main():
"""主函数"""
account = Account()
# 创建线程池
pool = ThreadPoolExecutor(max_workers=10)
futures = []
for _ in range(100):
future = pool.submit(account.deposit, 1)
futures.append(future)
# 关闭线程池
pool.shutdown()
for future in futures:
future.result()
print(account.balance)
if __name__ == '__main__':
main()
①from concurrent.futures import ThreadPoolExecutor:使用 ThreadPoolExecutor 来实例化线程池对象。传入max_workers参数来设置线程池中最多能同时运行的线程数目。
②使用 submit 函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。通过 submit 函数返回的任务句柄,能够使用 done() 方法判断任务是否结束。
③使用 cancel() 方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。
④使用 result() 方法可以获取任务的返回值,这个方法是阻塞的。
修改上面的程序,启动5个线程向账户中存钱,5个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading
模块的Condition
来实现线程调度,该对象也是基于锁来创建的,代码如下所示:
"""
多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
"""
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep
import threading
class Account:
"""银行账户"""
def __init__(self, balance=0):
self.balance = balance
lock = threading.RLock()
self.condition = threading.Condition(lock)
def withdraw(self, money):
"""取钱"""
with self.condition:
while money > self.balance:
self.condition.wait()
new_balance = self.balance - money
sleep(0.001)
self.balance = new_balance
def deposit(self, money):
"""存钱"""
with self.condition:
new_balance = self.balance + money
sleep(0.001)
self.balance = new_balance
self.condition.notify_all()
def add_money(account):
while True:
money = randint(5, 10)
account.deposit(money)
print(threading.current_thread().name,
':', money, '====>', account.balance)
sleep(0.5)
def sub_money(account):
while True:
money = randint(10, 30)
account.withdraw(money)
print(threading.current_thread().name,
':', money, '<====', account.balance)
sleep(1)
def main():
account = Account()
with ThreadPoolExecutor(max_workers=15) as pool:
for _ in range(5):
pool.submit(add_money, account)
for _ in range(10):
pool.submit(sub_money, account)
if __name__ == '__main__':
main()
①wait(timeout=None)
- 等待通知或超时。如果线程没有获取到锁就调用了此方法,那么将引发 RuntimeError 异常
- 本方法会释放隐性锁,然后阻塞直到被其他线程的调用此条件变量的 notify() 或 notify_all() 唤醒,或超时。一旦被唤醒或超时,该线程将立即重新获取锁并返回
- timeout 参数是以秒为单位的浮点数
- 如果隐性锁是一个 RLock 对象,因为调用它的 release() 方法未必能够释放该锁,所以本方法会使用 RLock 对象的一个内部接口,该接口可以立即释放多重迭代的 RLock 锁。并且在需要重新获取锁的时候,也会使用一个类似的内部接口来恢复多重的迭代级别
- 本方法所阻塞的线程如果是被唤醒的,那么本方法会返回一个 True,如果是超时了,则返回 False
②notify_all()
- 唤醒正在等待本条件变量的所有线程。
③notify(n=1)
- 本方法默认用于唤醒处于等待本条件变量的线程。如果调用本方法的线程并没有获得锁,将引发 RuntimeError 异常
- 本方法至多可唤醒所有正在等待本条件变量的线程中的 n 个。如果调用时没有线程处于等待操作,那么本方法的调用是一个空操作
- 现在版本对本方法的实现为:在有足够多处于等待状态的线程的条件下,本方法将正好唤醒其中的 n 个,而不是像上一条中讲的“至多 n 个”。不过这种行为并不可靠。在将来,本方法很可能偶尔唤醒超过 n 条线程
多进程:多进程可以有效的解决GIL的问题,实现多进程主要的类是Process
,其他辅助的类跟threading
模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing
模块中有一个Queue
类,它基于管道和锁机制提供了多个进程共享的队列。下面是官方文档上关于多进程和进程池的一个示例。
"""
多进程和进程池的使用
多线程因为GIL的存在不能够发挥CPU的多核特性
对于计算密集型任务应该考虑使用多进程
time python3 example22.py
real 0m11.512s
user 0m39.319s
sys 0m0.169s
使用多进程后实际执行时间为11.512秒,而用户时间39.319秒约为实际执行时间的4倍
这就证明我们的程序通过多进程使用了CPU的多核特性,而且这台计算机配置了4核的CPU
"""
import concurrent.futures
import math
PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5
def is_prime(n):
"""判断素数"""
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
"""主函数"""
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
①math.floor:将一个给定的数值向下取整,返回不大于这个数值的最大整数。
②zip(PRIMES, executor.map(is_prime, PRIMES)):
zip() 函数用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表。如果各个迭代器的元素个数不一致,则返回列表长度与最短的对象相同,利用 * 号操作符,可以将元组解压为列表。(zip 方法在 Python 2 和 Python 3 中的不同:在 Python 3.x 中为了减少内存,zip() 返回的是一个对象。如需展示列表,需手动 list() 转换。)
map() 会根据提供的函数对指定序列做映射。第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。(Python 2.x 返回列表。Python 3.x 返回迭代器。)
③ProcessPoolExecutor 是 Python concurrent.futures 模块中的一个类,它用于创建一个进程池,以并行方式执行多个任务。这个类是 Executor 抽象类的具体实现,它利用多进程来提高计算密集型任务的执行效率,绕过了全局解释器锁(GIL)的限制。
重点:多线程和多进程的比较。
以下情况需要使用多线程:
- 程序需要维护许多共享的状态(尤其是可变状态),Python中的列表、字典、集合都是线程安全的,所以使用线程而不是进程维护共享状态的代价相对较小。
- 程序会花费大量时间在I/O操作上,没有太多并行计算的需求且不需占用太多的内存。
以下情况需要使用多进程:
- 程序执行计算密集型任务(如:字节码操作、数据处理、科学计算)。
- 程序的输入可以并行的分成块,并且可以将运算结果合并。
- 程序在内存使用方面没有任何限制且不强依赖于I/O操作(如:读写文件、套接字等)。
异步处理:从调度程序的任务队列中挑选任务,该调度程序以交叉的形式执行这些任务,我们并不能保证任务将以某种顺序去执行,因为执行顺序取决于队列中的一项任务是否愿意将CPU处理时间让位给另一项任务。异步任务通常通过多任务协作处理的方式来实现,由于执行时间和顺序的不确定,因此需要通过回调式编程或者future
对象来获取任务执行的结果。Python 3通过asyncio
模块和await
和async
关键字(在Python 3.7中正式被列为关键字)来支持异步处理。
"""
异步I/O - async / await
"""
import asyncio
def num_generator(m, n):
"""指定范围的数字生成器"""
yield from range(m, n + 1)
async def prime_filter(m, n):
"""素数过滤器"""
primes = []
for i in num_generator(m, n):
flag = True
for j in range(2, int(i ** 0.5 + 1)):
if i % j == 0:
flag = False
break
if flag:
print('Prime =>', i)
primes.append(i)
await asyncio.sleep(0.001)
return tuple(primes)
async def square_mapper(m, n):
"""平方映射器"""
squares = []
for i in num_generator(m, n):
print('Square =>', i * i)
squares.append(i * i)
await asyncio.sleep(0.001)
return squares
def main():
"""主函数"""
loop = asyncio.get_event_loop()
future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
future.add_done_callback(lambda x: print(x.result()))
loop.run_until_complete(future)
loop.close()
if __name__ == '__main__':
main()
①loop=asyncio.get_event_loop() 获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;loop=asyncio.get_running_loop() 返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的;loop=asyncio.set_event_loop(loop) 设置一个事件循环为当前线程的事件循环;loop=asyncio.new_event_loop() 创建一个新的事件循环。
②asyncio.gather 是Python中的一个高级异步API,它允许你并发运行多个异步操作,并在它们全部完成后返回结果。这个函数是处理并发任务时非常有用的工具,因为它可以帮助你有效地管理多个协程。当你需要同时执行多个协程并收集它们的结果时,可以使用 asyncio.gather。这个函数接受一个可等待对象的序列,并返回一个Future对象,该对象将在所有给定的协程完成时解析为一个包含所有结果的元组。
③在Python中,add done_callback()是一个方法,用于向concurrent.futures.future对象中添加一个回调函数。当Future对象的计算完成时,回调函数会被自动调用。这个方法通常用于异步编程中,当某个异步操作完成后,需要执行一些特定的操作或逻辑。具体来说,当我们使用concurrent.futures模块中的ThreadpoolExecutor或processPoolExecutor创建线程池或进程池时,我们可以将我们的任务提交给线程池或进程池来执行,这些任务会返回一个Future对象。我们可以使用add done callback()方法向这些Future对象中添加回调函数,以便在任务完成时执行一些操作,例如获取任务结果、更新状态等等。
⑤协程可以:
- 等待一个 future 结束
- 等待另一个协程(产生一个结果,或引发一个异常)
- 产生一个结果给正在等它的协程
- 引发一个异常给正在等它的协程
- asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程。
说明:上面的代码使用
get_event_loop
函数获得系统默认的事件循环,通过gather
函数可以获得一个future
对象,future
对象的add_done_callback
可以添加执行完成时的回调函数,loop
对象的run_until_complete
方法可以等待通过future
对象获得协程执行结果。
Python中有一个名为aiohttp
的三方库,它提供了异步的HTTP客户端和服务器,这个三方库可以跟asyncio
模块一起工作,并提供了对Future
对象的支持。Python 3.6中引入了async
和await
来定义异步执行的函数以及创建异步上下文,在Python 3.7中它们正式成为了关键字。下面的代码异步的从5个URL中获取页面并通过正则表达式的命名捕获组提取了网站的标题。
import asyncio
import re
import aiohttp
PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')
async def fetch_page(session, url):
# 基本请求用法
async with session.get(url, ssl=False) as resp:
return await resp.text() # 可以在括号中指定解码方式,编码方式;resp.read()适合读取图像
async def show_title(url):
async with aiohttp.ClientSession() as session:
html = await fetch_page(session, url)
print(PATTERN.search(html).group('title'))
def main():
urls = ('https://www.python.org/',
'https://git-scm.com/',
'https://www.jd.com/',
'https://www.taobao.com/',
'https://www.douban.com/')
loop = asyncio.get_event_loop()
cos = [show_title(url) for url in urls]
loop.run_until_complete(asyncio.wait(cos))
loop.close()
if __name__ == '__main__':
main()
①group()同group(0)就是匹配正则表达式整体结果,group(1)列出第一个括号匹配部分,group(2)列出第二个括号匹配部分,group(3)列出第三个括号匹配部分。没有匹配成功的,re.search()返回None.
②aiohttp.ClientSession 是用于执行HTTP请求的推荐接口。它封装了一个连接池(connector实例),默认支持keepalive连接。除非你的应用程序在其生命周期内需要连接到大量不同的服务器,否则建议使用单个会话以利用连接池。
-
不要为每个请求创建新的 ClientSession 实例。通常,一个应用程序只需要一个会话来执行所有请求。
-
更复杂的情况可能需要为每个站点创建一个会话,例如,一个会话用于GitHub,另一个会话用于Facebook API。但是,为每个请求创建会话是一个非常糟糕的主意。
-
会话内部包含一个连接池。连接重用和保持活动状态(默认情况下均处于启用状态)可能会提高整体性能。
-
如果你需要在多个请求之间重用 ClientSession,可以在 async with 代码块中完成所有请求,或者手动创建并管理 ClientSession 实例。如果选择手动管理,记得在完成请求后关闭会话。
③requests中的session对象能够让我们跨http请求保持某些参数,即让同一个session对象发送的请求头携带某个指定的参数。
session.get(url)
是一个方法调用,用于向指定的url
发送一个HTTP GET请求。这里的session
是一个requests.Session
对象,它允许你跨请求保持某些参数,比如cookies、HTTP头信息等,从而模拟一个用户的会话(session)。
当你调用session.get(url)
时,requests
库会执行以下操作:
- 构建一个HTTP GET请求,目标地址为
url
。 - 如果
session
对象中有预先设置的cookies、HTTP头信息、认证信息等,这些都会被添加到请求中。 - 发送这个GET请求到服务器。
- 接收服务器的响应,并将其封装成一个
Response
对象。 - 返回这个
Response
对象给你,你可以通过它访问响应的内容、状态码、HTTP头等。
Response
对象提供了多种方法和属性来访问响应的不同部分,比如:
response.text
:以字符串形式返回响应体的内容。response.content
:以字节形式返回响应体的内容。response.status_code
:返回HTTP状态码,比如200表示成功,404表示未找到。response.headers
:返回一个字典,包含响应的HTTP头信息。
使用session
对象的好处之一是它可以自动处理cookies,这对于需要登录或保持会话状态的网站特别有用。你可以在一个session
对象上连续发送多个请求,而不需要每次都手动处理cookies。
重点:异步I/O与多进程的比较。
当程序不需要真正的并发性或并行性,而是更多的依赖于异步处理和回调时,
asyncio
就是一种很好的选择。如果程序中有大量的等待与休眠时,也应该考虑asyncio
,它很适合编写没有实时数据处理需求的Web应用服务器。
Python还有很多用于处理并行任务的三方库,例如:joblib
、PyMP
等。实际开发中,要提升系统的可扩展性和并发性通常有垂直扩展(增加单个节点的处理能力)和水平扩展(将单个节点变成多个节点)两种做法。可以通过消息队列来实现应用程序的解耦合,消息队列相当于是多线程同步队列的扩展版本,不同机器上的应用程序相当于就是线程,而共享的分布式消息队列就是原来程序中的Queue。消息队列(面向消息的中间件)的最流行和最标准化的实现是AMQP(高级消息队列协议),AMQP源于金融行业,提供了排队、路由、可靠传输、安全等功能,最著名的实现包括:Apache的ActiveMQ、RabbitMQ等。
要实现任务的异步化,可以使用名为Celery
的三方库。Celery
是Python编写的分布式任务队列,它使用分布式消息进行工作,可以基于RabbitMQ或Redis来作为后端的消息代理。