python使用websocket服务并在fastAPI中启动websocket服务

news2025/1/16 13:58:03
  1. 依赖
    pip install websockets-routes
  2. 代码
    import asyncio
    import websockets
    import websockets_routes
    from websockets.legacy.server import WebSocketServerProtocol
    from websockets_routes import RoutedPath
    
    # 初始化一个router对象
    router = websockets_routes.Router()
    
    
    # 消息监听
    @router.route("/command/{identification}")
    async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
        # 收到消息
        async for message in websocket:
            # 处理setting用户的业务
            if path.params['identification'] == 'setting':
                await websocket.send("你发给我的消息是:" + message)
            # 处理administrator用户的业务
            elif path.params["identification"] == 'administrator':
                await websocket.send("你发给我的消息是:" + message)
            else:
                await websocket.send("指令码错误")
    
    
    # 启动WebSocket服务器
    async def main():
        # 启动WebSocket服务
        async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089):
            await asyncio.Future()  # run forever
    
    
    # 启动WebSocket服务
    asyncio.run(main())
    
  3. 连接服务

  4. 在fastAPI中启动websocket服务

    import asyncio
    import websockets
    import websockets_routes
    from websockets.legacy.server import WebSocketServerProtocol
    from websockets_routes import RoutedPath
    
    # 初始化一个router对象
    router = websockets_routes.Router()
    
    
    # 消息监听
    @router.route("/command/{identification}")
    async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
        # 收到消息
        async for message in websocket:
            # 处理setting用户的业务
            if path.params['identification'] == 'setting':
                await websocket.send("你发给我的消息是:" + message)
            # 处理administrator用户的业务
            elif path.params["identification"] == 'administrator':
                await websocket.send("你发给我的消息是:" + message)
            else:
                await websocket.send("指令码错误")
    
    
    # 启动WebSocket服务器
    async def main():
        # 启动WebSocket服务
        async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089):
            await asyncio.Future()  # run forever
    
    
    def start():
        # 启动WebSocket服务
        asyncio.run(main())
    
    if __name__ == "__main__":
        # 开启一个线程去启动WebSocket服务器
        thread = Thread(target=start)
        thread.start()
        # 启动Web服务
        uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/44849.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Archlinux安装软件的那些事

个人主页:董哥聊技术我是董哥,嵌入式领域新星创作者创作理念:专注分享高质量嵌入式文章,让大家读有所得!文章目录1、ArchLinux1.1 ArchLinux原则1.2 软件包管理1.2.1 软件仓库1.2.2 包管理器2、Pacman2.1 pacman介绍2.…

什么是幂等性?四种接口幂等性方案详解!

幂等性在我们的工作中无处不在,无论是支付场景还是下订单等核心场景都会涉及,也是分布式系统最常遇到的问题,除此之外,也是大厂面试的重灾区。 知道了幂等性的重要性,下面我就详细介绍幂等性以及具体的解决方案&#…

SpringBoot中自动配置

