文章目录
- 1、背景
- 2、轮子
- 2.1、telnet
- 2.2、loguru DEBUG 日志分级
1、背景
最近业务这边需要用 Python 起一个 web 服务器,做 LLM 相关的业务处理。后台选用的是 django 框架做 web 框架,现在也算结项了。初次写 Python,造出来的轮子啥的总结一下:
- telnet 调试工具台
- 用来控制日志等级、一些开发调试用的接口。
- loguru DEBUG 分类日志打印
- 项目发布肯定不能日志乱打,默认开启的是 INFO 日志,但项目开发阶段内部有类似于 RECV、RESP、TIME 等相关的 debug 日志。如果仅有一个 DEBUG 日志的话,开启后 DEBUG 日志就很多,比如我只想看各个步骤的耗时,而不想看中间的内容。所以,我们可以通过 telnet + DEBUG 分级日志的形式,仅开启 DEBUG_LVL_TIME 等级,就可以了。
2、轮子
2.1、telnet
import sys
import socket
import threading
import traceback
from typing import Any
from logger.logger import logger
DISPATCH_FLAG = "self-"
class TelnetServer:
def __init__(self):
self.commands = {
"log": {
"handle": self.set_level,
"desc": "log [all, time, recv, resp, warning, info, error] : set log level, can use '|' (only debug) combine level",
},
"h": {
"handle": self.show_help,
"desc": "h : show command help",
},
"q": {
"desc": "q : quit the telnet",
},
}
self.host = "127.0.0.1"
self.port = None
self.start_port = 10000
self.end_port = 10015 # 设定端口范围
self.max_conns = 5
self.max_buf_size = 1024
self.stop_server = False # 停止服务器标志
self.serv_thread: threading.Thread = None
self.serv_socket: socket.socket = None
self.serv_sock_timeout = 5 # 服务器套接字超时时间,单位:秒
self.cli_sock_timeout = 300 # 服务器套接字超时时间,单位:秒
def set_level(self, args, **kwargs: Any):
if len(args) == 0:
return "Invalid log level"
lvls = args[0].split("|")
debug_level = 0
debug = False
general = False
for lvl in lvls:
if logger.is_debug(lvl):
debug = True
debug_level |= logger.level(lvl)
else:
general = True
logger.set_level(lvl) # 设置info以下级别别
if debug and general:
return "Invalid log level, can't use debug and general level together"
if debug:
logger.set_level(debug_level) # 设置指定debug级别
return f"log set to level[{lvls}]"
def show_help(self, args, **kwargs: Any):
help_msg = "======================command help======================\n\n"
for cmd, info in self.commands.items():
help_msg += f"{cmd} ---> {info.get('desc')}\n\n"
help_msg += "========================================================\n"
return help_msg
def handle_command(self, command, args):
cmd = command
if command.startswith(DISPATCH_FLAG):
cmd = command[len(DISPATCH_FLAG):]
if cmd not in self.commands:
return f"Unknown command[{command}]"
response = self.commands[cmd]["handle"](args) # 仅在有参数时传入
if not command.startswith(DISPATCH_FLAG):
self._dispatch_command(command, args)
return response
def _get_listening_ports(self, start_port, end_port):
listening_ports = []
for port in range(start_port, end_port + 1):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((self.host, port))
sock.listen(1)
except OSError:
listening_ports.append(port)
finally:
sock.close()
return listening_ports
def _dispatch_command(self, command, args):
ports = self._get_listening_ports(self.start_port, self.end_port)
for dst in ports:
if dst == self.port or command == "h":
continue
try:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((self.host, dst))
client.send(f"self-{command} {' '.join(args)}".encode("utf-8"))
# response = client.recv(self.max_buf_size).decode("utf-8")
# client.close()
# return response
except OSError as e:
continue
except Exception as e:
logger.warning(f"dispatch command catch an exception : {e}")
traceback.print_exc()
def _listen_available_port(self):
for port in range(self.start_port, self.end_port + 1):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(self.serv_sock_timeout)
s.bind((self.host, port))
s.listen(self.max_conns)
except OSError:
continue
self.port = port
self.serv_socket = s
return
def start(self):
self.serv_thread = threading.Thread(target=self.serve_loop)
self.serv_thread.start()
def stop(self):
# 等待服务器线程结束
self.stop_server = True
self.serv_thread.join()
logger.info("telnet server stopped")
def serve_loop(self):
self._listen_available_port()
logger.info(f"telnet server started on {self.host}:{self.port}")
while not self.stop_server: # 添加终止服务器标志判断
client = None
try:
client, _ = self.serv_socket.accept()
client.settimeout(self.cli_sock_timeout)
while True:
s = ">>> "
client.send(s.encode("utf-8"))
data = client.recv(self.max_buf_size).decode('utf-8').strip()
if not data:
continue
command, *args = data.split(" ")
if command == "q":
break
response = self.handle_command(command, args)
response += "\n"
if command.startswith(DISPATCH_FLAG):
client.close()
break
client.send(response.encode('utf-8'))
except socket.timeout:
pass
except Exception as e:
logger.error(f"catch an exception : {e}")
traceback.print_exc()
finally:
if client is not None:
client.close()
if self.serv_socket is not None:
self.serv_socket.close() # 关闭服务器套接字
telnetServ: TelnetServer = None
def start_telnet():
global telnetServ
if telnetServ is None:
telnetServ = TelnetServer()
telnetServ.start()
def stop_telnet():
logger.info("stop_telnet begin")
global telnetServ
telnetServ.stop()
logger.info("stop_telnet success")
目前只写了对日志的控制,如果需要一些业务自定义函数也可以很方便添加,在这里我就去除了。
因为我们的业务使用了 Python 多进程,涉及到多个进程的 telnet 通信,所以写了一些这里的操作,如果不需要直接去除即可。
2.2、loguru DEBUG 日志分级
import sys
from typing import Any
from loguru import logger
_lg = logger
DEFAULT_DEPTH = 1
DEBUG_LVL_NONE = 0
DEBUG_LVL_TIME = 0x00000001
DEBUG_LVL_RECV = 0x00000002
DEBUG_LVL_RESP = 0x00000004
DEBUG_LVL_ALL = 0xFFFFFFFF
debug_level = DEBUG_LVL_NONE
_level_map = {
"none": DEBUG_LVL_NONE,
"time": DEBUG_LVL_TIME,
"recv": DEBUG_LVL_RECV,
"resp": DEBUG_LVL_RESP,
"all": DEBUG_LVL_ALL,
"info": "INFO",
"warning": "WARNING",
"error": "ERROR",
}
def is_debug(level: Any) -> bool:
return isinstance(level, int) | isinstance(
_level_map.get(level, _level_map.get("info")), int
)
def level(level: str):
lvl = _level_map.get(level, _level_map.get("info"))
return _level_map.get(level, _level_map.get("info"))
def init():
global _lg
_lg.remove() # 清除当前所有的日志处理器
_lg = logger.opt(depth=DEFAULT_DEPTH) # 设置堆栈深度
_lg.add(sys.stdout, level="INFO") # 添加一个新的日志处理器,设置指定级别
def debug(level: int, message: str, *args: Any, **kwargs: Any):
if debug_level & level == level:
_lg.debug(message, *args, **kwargs)
def info(message: str, *args: Any, **kwargs: Any):
_lg.info(message, *args, **kwargs)
def warning(message: str, *args: Any, **kwargs: Any):
_lg.warning(message, *args, **kwargs)
def error(message: str, *args: Any, **kwargs: Any):
_lg.error(message, *args, **kwargs)
def set_level(lvl):
global debug_level
global _lg
_lg.remove() # 清除当前所有的日志处理器
_lg.opt(depth=DEFAULT_DEPTH) # 设置堆栈深度
if is_debug(lvl): # 检查 lvl 是否为 int 类型
debug_level = lvl
_lg.add(sys.stdout, level="DEBUG")
else:
_lg.add(sys.stdout, level=level(lvl))
日志等级配合 telnet 使用即可。
需要注意的是,我是封装了原生的 logger.info、warning 等接口,所以堆栈深度是深了一层,需要使用 lg.opt 设置一下堆栈深度,才能显示到打印的行,这个自己调试一下就了解了。
日志 DEBUG 分级也是一个比较常见的,按照 二进制 位进行分级的操作,当时网上搜了搜没看见特别合适的,就直接造个轮子用吧。