We want to build a web request management and dispatch service with the following features:
1. Third-party services report heartbeats, and the manager service monitors whether they are still alive.
2. The manager service can forward both HTTP and WebSocket requests.
3. The manager service supports a basic forwarding strategy, such as round-robin.
Let's go straight to the code.
I. Network and routing interface
import asyncio
import json
import os

from aiohttp import web

# Manger is the manager class defined in section II below

async def main():
    app = web.Application()
    manger = Manger()
    app.add_routes([
        web.post('/nlp', manger.post),
        web.get('/ws', manger.stream_nlp),
        web.post('/heartbeat', manger.heartbeat)
    ])
    # start the node-monitoring coroutine alongside the web app
    asyncio.create_task(manger.watch_service_nodes())
    return app

if __name__ == '__main__':
    if not os.path.exists("./logs"):
        os.makedirs("./logs")
    local_ip = "0.0.0.0"
    with open('./config.json', 'r', encoding='utf-8') as f:
        port = json.load(fp=f)['listen_port']
    web.run_app(main(), host=local_ip, port=port)
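For reference, config.json only needs to provide the listening port; a minimal example (the port value here is just a placeholder):

{
    "listen_port": 8080
}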
The Manger class implements heartbeat handling, HTTP POST forwarding, WebSocket request forwarding, and service-node status monitoring; the node-status monitoring runs in its own coroutine so it can stay suspended in the event loop.
II. The Manger class
Its attributes and methods are outlined below.
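A minimal sketch of the class skeleton, assuming a threading.RLock for the shared state and the create_logger method from section 7; the imports are the ones the method bodies below rely on (the Logger wrapper used in watch_service_nodes is the author's own and is not shown here):

import asyncio
import json
import logging
import logging.handlers
import threading

import aiohttp
from aiohttp import web


class Manger:
    def __init__(self):
        # service_name -> {"servicename", "access_count", "ip_details": {addr: {...}}}
        self.service_nodes = {}
        # guards service_nodes, which is shared by the handlers and the watcher coroutine
        self.rlock = threading.RLock()
        self.logger = self.create_logger()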
1. Service registration — based on the heartbeats reported by third-party services, initialize service_nodes, registering the service name and tag, request count, alive time, and so on.
def register_node(self, node_dict):
    self.rlock.acquire()
    service_name = node_dict['servicename']
    service_name = service_name.split('_')
    tag = ""
    if len(service_name) == 2:
        tag = service_name[-1]
    service_name = service_name[0]
    addr_ip = node_dict['addrip']
    if service_name not in self.service_nodes:
        node = {
            "servicename": service_name,
            "access_count": 0,
            "ip_details": {
                addr_ip: {
                    "addr_ip": addr_ip,
                    "access_count": 0,
                    "alive_time": 5,
                    "tag": tag
                }
            }
        }
        self.service_nodes[service_name] = node
    else:
        node = self.service_nodes[service_name]
        if addr_ip not in node['ip_details']:
            node['ip_details'][addr_ip] = {
                "addr_ip": addr_ip,
                "access_count": 0,
                "alive_time": 5,
                "tag": tag
            }
        else:
            node['ip_details'][addr_ip]["alive_time"] = 5
    self.rlock.release()
    return True
2. Heartbeat endpoint
async def heartbeat(self, request: web.Request):
    req = await request.json()
    self.logger.debug(f"{req}")
    for ele in req['params']['data']:
        self.register_node(ele)
    return web.json_response({"status": 1})
It receives the requests from third-party services and calls register_node to initialize or refresh the corresponding entries in service_nodes.
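The expected request body can be inferred from the key names used in heartbeat and register_node; the concrete values below are only illustrative:

{
    "params": {
        "data": [
            {"servicename": "nlp_gpu0", "addrip": "192.168.1.10:8080"},
            {"servicename": "nlp", "addrip": "192.168.1.11:8080"}
        ]
    }
}

A servicename such as nlp_gpu0 is split on '_' by register_node into the service key nlp and the tag gpu0.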
3. Service-node monitoring
async def watch_service_nodes(self):
    logger = Logger(log_name="node_log", log_level=10, log_file='./logs/node.log').logger
    while True:
        logger.info(f"{self.service_nodes}")
        self.rlock.acquire()
        # collect the addresses and service names that need to be removed
        node2delete_map = {}
        for k, node in self.service_nodes.items():
            ip_details = node['ip_details']
            for addr, _ in ip_details.items():
                node['ip_details'][addr]['alive_time'] -= 1
                if node['ip_details'][addr]['alive_time'] <= 0:
                    if k not in node2delete_map:
                        node2delete_map[k] = {
                            "addr2delete": [addr],
                            "alive_addr_count": len(ip_details) - 1
                        }
                    else:
                        if addr not in node2delete_map[k]['addr2delete']:
                            node2delete_map[k]['addr2delete'].append(addr)
                        node2delete_map[k]['alive_addr_count'] -= 1
        for k, v in node2delete_map.items():
            if v['alive_addr_count'] == 0:
                del self.service_nodes[k]
            else:
                for addr in v['addr2delete']:
                    del self.service_nodes[k]['ip_details'][addr]
        self.rlock.release()
        await asyncio.sleep(0.2)
The coroutine started in main keeps this function running: every 0.2 seconds it decrements the alive_time of every address in service_nodes by 1, removes any address whose alive_time has dropped to 0, and drops the whole service entry once it has no live addresses left. Since register_node resets alive_time to 5, a node expires roughly one second after its last heartbeat, so third-party services should report more often than once per second.
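For completeness, a hedged sketch of the reporting loop on the third-party side; the manager address, service identity and report interval are placeholders, only the /heartbeat path and the payload keys come from the code above:

import asyncio

import aiohttp


async def report_heartbeat():
    # placeholder values; use the real manager address and this service's own identity
    manager_url = "http://127.0.0.1:8080/heartbeat"
    payload = {"params": {"data": [{"servicename": "nlp_gpu0", "addrip": "192.168.1.10:8080"}]}}
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.post(manager_url, json=payload) as resp:
                    await resp.json()
            except Exception as e:
                print(f"heartbeat failed: {e}")
            # report well within the ~1 second expiry window (alive_time 5 x 0.2 s ticks)
            await asyncio.sleep(0.5)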
4. Node selection
When a user request comes in, the manager picks a suitable node to forward to, based on the service_name in the request body and the node state maintained in service_nodes.
async def select_node(self, service_name):
    self.rlock.acquire()
    addr = None
    if service_name in self.service_nodes:
        node = self.service_nodes[service_name]
        ips = []
        access_count = node['access_count']
        for k, _ in node['ip_details'].items():
            ips.append(k)
        ips.sort()
        index = access_count % len(ips)
        addr = ips[index]
        node['access_count'] = access_count + 1
        node['ip_details'][addr]['access_count'] += 1
        self.rlock.release()
        return addr
    else:
        self.rlock.release()
        return addr
Note that the three methods above all update and delete keys of the same dict, so there is contention for this shared state and a lock is needed. Either a reentrant lock (RLock) or a plain Lock should work here: everything runs in the same process and there is no recursive acquisition involved. A context-manager variant is sketched below.
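If you prefer not to pair acquire/release by hand, the lock can also be used as a context manager; a sketch of select_node written that way, with unchanged behaviour:

async def select_node(self, service_name):
    with self.rlock:
        if service_name not in self.service_nodes:
            return None
        node = self.service_nodes[service_name]
        ips = sorted(node['ip_details'].keys())
        # plain round-robin: the Nth request for this service goes to ips[N % len(ips)]
        addr = ips[node['access_count'] % len(ips)]
        node['access_count'] += 1
        node['ip_details'][addr]['access_count'] += 1
        return addr

With two registered addresses the calls alternate between them, which is exactly the round-robin strategy from requirement 3.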
5. HTTP forwarding
async def post(self, request: web.Request):
    req = await request.json()
    self.logger.info(f"receive request: {json.dumps(req, ensure_ascii=False)}")
    service_name = req['service_name']
    addr = await self.select_node(service_name=service_name)
    if not addr:
        return self.build_fail_resp("model not support")
    url = f"http://{addr}/nlp"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, data=json.dumps(req), ssl=False,
                                    timeout=aiohttp.ClientTimeout(total=60)) as response:
                res = await response.json()
        except asyncio.TimeoutError as e:
            self.logger.info(f"{e}")
            return self.build_fail_resp("model time out")
        except Exception as e:
            self.logger.info(f"{e}")
            return self.build_fail_resp("model internal error")
    return web.json_response(res)
def build_fail_resp(self, msg):
    data = {
        'id': 0,
        'jsonrpc': '2.0',
        'ret': -1,
        'error_text': f"{msg}"
    }
    return web.json_response(data)
When a user request arrives, the post handler calls select_node to pick the backend address, opens a session with async with aiohttp.ClientSession() as session so the connection is managed automatically, calls the third-party service asynchronously, and returns the response to the user.
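As a quick sanity check, a client request to the manager might look like this; the address and the payload fields besides service_name are placeholders, only the /nlp path and the service_name key come from the code above:

import asyncio
import json

import aiohttp


async def call_manager():
    # placeholder request; the manager forwards it to whichever "nlp" node round-robin selects
    req = {"service_name": "nlp", "text": "hello"}
    async with aiohttp.ClientSession() as session:
        async with session.post("http://127.0.0.1:8080/nlp", data=json.dumps(req)) as resp:
            print(await resp.json())


asyncio.run(call_manager())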
6. WebSocket forwarding
async def stream_nlp(self, request: web.Request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # receive messages from the client
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            if msg.data == 'close':
                self.logger.info("ws close")
                await ws.close()
            else:
                try:
                    req = msg.json()
                except Exception as e:
                    self.logger.info("Invalid JSON format")
                    res = {
                        "ret": 1,
                        "flag": 1,
                        "result": "Invalid JSON format"
                    }
                    await ws.send_str(json.dumps(res, ensure_ascii=False))
                    await ws.close()
                    return ws
                self.logger.info(f"ws request: {req}")
                service_name = req['service_name']
                self.logger.info(f"service_name: {service_name}")
                addr = await self.select_node(service_name=service_name)
                self.logger.info(f"addr: {addr}")
                if addr:
                    # connect to the upstream WebSocket service
                    upstream_ws_url = f"ws://{addr}/ws"
                    self.logger.info(f"upstream_ws_url: {upstream_ws_url}")
                    async with aiohttp.ClientSession() as session:
                        # connect to the upstream WebSocket server
                        async with session.ws_connect(upstream_ws_url) as upstream_ws:
                            try:
                                if not upstream_ws.closed:
                                    await upstream_ws.send_str(json.dumps(req, ensure_ascii=False))
                                    # receive messages from the upstream ws server
                                    async for up_msg in upstream_ws:
                                        if up_msg.type == aiohttp.WSMsgType.TEXT:
                                            data = json.loads(up_msg.data)
                                            ret = data['ret']
                                            flag = data['flag']
                                            result = data['result']
                                            if ret != 0:
                                                self.logger.info(f"{result}")
                                                res = {
                                                    "ret": 1,
                                                    "flag": 1,
                                                    "result": result
                                                }
                                                await ws.send_str(json.dumps(res, ensure_ascii=False))
                                                await ws.close()
                                                return ws
                                            else:
                                                if flag == 0:
                                                    res = {
                                                        "ret": 0,
                                                        "flag": 0,
                                                        "result": result
                                                    }
                                                else:
                                                    res = {
                                                        "ret": 0,
                                                        "flag": 1,
                                                        "result": result
                                                    }
                                                self.logger.info(f"ws last send_str: {json.dumps(res, ensure_ascii=False)}")
                                                # self.logger.info(f"ws closed: {ws.closed}")
                                                await ws.send_str(json.dumps(res, ensure_ascii=False))
                                        elif up_msg.type == web.WSMsgType.CLOSED:
                                            break
                                else:
                                    self.logger.info("upstream_ws closed")
                                    res = {
                                        "ret": 1,
                                        "flag": 1,
                                        "result": f"upstream_ws {upstream_ws_url} closed"
                                    }
                                    await ws.send_str(json.dumps(res, ensure_ascii=False))
                                    await ws.close()
                            except Exception as e:
                                self.logger.info(f"ws state ws.closed: {ws.closed}")
                                self.logger.info(f"upstream_ws state upstream_ws.closed: {upstream_ws.closed}")
                                self.logger.info(f"error occur ws close : {e}")
                                await ws.close()
                            finally:
                                # make sure the upstream WebSocket connection is closed
                                if not upstream_ws.closed:
                                    await upstream_ws.close()
                                self.logger.info("finally ws close")
                                await ws.close()
                else:
                    res = {
                        "ret": 1,
                        "flag": 1,
                        "result": "model not support"
                    }
                    await ws.send_str(json.dumps(res, ensure_ascii=False))
                    await ws.close()
        elif msg.type == web.WSMsgType.BINARY:
            res = {
                "ret": 1,
                "flag": 1,
                "result": "msg type not support"
            }
            await ws.send_str(json.dumps(res, ensure_ascii=False))
            await ws.close()
        else:
            await ws.close()
            break
    return ws
On the server side, the handler accepts the user's WebSocket connection with

ws = web.WebSocketResponse()
await ws.prepare(request)

then parses each incoming message, calls select_node to pick the backend address, connects to the upstream WebSocket service asynchronously with async with session.ws_connect(upstream_ws_url) as upstream_ws, forwards the request, and streams the upstream responses back to the user as they arrive.
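A corresponding client sketch for the /ws endpoint; the address and the payload fields other than service_name are placeholders, and each printed line is one streamed chunk in the ret/flag/result format produced by the handler above:

import asyncio
import json

import aiohttp


async def call_ws():
    # placeholder request; service_name must match a registered service
    req = {"service_name": "nlp", "text": "hello"}
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("ws://127.0.0.1:8080/ws") as ws:
            await ws.send_str(json.dumps(req, ensure_ascii=False))
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(msg.data)  # one chunk of the streamed response
                else:
                    break


asyncio.run(call_ws())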
7. Logging
def create_logger(self):
    log_level = 20  # info
    log_path = "./logs/server.log"
    logger = logging.getLogger(__name__)
    logger.setLevel(level=log_level)
    formatter = logging.Formatter("%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s")
    # file handler that rotates by size, kept for reference:
    # file_handler = logging.handlers.RotatingFileHandler(filename=log_path, maxBytes=838860800, backupCount=20, encoding='utf-8')
    # file handler that rotates by date
    file_handler = logging.handlers.TimedRotatingFileHandler(filename=log_path, when='D', interval=1,
                                                             encoding='utf-8')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(level=log_level)
    logger.addHandler(file_handler)
    # console handler so logs also go to the terminal
    console = logging.StreamHandler()
    console.setLevel(level=log_level)
    console.setFormatter(formatter)
    logger.addHandler(console)
    return logger
That is all of the small features; they are fairly simple. A natural extension would be a caching/batching layer that groups incoming requests into a batch before calling a third-party service such as a model inference backend. A rough sketch of that idea follows.
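A minimal sketch of that batching idea, assuming the backend can accept a batch of requests in one call; everything here (the Batcher class, max_batch, max_wait, call_backend) is hypothetical and only illustrates the queue-and-window pattern:

import asyncio


class Batcher:
    """Collects requests for a short window and forwards them to the backend as one batch."""

    def __init__(self, max_batch=8, max_wait=0.05):
        self.queue = asyncio.Queue()
        self.max_batch = max_batch
        self.max_wait = max_wait

    async def submit(self, req):
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((req, fut))
        return await fut  # resolved by the worker with this request's individual result

    async def worker(self, call_backend):
        while True:
            batch = [await self.queue.get()]
            deadline = asyncio.get_running_loop().time() + self.max_wait
            # keep pulling until the batch is full or the wait window expires
            while len(batch) < self.max_batch:
                timeout = deadline - asyncio.get_running_loop().time()
                if timeout <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), timeout))
                except asyncio.TimeoutError:
                    break
            reqs = [r for r, _ in batch]
            results = await call_backend(reqs)  # e.g. one POST carrying the whole batch
            for (_, fut), res in zip(batch, results):
                fut.set_result(res)

The worker would be started as a background task, the same way watch_service_nodes is started in main.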