目录
一、Watchdog示例
二、aiohttp服务配置热更新
在同事的golang代码中学习到了config.json热更新的功能,这里自己也学习了一下python写web服务的时候怎么来实现配置的热更新。主要是利用Watchdog这个第三方python库,来监控文件系统的改变,从而实现不用重启代码热更新的功能。
一、Watchdog示例
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class HotUpdateEventHandler(FileSystemEventHandler):
def on_created(self, event):
if event.is_directory:
return
print(f'File created: {event.src_path}')
def on_modified(self, event):
if event.is_directory:
return
print(f'File modified: {event.src_path}')
if __name__ == "__main":
path = "/root" # 检测根目录下文件和目录的改变
# 处理器
event_handler = HotUpdateEventHandler()
# 监控器
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
observer.start()
try:
while True:
# 其他的功能逻辑
do_something()
except KeyboardInterrupt:
observer.stop()
observer.join()
流程:
1、定义事件处理器(文件的修改、删除等对应逻辑)
2、定义监控器
3、监控器中执行调度,传入事件处理器、监控路径和是否递归监控
重要的两个class
watchdog.observers.Obsever
watchdog.events.FileSystemEventHandler
Obsever发起监控线程,FileSystemEventHandler实现具体的文件系统变化的逻辑
在实现具体的功能的时候,需要自己重写FileSystemEventHandler中的方法
主要常用的就是以上的方法
其中event具有的属性如下
二、aiohttp服务配置热更新
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import aiohttp
from aiohttp import web
import os
config_path = "./test.json"
config = json.load(open(config_path))
def reload_config():
"""热重载配置"""
global config
try:
new_config = json.load(open(config_path))
print('Config reloaded successfully.')
config = new_config
except Exception as e:
print(f'Failed to reload config: {e}')
class ConfigReloadEventHandler(FileSystemEventHandler):
def on_modified(self, event):
if event.src_path == config_path:
print(f"Config file modified: {event.src_path}. Reloading...")
reload_config()
async def post(request:web.Request):
req = await request.json()
send_data = config
return web.json_response(send_data)
async def main():
app = web.Application()
app.add_routes([
web.post('/nlp', post)
])
return app
# 使用方法
if __name__ == "__main__":
config_event_handler = ConfigReloadEventHandler()
path, filename = os.path.split(config_path)
observer = Observer()
observer.schedule(config_event_handler,path,recursive=False)
observer.start()
try:
local_ip = "0.0.0.0"
port = "6666"
web.run_app(main(), port=port)
except KeyboardInterrupt:
observer.stop()
observer.join()
使用aiohttp启动一个web 服务,同时热更新test.json配置文件,作为全局变量config在web服务中使用
test.json内容
{"name":"huangyang"}
请求结果
修改test.json文件内容
{"name":"huangyang_new"}
再次请求结果为
服务端日志
参考文章
watchdog 2.1.5 documentation
解密Python Watchdog:实时监控文件系统的终极解决方案