引言
当我们访问大模型相关的API服务时,通常会遇到速率限制(即限流),它用于防止用户向某个API发送大量请求,防止请求过载,确保每个人都能公平地访问API。
速率限制的方式
速率限制通常有以下几种形式:
- RPM(requests per minute) 每分钟请求数
- PRD(requests per day) 每天请求数
- TPM(tokens per minute) 每分钟token数
- TPD(tokens per day) 每天token数
- IPM(images per minute) 每分钟图像数:比如针对图像生成模型
- IPD(images per day) 每天图像数
速率限制可能会因为其中任一选项中达到峰值而触发。比如RPM限制为20,TPM限制为100k,假设一分钟内发送了20个请求,每个请求只有100(0.1k)个token,那么RPM的限制会触发,即使这20个请求内没有发满100k个token。
如何处理速率限制
当大量调用OpenAI API时,可能会遇到429: Too Many Reuqests
或RateLimitError
的错误消息,表示超过速率限制。
⚠️ 持续重试不能解决该问题。这里说的处理,是指尽可能不要超过这个速率限制,如果想从根源上解决一种方法是自己部署。
以硅基流动提供的免费嵌入模型为例,它的RPM是2000,即一分钟内只能发送2000个请求,其实不算低;但是TPM只有500k,假设文本块平均为500个token,实际上每分钟只能发送1000个文本块。
错误示范
from langchain_core.embeddings import Embeddings
from itertools import islice
batch_size = 32 # 硅基流动嵌入模型最大批大小为32
def batch_list(it, size=batch_size):
it = iter(it)
return iter(lambda: list(islice(it, size)), [])
class SiliconflowEmbeddings(Embeddings):
def __init__(self, model_name: str):
self.model_name = model_name
self.api_url = "https://api.siliconflow.cn/v1/embeddings"
def embed_documents(self, texts: List[str]) -> List[List[float]]:
payload = {"model": self.model_name, "input": texts, "encoding_format": "float"}
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
}
embeds = []
for batch in tqdm(batch_list(texts), total=len(texts) // batch_size):
payload["input"] = batch
response = requests.post(self.api_url, json=payload, headers=headers)
embeds.extend([d["embedding"] for d in response.json()["data"]])
return embeds
def embed_query(self, text: str) -> List[float]:
return self.embed_documents([text])[0]
如果我们以512 token为一个文本块,假设一篇文档被拆分为2562个文本块,那么直接运行上面的代码会报错:
Get 2562 chunks
Storing...
15%|███████████████████████████▉ | 12/80 [00:06<00:35, 1.94it/s]
2024-09-16 17:27:42.706 Uncaught app exception
embeds.extend([d["embedding"] for d in response.json()["data"]])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable
这是因为它触发了速率限制,如果查看response.status_code
会发现它为429
。
查看硅基流动文档中关于429状态码的描述,正好就是RateLimit。
使用指数退避重试
指数退避(exponential backoff)通过反馈成倍地降低某个过程的速率,以逐渐找到合适的速率。
即在遇到速率限制错误时先进行短暂的睡眠,然后重试失败的请求。如果该请求仍然失败,则增加睡眠的时间并重复该过程,直到请求成功或达到最大重试次数。注意在实现的时候,可能不是固定的成倍数(比如睡眠时间成2),而是会增加一定的随机性。这里增加随机性防止多个设备执行退避重试时产生的同步化。
根据参考1中的方案。要使用指数退避重试,下面先介绍两个库,最后介绍自己实现。
使用tenacity库
要开始,执行pip install tenacity
安装依赖。
from langchain_core.embeddings import Embeddings
from itertools import islice
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
code_success = 200
class SiliconflowEmbeddings(Embeddings):
def __init__(self, model_name: str):
self.model_name = model_name
self.api_url = "https://api.siliconflow.cn/v1/embeddings"
# 最多重试12次
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(12))
def _post(self, payload, headers):
response = requests.post(self.api_url, json=payload, headers=headers)
if response.status_code != code_success:
raise Exception(f"Error code: {response.status_code}")
return response
def embed_documents(self, texts: List[str]) -> List[List[float]]:
payload = {"model": self.model_name, "input": texts, "encoding_format": "float"}
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
}
embeds = []
for batch in tqdm(batch_list(texts), total=len(texts) // batch_size):
payload["input"] = batch
response = self._post(payload, headers)
embeds.extend([d["embedding"] for d in response.json()["data"]])
return embeds
def embed_query(self, text: str) -> List[float]:
return self.embed_documents([text])[0]
tenacity
提供一个retry
注解,我们可以加到真实调用API处,比如我们封装一下requests.post
的代码,检查它返回的状态码,如果是429我们就抛出异常,触发retry
里面的重试。这里为了简单,只要不是200就抛出异常。
这样可以处理速率限制,注意如果没有速率限制,正常情况下只需要30秒即可完成嵌入操作:
19%|██████████████████████████████████▉ | 15/80 [00:07<00:30, 2.15it/s]
当触发速率限制后,在重试中会进行一些休眠,因此最终的完成的时间会大大超过这个30秒,但至少成功嵌入了:
81it [04:25, 3.27s/it]
使用backoff库
要开始,执行pip install backoff
安装依赖。
import backoff
@backoff.on_exception(backoff.expo, Exception, max_time=60)
def _post(self, payload, headers):
response = requests.post(self.api_url, json=payload, headers=headers)
if response.status_code != code_success:
raise Exception(f"Error code: {response.status_code}")
return response
使用backoff
和tenacity
差不多,它也提供了一个注解backoff.on_exception
。它会会使用一个叫做jitter
的函数来增加随机性避免同步化带来的冲突:
jitter: A function of the value yielded by wait_gen returning
the actual time to wait. This distributes wait times
stochastically in order to avoid timing collisions across
concurrent clients. Wait times are jittered by default
using the full_jitter function. Jittering may be disabled
altogether by passing jitter=None.
自己实现指数退避重试
我们可以看看如何手动实现指数退避重试,以认识到它的实现原理非常简单:
import random
import time
code_success = 200
code_rate_limit = 429
# 自定义异常
class ConflictError(Exception):
status_code: int = 429
def retry_with_exponential_backoff(
func,
initial_delay: float = 1,
exponential_base: float = 2,
jitter: bool = True,
max_retries: int = 12,
errors: tuple = (ConflictError,),
):
def wrapper(*args, **kwargs):
num_retries = 0
delay = initial_delay
# 循环直到一次成功的响应或达到max_retries次数
while True:
try:
return func(*args, **kwargs)
# 在指定的错误上重是
except errors as e:
# 增加重试次数
num_retries += 1
# 检查是否达到最大重试次
if num_retries > max_retries:
raise Exception(
f"Maximum number of retries ({max_retries}) exceeded."
)
# 增加延迟 random.random() 生成 0到1之间的随机数,这里是为了增加随机性
delay *= exponential_base * (1 + jitter * random.random())
# 休眠
time.sleep(delay)
# 如果不是指定的异常则抛出
except Exception as e:
raise e
return wrapper
我们判断状态码,如果是429则抛出ConflictError
,否则直接返回response
。
@retry_with_exponential_backoff
def _post(self, payload, headers):
response = requests.post(self.api_url, json=payload, headers=headers)
if response.status_code == code_rate_limit:
raise ConflictError()
return response
参考
- https://platform.openai.com/docs/guides/rate-limits
- https://cloud.siliconflow.cn/rate-limits