第一种: 给容器中的组件加上 ConfigurationProperties注解即可 测试: Component ConfigurationProperties(prefix "mycar") public class Car {private String brand;private Integer price;private Integer seatNum;public Integer getSeat…

币圈已死,绿色积分是全新的赛道吗?

近几年来,移动互联网行业的迅猛发展,快速改变着社会业态。尽管如此,仍有大量企业线上线下处于割裂状态,2020 年一场疫情的突然爆发,并持续到 2022年,对零售行业造成流量崩塌、供应链中断、市场供需下滑等压…

现代 CSS 高阶技巧,完美的波浪进度条效果。

将专注于实现复杂布局,兼容设备差异,制作酷炫动画,制作复杂交互,提升可访问性及构建奇思妙想效果等方面的内容。 在兼顾基础概述的同时,注重对技巧的挖掘,结合实际进行运用,欢迎大家关注。 正…

金属非金属如何去毛刺 机器人浮动去毛刺

毛刺的产生 在金属非金属零件的加工中,由于切削加工过程中塑性变形引起的毛边,或者是铸造、模锻等加工的飞边,或是焊接挤出的残料,这些与所要求的形状、尺寸有所出入,在被加工零件上派生出的多余部分即为毛刺&#xf…

音视频开发之 ALSA实战!

前言: 今天我们来分享一个开源的音频采集代码,现在大部分音频采集都是通过ALSA框架去采集,如果大家把ALSA采集代码学懂,那么大部分的音频采集都可以搞定。这个代码是用ALSA进行音频PCM的采集并保存到本地文件。一、alsa框架的介绍…

C#语言实例源码系列-实现输入框焦点变色和窗体拖拽改变大小

专栏分享点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册 👉关于作者 众所周知,人生是一个漫长的流程,不断克服困难,不断反思前进的过程。在这个过程中…

002.组合总和|||——回溯算法

1.题目链接: 216. 组合总和 III 2.解题思路: 2.1.题目要求: 给一个元素数量k和一个元素和n,要求从范围[1,2,3,4,5,6,7,8,9]中返回所有元素数量为k和元素和为n的组合。(每个数字只能使用一次) 比如输入k…

深度学习快速入门----Pytorch 系列2

注:参考B站‘小土堆’视频教程 视频链接:【PyTorch深度学习快速入门教程(绝对通俗易懂!)【小土堆】 上一篇:深度学习快速入门----Pytorch 1 文章目录八、神经网络--非线性激活九、神经网络--线性层及其他层…

作为IT行业过来人,我有3个重要建议给后辈程序员!

见字如面,我是军哥!作为一名 40 岁的 IT 老兵,我在年轻时踩了不少坑,我总结了其中最重要的 3 个并一次性分享给你,文章不长,你一定要看完哈~1、重视基础还不够,还要注重技术广度和深…

第2-4-8章 规则引擎Drools实战(1)-个人所得税计算器

文章目录9. Drools实战9.1 个人所得税计算器9.1.1 名词解释9.1.2 计算规则9.1.2.1 新税制主要有哪些变化?9.1.2.2 资较高人员本次个税较少,可能到年底扣税增加?9.1.2.3 关于年度汇算清缴9.1.2.4 个人所得税预扣率表(居民个人工资、…

科教导刊杂志科教导刊杂志社科教导刊编辑部2022年第27期目录

前沿视角《科教导刊》投稿:cn7kantougao163.com 新时代研究生教育质量评价指标体系的框架构建 李军伟;赵永克;杨丹; 1-3 基于现代学徒制的“多主体、双标准、五维度”人才培养质量评价体系构建 汪帆;刘严; 4-6 高教论坛 新工科背景下地方性院校第二课堂…

【云原生】Docker容器服务更新与发现之consul

内容预知 1.consul的相关知识 1.1 什么是注册与发现 1.2 什么是consul 1.3 zookeeper和consul的区别 2. consul 部署 2.1 部署consul服务器 2.2 registrator服务器 3.consul-template 的引入 3.1 consul-template的作用 3.2 consul-template的具体部署运用 &…

微信开发者工具C盘占用大的问题

将User Data 下的文件迁移到其他盘,比如 D盘,E盘,F盘 步骤如下: 1.找到微信开发者工具C盘所在的缓存目录,一般为 C:\Users\ 你的用户名\AppData\Local\微信开发者工具\User Data 将里面的内容全部剪切到其它盘符&…

从鹅厂实例出发!分析Go Channel底层原理

本文是基于Go1.18.1源码的学习笔记。Channel的底层源码从Go1.14到现在的Go1.19之间几乎没有变化,这也是Go最早引入的组件之一,体现了Go并发思想:Do not communicate by sharing memory; instead, share memory by communicating.不要通过共享…

Playwright 简明入门教程:录制自动化测试用例,结合 Docker 使用

本篇文章聊聊如何使用 Playwright 进行测试用例的录制生成,以及如何在Docker 容器运行测试用例,或许是网上最简单的入门教程。 写在前面 Playwright 是微软出品的 Web 自动化测试工具和框架,和 Google Puppeteer 有着千丝万缕的关系。前一阵…

[附源码]计算机毕业设计springboot儿童早教课程管理系统论文2022

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

如何治理 Electron 版本淘宝直播应用崩溃?

经过几个月的努力,基于Electron框架开发的新版淘宝直播推流软件终于上线了。随之而来的就是线上用户反馈的各种问题,其中最影响用户体验的当属应用崩溃问题了。当应用程序出现未 catch 的异常时就会发生崩溃,本文介绍了客户端应用崩溃的处理流…

javaSE - Arrays - 数组的定义与使用

一、数组基本用法 1.1、什么是数组 数组本质上就是让我们能 “批量” 创建相同类型的变量 也可以说是存储一组相同数据类型的数据的集合 如: 如果需要表示两个数据, 那么直接创建两个变量即可 int a; int b 如果需要表示五个数据, 那么可以创建五个变量 int a1; int a2; int …