FastAPI 学习之路(四十九)WebSockets(五)修复接口测试中的问题

news2024/12/26 13:51:51

其实代码没有问题,但是我们忽略了一个问题,就是在正常的开发中,肯定是遇到过这样的情况,我们频繁的有客户端链接,断开连接,需要统一的管理这些链接,那么应该如何管理呢。其实可以声明一个类去管理这些链接。接下来我们看下该如何优化。

一、优化测试接口方法

1.定义链接管理类,处理所有链接

"""
websocket 链接管理
"""


from typing import List, Dict

from starlette.websockets import WebSocket


class ConnectionManager:

    def __init__(self):
        """存放链接"""
        self.active_connections: List[Dict[str, WebSocket]] = []

    async def connect(self, user: str, ws: WebSocket):
        """链接"""
        self.active_connections.append({"user": user, "ws": ws})

    async def disconnect(self, user: str, ws: WebSocket):
        """断开链接,移除"""
        self.active_connections.remove({"user": user, "ws": ws})

2.修改应用代码

我们增加了链接,移除链接的操作,那么对应修改下代码

from connection_tool import ConnectionManager
from starlette.websockets import WebSocketDisconnect
ws_manager = ConnectionManager()


@app.websocket("/items/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    cookie_or_token: str = Depends(get_cookie_or_token),
):
    await websocket.accept()
    await ws_manager.connect(cookie_or_token, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message is: {data}")
    except WebSocketDisconnect as e:
        await ws_manager.disconnect(cookie_or_token, websocket)

3.测试 

这样我们在链接处理的时候就可以正常处理了。之前报错是因为我们没有正常的关闭链接导致的,那么我们再测试一下

"""
测试websockets
"""

from fastapi.testclient import TestClient
from main import app


def test_websocket():
    client = TestClient(app)
    with client.websocket_connect("/items/ws?token=fake-token") as websocket:
        websocket.send_text("Hello, this is testing websocket")
        data = websocket.receive_text()
        print(data)
        assert str(data) == f"Message is: Hello, this is testing websocket"


if __name__ == '__main__':
    test_websocket()

此时,发现代码不会再报错 

 二、增加测试用例并优化

1.增加用例代码

import unittest

from fastapi.testclient import TestClient

from main import app


class FastApiTestWeb(unittest.TestCase):

    def setUp(self) -> None:
        self.client = TestClient(app)

    def tearDown(self) -> None:
        self.client = None

    def test_websocket(self):
        with self.client.websocket_connect("/items/ws?token=fake-token") as websocket:
            websocket.send_text("Hello, this is using test case to test websocket")
            data = websocket.receive_text()
            print(data)
            assert str(data) == "Message is: Hello, this is using test case to test websocket"

    def test_websocket_again(self):
        with self.client.websocket_connect("/items/ws?token=fake-token") as websocket:
            websocket.send_text("Hello, this is using test case to test websocket again")
            data = websocket.receive_text()
            print(data)
            assert str(data) == "Message is: Hello, this is using test case to test websocket again"


if __name__ == '__main__':
    unittest.main()

2.执行用例

这样我们的一个测试用例就更加的完整了。我们执行正常是没有报错的

3. 查看代码的覆盖率 

pip install coverage

我们想要看下代码的覆盖率,应该如何看呢。我是用的coverage。

然后再report

  我们想看html测试报告,可以运行下 coverage html。

然后打开index.html

因为我的main.py还有其他的方法,我们还需要点进去看我们对应方法的覆盖率。

如果想要将覆盖率都达到100%,还需要针对其他方法增加测试用例。

到这里,我们对于WebSockets接口测试完毕,但是如果我们想实现上线通知,下线通知,如何实现呢?见下一节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1922267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

218.贪心算法:分发糖果(力扣)

核心思想 初始化每个学生的糖果数为1: 确保每个学生至少有一颗糖果。从左到右遍历: 如果当前学生的评分高于前一个学生,则当前学生的糖果数应比前一个学生多一颗。从右到左遍历: 如果当前学生的评分高于后一个学生,则…

01对话系统---文字渐出和停顿效果

实现代码 using System.Collections; using System.Collections.Generic; using UnityEngine; using TMPro; using System.Text.RegularExpressions; using System;/// <summary> /// 增加文本时间停顿功能 /// 增加文字渐出&#xff0c;&#xff08;单个字符逐渐显现&a…

Unity免费领场景多人实时协作地编2人版局域网和LAN联机类似谷歌文档协同合作搭建场景同步资产设置编辑付费版支持10人甚至更多20240709

大家有没有用过谷歌文档、石墨文档、飞书文档等等之类的协同工具呢&#xff1f; Blender也有类似多人联机建模的插件&#xff0c; Unity也有类似的多人合作搭建场景的插件啦。 刚找到一款免费插件&#xff0c;可以支持2人局域网和LAN联机地编。 付费的版本支持组建更大的团队。…

从汇编层看64位程序运行——静态分析和动态分析入门

大纲 GDBIDA总结参考资料 之前一直谈各种相对宏观的工具怎么使用&#xff0c;比如Flink、RabbitMQ等。最近想聊聊比较微观的技术&#xff0c;用各种“显微镜”去看看运行在系统层的二进制码是什么样子。当然二进制码比较难以记忆&#xff0c;于是我会从二进制码的助记符——汇编…

IOS上微信小程序密码框光标离开提示存储密码解决方案

问题&#xff1a; ios密码框输入密码光标离开之后会提示存储密码的弹窗 解决方案 1、在苹果手机上面把 “自动填充密码”关闭&#xff0c;但是苹果这个默认开启&#xff0c;而且大部分客户也不会去自己关闭。 2、欺骗苹果手机&#xff0c;代码实现。 先说解决思路&#xf…

java内部类的本质

定义在类内部&#xff0c;可以实现对外部完全隐藏&#xff0c;可以有更好的封装性&#xff0c;代码实现上也往往更为简洁。 内部类可以方便地访问外部类的私有变量&#xff0c;可以声明为private从而实现对外完全隐藏。 在Java中&#xff0c;根据定义的位置和方式不同&#xf…

mmaction2的GPU环境配置记录RTX3090,cuda12.2,ubuntu22.04版本

1、配置镜像源 最重要的一个步骤,先看下镜像源地址,如果镜像源有问题,所有的包安装都会有问题 镜像源地址获取地址:选择对应的ubuntu版本号,将里面的镜像源地址复制出来,更新到服务器 ubuntu | 镜像站使用帮助 | 清华大学开源软件镜像站 | Tsinghua Open Source Mirro…

【web]-sql注入-bestdb

