- 依赖
pip install websockets-routes
- 代码
import asyncio import websockets import websockets_routes from websockets.legacy.server import WebSocketServerProtocol from websockets_routes import RoutedPath # 初始化一个router对象 router = websockets_routes.Router() # 消息监听 @router.route("/command/{identification}") async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath): # 收到消息 async for message in websocket: # 处理setting用户的业务 if path.params['identification'] == 'setting': await websocket.send("你发给我的消息是:" + message) # 处理administrator用户的业务 elif path.params["identification"] == 'administrator': await websocket.send("你发给我的消息是:" + message) else: await websocket.send("指令码错误") # 启动WebSocket服务器 async def main(): # 启动WebSocket服务 async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089): await asyncio.Future() # run forever # 启动WebSocket服务 asyncio.run(main())
-
连接服务
-
在fastAPI中启动websocket服务
import asyncio import websockets import websockets_routes from websockets.legacy.server import WebSocketServerProtocol from websockets_routes import RoutedPath # 初始化一个router对象 router = websockets_routes.Router() # 消息监听 @router.route("/command/{identification}") async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath): # 收到消息 async for message in websocket: # 处理setting用户的业务 if path.params['identification'] == 'setting': await websocket.send("你发给我的消息是:" + message) # 处理administrator用户的业务 elif path.params["identification"] == 'administrator': await websocket.send("你发给我的消息是:" + message) else: await websocket.send("指令码错误") # 启动WebSocket服务器 async def main(): # 启动WebSocket服务 async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089): await asyncio.Future() # run forever def start(): # 启动WebSocket服务 asyncio.run(main())
if __name__ == "__main__": # 开启一个线程去启动WebSocket服务器 thread = Thread(target=start) thread.start() # 启动Web服务 uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)