一、令牌桶算法(Token Bucket)
原理其实很简单,就是设置同一时刻服务器能处理请求的最大数量,如果超过这个数据,则需要等待,或者不处理请求。相当于设置最大并发量,但是细节是,还设置了生成令牌的速率,可以避免后面的请求失败。保证长期请求。具体流程为:
- 设置桶的大小,默认是满桶,再设置生成令牌的时间。
- 生成的令牌如果在满桶的情况下会溢出
- 请求时必须先获取一个令牌,获取后在才可以进行数据请求,会消耗一个桶中的令牌
- 如果获取不到令牌则会请求失败,或者等待。
#TokenBucket.py
import time
class TokenBucket:
def __init__(self, rate, capacity):
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_time = time.time()
def get_token(self)-> bool:
now = time.time()
elapsed = int(now - self.last_time)
self.last_time = now
self.tokens += elapsed * self.rate
if self.tokens > self.capacity:
self.tokens = self.capacity
if self.tokens >= 1:
self.tokens -= 1
return True
else:
return False
import tornado.ioloop
import tornado.web
from TokenBucket import TokenBucket
class MainHandler(tornado.web.RequestHandler):
def initialize(self, bucket):
self.bucket : TokenBucket = bucket
async def get(self):
if self.bucket.get_token():
self.write("请求成功")
else:
self.set_status(429)
self.write("请求被拒绝:太多请求")
def make_app():
bucket = TokenBucket(rate=1, capacity=5)
return tornado.web.Application([
(r"/", MainHandler,dict(bucket=bucket)),
],debug=True)
if __name__ == "__main__":
app = make_app()
app.listen(8888)
print("请求地址为 http://localhost:8888")
tornado.ioloop.IOLoop.current().start()
这里将桶的大小设置为5,速率为一秒钟生成一个令牌,没有令牌就会请求失败,这样可以很好的限制统同一时刻的并发量问题,并且也不会影响后续的请求,因为令牌是不断生成的。
二、漏桶算法(Leaky Bucket)
严格控制请求处理速率,适合需要稳定流量控制的场景。非常类似于令牌桶算法,也是设置一个漏桶的大小,以及流速。将请求比做水滴,每次请求都会进桶,根据流速流出,相当于这个请求完毕。如果桶满的话,需要等待或者不处理。相比于令牌桶的话,请求更加稳定,因为设置的速率,确保系统能够平稳处理流量。例如令牌桶可能会出现一下高并发请求、一下无请求。而漏桶算法就不会,因为速率是恒定的。
- 设置桶的大小以及速率
- 桶满的情况会溢出,等待或者丢弃
- 漏桶会以固定速率处理请求
- 直到所有请求完毕
#LeakyBucket.py
import time
class LeakyBucket:
def __init__(self, capacity, leak_rate):
self.capacity = capacity
self.leak_rate = leak_rate
self.tokens = 0
self.last_check = time.time()
def can_process(self)-> bool:
current_time = time.time()
elapsed_time = current_time - self.last_check
self.last_check = current_time
processed_tokens = elapsed_time * self.leak_rate
self.tokens = max(0, self.tokens - processed_tokens)
if self.tokens < self.capacity:
self.tokens += 1
return True
else:
return False
import tornado from LeakyBucket import LeakyBucket
class MainHandler(tornado.web.RequestHandler):
def initialize(self, bucket):
self.bucket : LeakyBucket = bucket
async def get(self):
if self.bucket.can_process():
self.write("请求成功")
else:
self.set_status(429)
self.write("请求失败")
self.finish()
def make_app():
bucket = LeakyBucket(capacity=5, leak_rate=1)
return tornado.web.Application([
(r"/", MainHandler, dict(bucket=bucket)),
],debug = True)
if __name__ == "__main__":
app = make_app()
app.listen(8888)
print("服务器启动:地址为 http://127.0.0.1:8888") tornado.ioloop.IOLoop.current().start()
三、对比
为了更好的突出令牌桶算法和漏桶算法的区别,这里使用python写一段脚本进行测试
import requests
for i in range(11):
response = requests.get("http://localhost:8888")
print(response.status_code, response.text)
令牌桶算法
漏桶算法
这样就可以很直观的感受变化。桶令牌算法可能会出现很大的波动,而漏桶算法在解决大量请求时,处理的方式就特别稳定。具体的速率需要根据实际情况。
四、固定窗口计数器(Fixed Window Counter)
简单来说就是在指定时间的内可以允许的请求数量,也就是将这个时间段比作窗口。这个窗口在一定时间内只能处理指定的请求数据,超过的请求等待或者不处理。通过不断重置起始时间来保证窗口。
- 设置窗口大小以及最大请求数量
- 请求时检查时间是否超过,再检查请求数量是否超过
- 超过则拒绝请求。
#FixedWindowCounter.py
import time
class FixedWindowCounter:
def __init__(self, limit, window_size):
self.limit = limit
self.window_size = window_size
self.request_count = 0
self.window_start = time.time()
def can_process(self) -> bool:
current_time = time.time()
if current_time - self.window_start >= self.window_size:
self.window_start = current_time
self.request_count = 0
if self.request_count < self.limit:
self.request_count += 1
return True
else:
return False
import tornado
from FixedWindowCounter import FixedWindowCounter
class MainHandler(tornado.web.RequestHandler):
def initialize(self, counter):
self.counter : FixedWindow = counter
async def get(self):
if self.counter.can_process():
self.write("请求成功")
else:
self.set_status(429)
self.write("请求被拒绝")
def make_app():
counter = FixedWindowCounter(limit=100, window_size=60)
return tornado.web.Application([
(r"/", MainHandler, dict(counter=counter)),
], debug=True)
if __name__ == "__main__":
app = make_app()
server = tornado.httpserver.HTTPServer(app)
server.listen(8888)
print("服务器正在监听端口 http://localhost:8888")
tornado.ioloop.IOLoop.current().start()
这里将窗口的的大小设置为一分钟,最多允许100次请求。一分钟内超过的请求不处理。这样的劣势是太过集中,并且存在边界问题,也就是在窗口马上结束,下一个窗口马上开始时突然大量请求。
五、滑动窗口计数器(Sliding Window Counter)
相比于固定窗口计数器多了一步清除旧的请求。在每次请求时会清除旧的请求。通过动态调整窗口的起点和终点,确保在任意连续时间段内最多处理设定的请求数。当一个新请求到达时,滑动窗口会移除当前窗口外的过期请求,然后检查当前窗口内的请求数是否超过限制。更精确地控制请求流量,减少边界突发流量问题。
- 设置窗口大小以及最大请求量
- 检查请求,过滤掉当前时间窗口外的过期请求
- 判断窗口容量,添加请求的时间戳
- 否则不予请求
#SlidingWindowCounter.py
import time
class SlidingWindowCounter:
def __init__(self, limit, window_size):
self.limit = limit
self.window_size = window_size
self.requests = []
def can_process(self) -> bool:
current_time = time.time()
self.requests = [req
for req in self.requests
if req > current_time - self.window_size]
if len(self.requests) < self.limit:
self.requests.append(current_time)
print("请求成功")
return True
else:
return False
import tornado
from SlidingWindowCounter import SlidingWindowCounter
class MainHandler(tornado.web.RequestHandler):
def initialize(self, counter):
self.counter : SlidingWindowCounter = counter
async def get(self):
if self.counter.can_process():
self.write("请求成功")
else:
self.set_status(429)
self.write("请求被拒绝")
def make_app():
counter = SlidingWindowCounter(limit=10, window_size=10)
return tornado.web.Application([
(r"/", MainHandler, dict(counter=counter)),
], debug=True)
if __name__ == "__main__":
app = make_app()
server = tornado.httpserver.HTTPServer(app)
server.listen(8888)
print("服务器正在监听端口 http://localhost:8888")
tornado.ioloop.IOLoop.current().start()
六、请求队列(Request Queue)
依赖于队列的先进先出的性质,将请求放入队列中,按顺序处理,超出的请求拒绝或者丢弃。达到限流的目的。
#FixedWindowQueueCounter.py
from collections import deque
class FixedWindowQueueCounter:
def __init__(self, limit):
self.limit = limit
self.queue = deque()
def can_process(self) -> bool:
if len(self.queue) < self.limit:
self.queue.append(1)
return True
else:
return False
def process_request(self):
if self.queue:
self.queue.popleft()
import tornado
from FixedWindowQueueCounter import FixedWindowQueueCounter
class MainHandler(tornado.web.RequestHandler):
def initialize(self, counter):
self.counter: FixedWindowQueueCounter = counter
async def get(self):
if self.counter.can_process():
self.write("请求成功")
self.counter.process_request()
else:
self.set_status(429)
self.write("请求被拒绝")
def make_app():
counter = FixedWindowQueueCounter(limit=10)
return tornado.web.Application([
(r"/", MainHandler, dict(counter=counter)),
], debug=True)
if __name__ == "__main__":
app = make_app()
server = tornado.httpserver.HTTPServer(app)
server.listen(8888)
print("服务器正在监听端口 http://localhost:8888")
tornado.ioloop.IOLoop.current().start()