打开页面后&#xff0c;如图 查看源代码&#xff0c;发现有段注释&#xff0c;尝试sql注入 <!-- $sql "SELECT * FROM users WHERE id $query OR username \"$query\"";--> 1、尝试万能密码 1 or 11# / admin&#xff0c; 提示F…

Linux磁盘-MBRGPT

作者介绍&#xff1a;简历上没有一个精通的运维工程师。希望大家多多关注作者&#xff0c;下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 Linux磁盘涉及到的命令不是很多&#xff0c;但是在实际运维中的作用却很大&#xff0c;因为Linux系统及业务都会承载到硬盘上…

2024年新一代WebOffice内嵌网页组件——猿大师办公助手

背景 WebOffice控件这个中间件软件产品已存在二十余年&#xff0c;在国内众多大中小型企业、各级政府机关、科研机构和学校等事业单位的OA、ERP、文档系统、云盘等信息化B/S系统中得到了大量使用&#xff0c;为我国的信息化事业也做出了不小贡献。随着操作系统、浏览器及Offic…

随笔一、泰山派RK3566开发板调试串口波特率修改

摘要&#xff1a;立创泰山派RK3566开发板默认调试串口波特率是1500000bps&#xff0c;一般串口助手工具没有此波特率&#xff0c;为适应各种调试环境需要&#xff0c;打算修改调试串口波特率为115200bps 需要修改三个部分 1. uboot引导部分 修改tspi_linux_sdk/u-boot/config…

面试项目 | 带你玩转大学生智能汽车项目

文章目录 前言智能车系统介绍1.从智能车说起2.整体系统结构3.经验之谈 系统分层设计1.分层与模块化2.整个系统的总体设计控制与图像层嵌入式平台层 机械与硬件设计部分1.硬件布线2.电机H桥3.机械这块简单说几句吧 嵌入式平台软件搭建1.从任务调度说起大循环调度定时任务调度实时…

[吃瓜教程]南瓜书第6章软间隔与支持向量回归

1.软间隔支持向量机的模型 之前讨论的支持向量机的一个重要的假设前提是它的数据集是线性可分的。然而在现实任务中&#xff0c;线性不可分的情形才是最常见的&#xff0c;因此需要允许支持向量机犯错。这就是接下来要说的软间隔的支持向量机。 2.软间隔支持向量机的策略 从…

PropertySourcesPropertyResolver

SpringBoot应用中涉及配置相关的东西&#xff0c;例如配置文件、环境变量、系统变量等都是基于类ApplicationServletEnvironment的构造器开启的&#xff0c;如下其抽象父类AbstractEnvironment构造器&#xff1a; public abstract class AbstractEnvironment{private final Mu…

TypeError: load() got an unexpected keyword argument ‘loader‘

定义函数 get_yaml_data用来读取 login_data.yml 文件并打印其内容。 import yamldef get_yaml_data(path):with open(path,moder,encodingutf-8)as f:data yaml.load(f,loader yaml.FullLoader)return dataif __name____main__:pathr..\config\desired_caps.ymlprint(get_y…

7 月12日学习打卡--栈和队列的相互转换

hello大家好呀&#xff0c;本博客目的在于记录暑假学习打卡&#xff0c;后续会整理成一个专栏&#xff0c;主要打算在暑假学习完数据结构&#xff0c;因此会发一些相关的数据结构实现的博客和一些刷的题&#xff0c;个人学习使用&#xff0c;也希望大家多多支持&#xff0c;有不…

如何写一个好标题,让你的论文接受率提升300%?(附公式模版)

我是娜姐 迪娜学姐 &#xff0c;一个SCI医学期刊编辑&#xff0c;探索用AI工具提效论文写作和发表。 一篇论文的标题虽然只有短短2-3行&#xff0c;一般不超过15个单词&#xff0c;但是不可否认&#xff0c;标题的阅读量是最大的&#xff0c;因为你只有先通过标题激发了审稿人和…

大模型备案,这样操作就对了!全程指导助你成功备案

在人工智能技术日新月异的今天&#xff0c;大型语言模型&#xff08;简称“大模型”&#xff09;作为AI领域的璀璨明珠&#xff0c;正逐步渗透到我们生活的方方面面&#xff0c;从智能客服到内容创作&#xff0c;从个性化推荐到医疗健康&#xff0c;其影响力不可小觑。 然而&am…

一个极简的 Vue 示例

https://andi.cn/page/621516.html

Linux 04:进程概念

1. 操作系统(Operator System) 概念 任何计算机系统都包含一个基本的程序集合&#xff0c;称为操作系统(OS)。笼统的理解&#xff0c;操作系统包括&#xff1a; 内核&#xff08;进程管理&#xff0c;内存管理&#xff0c;文件管理&#xff0c;驱动管理&#xff09;。其他程序…