RPC 接口测试技术 —— websocket 自动化测试实践!

news2025/3/15 11:34:14

WebSocket 是一种在单个 TCP 连接上进行全双工通信 (Full Duplex 是通讯传输的一个术语。通信允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时(瞬时)进行信号的双向传输( A→B 且 B→A )。指 A→B 的同时 B→A,是瞬时同步的) 的协议。

WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API (WebSocket API 是一个使用 WebSocket 协议的接口,通过它来建立全双工通道来收发消息) 也被 W3C 定为标准。

而 HTTP 协议就不支持持久连接,虽然在 HTTP1.1 中进行了改进,使得有一个 keep-alive,在一个 HTTP 连接中,可以发送多个 Request,接收多个 Response。

但是在 HTTP 中 Request = Response 永远是成立的,也就是说一个 request 只能有一个 response。而且这个 response 也是被动的,不能主动发起。

websocket 常用于社交 / 订阅、多玩家游戏、协同办公 / 编辑、股市基金报价、体育实况播放、音视频聊天 / 视频会议 / 在线教育、智能家居与基于位置的应用。

websocket 接口不能使用 requests 直接进行接口的调用,可以依赖第三方库的方式来实现调用,以下内容介绍如何调用第三方库实现 websocket 的接口自动化测试。

实战

使用 python 语言实现 websocket 的接口自动化

环境准备

1. 安装 pyhton3 环境下载需要的运行库 2. 下载需要的运行库 pip install websocket-client

实战演示

  • 连接 websoket 服务器
import logging
from websocket import create_connection
logger = logging.getLogger(__name__)
url = 'ws://echo.websocket.org/' #一个在线的回环websocket接口,必须以websocket的方式连接后访问,无法直接在网页端输入该地址访问
wss = create_connection(url, timeout=timeout)

  • 发送 websocket 消息
wss.send('Hello World')

  • 接收 websocket 消息
res = wss.recv()
logger.info(res)

  • 关闭 websocket 连接
wss.close()

  • websocket 第三方库的调用不支持直接发送除字符串外的其他数据类型,所以在发送请求之前需要将 Python 结构化的格式,转换为成为字符串类型或者 json 字符串后,再发起 websocket 的接口请求
#待发送的数据体格式为:
data= {
    "a" : "abcd",
	    "b" : 123
		    }
			# 发送前需要把数据处理成 json 字符串
			new_data=json.dumps(data,ensure_ascii=False)
			wss.send(new_data)
			
			```
			
			- 接收的数据体的处理:如果接口定义为 json 的话,由于数据的传输都是字符串格式的,需要对接收的数据体进行转换操作
			```
			#    接收的数据体的格式也为字符串
			logger.info(type(res)) # <class 'str'>
			
			```
			
			
			对于响应内容进行格式转换处理:
			
			```
			def load_json(base_str):
			    if isinstance(base_str, str):
				        try:
						            res = json.loads(base_str)
									            return load_json(res)
												        except JSONDecodeError:
														            return base_str
																	    elif isinstance(base_str, list):
																		        res = []
																				        for i in base_str:
																						            res.append(load_json(i))
																									        return res
																											    elif isinstance(base_str, dict):
																												        for key, value in base_str.items():
																														            base_str[key] = load_json(value)
																																	        return base_str
																																			    return base_str
																																				
																																				```
																																				
																																				- websocket 接口自动化测试,二次封装 demo 展示
																																				web_socket_util.py 封装 websocket 接口通用操作:
																																				```
																																				import logging
																																				import json
																																				from websocket import create_connection
																																				logger = logging.getLogger(__name__)
																																				class WebsocketUtil():
																																				
																																				    def conn(self, uri, timeout=3):
																																					        '''
																																							        连接web服务器
																																									        :param uri: 服务的url
																																											        :param timeout: 超时时间
																																													        :return:
																																															        '''
																																																	        self.wss = create_connection(uri, timeout=timeout)
																																																			
																																																			    def send(self, message):
																																																				        '''
																																																						        发送请求数据体
																																																								        :param message: 待发送的数据信息
																																																										        :return:
																																																												        '''
																																																														        if not isinstance(message, str):
																																																																            message = json.dumps(message)
																																																																			        return self.wss.send(message)
																																																																					
																																																																					    def load_json(self, base_str):
																																																																						        '''
																																																																								        进行数据体的处理
																																																																										        :param base_str: 待处理的数据体
																																																																												        :return:
																																																																														        '''
																																																																																        if isinstance(base_str, str):
																																																																																		            try:
																																																																																					                res = json.loads(base_str)
																																																																																									                return self.load_json(res)
																																																																																													            except JSONDecodeError:
																																																																																																                return base_str
																																																																																																				        elif isinstance(base_str, list):
																																																																																																						            res = []
																																																																																																									            for i in base_str:
																																																																																																												                res.append(self.load_json(i))
																																																																																																																            return res
																																																																																																																			        elif isinstance(base_str, dict):
																																																																																																																					            for key, value in base_str.items():
																																																																																																																								                base_str[key] = self.load_json(value)
																																																																																																																												            return base_str
																																																																																																																															        return base_str
																																																																																																																																	
																																																																																																																																	    def recv(self, timeout=3):
																																																																																																																																		        '''
																																																																																																																																				        接收数据体信息,并调用数据体处理方法处理响应体
																																																																																																																																						        :param timeout: 超时时间
																																																																																																																																								        :return:
																																																																																																																																										        '''
																																																																																																																																												        if isinstance(timeout, dict):
																																																																																																																																														            timeout = timeout["timeout"]
																																																																																																																																																	        try:
																																																																																																																																																			            self.settimeout(timeout)
																																																																																																																																																						            recv_json = self.wss.recv()
																																																																																																																																																									            all_json_recv = self.load_json(recv_json)
																																																																																																																																																												            self._set_response(all_json_recv)
																																																																																																																																																															            return all_json_recv
																																																																																																																																																																		        except WebSocketTimeoutException:
																																																																																																																																																																				            logger.error(f"已经超过{timeout}秒没有接收数据啦")
																																																																																																																																																																							
																																																																																																																																																																							    def settimeout(self, timeout):
																																																																																																																																																																								        '''
																																																																																																																																																																										        设置超时时间
																																																																																																																																																																												        :param timeout: 超时时间
																																																																																																																																																																														        :return:
																																																																																																																																																																																        '''
																																																																																																																																																																																		        self.wss.settimeout(timeout)
																																																																																																																																																																																				
																																																																																																																																																																																				    def recv_all(self, timeout=3):
																																																																																																																																																																																					        '''
																																																																																																																																																																																							        接收多个数据体信息,并调用数据体处理方法处理响应体
																																																																																																																																																																																									        :param timeout: 超时时间
																																																																																																																																																																																											        :return:
																																																																																																																																																																																													        '''
																																																																																																																																																																																															        if isinstance(timeout, dict):
																																																																																																																																																																																																	            timeout = timeout["timeout"]
																																																																																																																																																																																																				        recv_list = []
																																																																																																																																																																																																						        while True:
																																																																																																																																																																																																								            try:
																																																																																																																																																																																																											                self.settimeout(timeout)
																																																																																																																																																																																																															                recv_json = self.wss.recv()
																																																																																																																																																																																																																			                all_json_recv = self.load_json(recv_json)
																																																																																																																																																																																																																							                recv_list.append(all_json_recv)
																																																																																																																																																																																																																											                logger.info(f"all::::: {all_json_recv}")
																																																																																																																																																																																																																															            except WebSocketTimeoutException:
																																																																																																																																																																																																																																		                logger.error(f"已经超过{timeout}秒没有接收数据啦")
																																																																																																																																																																																																																																						                break
																																																																																																																																																																																																																																										        self._set_response(recv_list)
																																																																																																																																																																																																																																												        return recv_list
																																																																																																																																																																																																																																														
																																																																																																																																																																																																																																														    def close(self):
																																																																																																																																																																																																																																															        '''
																																																																																																																																																																																																																																																	        关闭连接
																																																																																																																																																																																																																																																			        :return:
																																																																																																																																																																																																																																																					        '''
																																																																																																																																																																																																																																																							        return self.wss.close()
																																																																																																																																																																																																																																																									
																																																																																																																																																																																																																																																									    def _set_response(self, response):
																																																																																																																																																																																																																																																										        self.response = response
																																																																																																																																																																																																																																																												
																																																																																																																																																																																																																																																												    def _get_response(self) -> list:
																																																																																																																																																																																																																																																													        return self.response
																																																																																																																																																																																																																																																															
																																																																																																																																																																																																																																																															```
																																																																																																																																																																																																																																																															
																																																																																																																																																																																																																																																															test_case.py websocket 接口自动化测试用例:
																																																																																																																																																																																																																																																															```
																																																																																																																																																																																																																																																															class TestWsDemo:
																																																																																																																																																																																																																																																															
																																																																																																																																																																																																																																																															    def setup(self):
																																																																																																																																																																																																																																																																        url = 'ws://echo.websocket.org/'
																																																																																																																																																																																																																																																																		        self.wss = WebsocketUtil()
																																																																																																																																																																																																																																																																				        self.wss.conn(url)
																																																																																																																																																																																																																																																																						
																																																																																																																																																																																																																																																																						    def teardown(self):
																																																																																																																																																																																																																																																																							        self.wss.close()
																																																																																																																																																																																																																																																																									
																																																																																																																																																																																																																																																																									    def test_demo(self):
																																																																																																																																																																																																																																																																										        data = {"a": "hello", "b": "world"}
																																																																																																																																																																																																																																																																												        self.wss.send(data)
																																																																																																																																																																																																																																																																														        res = self.wss.recv()
																																																																																																																																																																																																																																																																																        assert 'hello' == res['a']
																																																																																																																																																																																																																																																																																		
																																																																																																																																																																																																																																																																																		```
																																																																																																																																																																																																																																																																																		
																																																																																																																																																																																																																																																																																		WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
																																																																																																																																																																																																																																																																																		
																																																																																																																																																																																																																																																																																		在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

最后感谢每一个认真阅读我文章的人,看着粉丝一路的上涨和关注,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走!

软件测试面试文档

我们学习必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有字节大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。
 

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1103725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring()

一、导学 二、 1.入门程序 spring快照版本是最新的版本&#xff0c;未发布。需要用到<repository></> 下面这个不需要配置仓库&#xff0c;直接写在依赖中就行 引入spring相关依赖 <?xml version"1.0" encoding"UTF-8"?> <proje…

QT实现凸凹边形等距缩放

参考&#xff1a;https://blog.csdn.net/weixin_39383896/article/details/99615371和https://blog.csdn.net/qq_15821883/article/details/117421400 代码逻辑思路&#xff1a; 1、获取向量AB、BC的坐标。 2、计算向量AB、BC的长度。 3、根据点乘获取cosθ大小。 4、根据cosθ…

LeetCode 高频题目分类列表

&#x1f4a1; LeetCode 高频面试题分类列表&#xff0c;总共24类&#xff0c;312道题目&#xff01; 图 133.克隆图 207.课程表 210.课程表 II 399.除法求值 547.省份数量 684.冗余连接 743.网络延迟时间 785.判断二分图 堆 215.数组中的第K个最大元素 295.数据流的中位数 26…

【性能测试篇1】初识性能测试

目录 性能测试定义 性能测试和功能测试有什么区别 测试工具上面&#xff1a; 特殊业务场景下&#xff1a; 性能测试常见概念&#xff1a; ①用户相关&#xff1a; 1.1并发用户数&#xff1a; 1.2在线用户数&#xff1a; 1.3系统用户数量&#xff1a; ②响应时间相关&…

Golang操作数据库简单示例

目录 准备工作准备数据创建项目连接数据库查询数据修改数据插入数据删除数据释放资源完整代码最终执行结果 准备工作 在开始之前&#xff0c;你需要确保自己安装了Golang的编程环境&#xff0c;安装MySQL数据库&#xff0c;有一个可以用于编写代码的编辑器或IDE工具。我在这里…

大型公共建筑能耗监测与信息管理系统研究及产品选型

摘要&#xff1a;文章通过阐述大型公共建筑能耗现状&#xff0c;突出大型公共建筑实施节能监管的必要性&#xff0c;并在系统总结运用技术手段实施建筑能耗监测的基础上&#xff0c;介绍了江苏省建筑能耗监测系统研究过程中的技术创新和应用情况。 关键词&#xff1a;公共建筑…

深度学习——含并行连接的网络(GoogLeNet)

深度学习——含并行连接的网络&#xff08;GoogLeNet&#xff09; 文章目录 前言一、Inception块二、GoogLeNet模型三、训练模型总结 前言 上篇文章中学习了NIN&#xff0c;而GoogLeNet吸收了NIN中串联网络的思想&#xff0c;并在此基础上做了改进。该论文中的一个观点是&…

PyQt5基础学习(一)

从PyQt5最基础的内容开始学习 import sysfrom PyQt5.QtGui import QIcon from PyQt5.QtWidgets import QWidget, QApplication, QTextBrowserclass Example(QWidget):def __init__(self):super().__init__()self.initUI()def initUI(self):self.resize(300, 300)self.setWind…

数据结构之单链表的模拟实现

&#x1f495;"你笑的次数越多越好&#xff0c;因为你只有用笑才能不怀恶意地消灭罪恶。"&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;数据结构之单链表的模拟实现 MyArrayList /*** Created with IntelliJ IDEA.* Description:* User: 绿字* …

F5.5G落进现实:目标网带来的光之路

数字化与智能化的世界将走向何方&#xff1f;这个问题有着非常复杂的答案&#xff0c;但其中有一个答案已经十分清晰。那就是智能化的下一步&#xff0c;必将走向泛在万兆的世界。 网络是算力联接的底座&#xff0c;是智能演化的基础。纵观每一代数字化升级&#xff0c;都可以发…

2023秋招华为技术岗线上面试经历

2023/10/16 个人情况&#xff1a;博士&#xff0c;预计2024年毕业&#xff0c;参加了2023秋招&#xff0c;华为应聘到3面主管面。 下面按招聘流程顺序&#xff0c;记录我的面试经历。因为想写详细一点的独立文章&#xff0c;所以想来想去还是放到CSDN上。 1. 宣讲会 宣讲会…

kr 第三阶段(一)16 位汇编

为什么要学习 16 位汇编&#xff1f; 16 位汇编包含了大部分 32 位汇编的知识点。有助于在学习内核的两种模式。 实模式&#xff1a;访问真实的物理内存保护模式&#xff1a;访问虚拟内存 有助于提升调试能力&#xff0c;调试命令与 OllyDbg 和 WinDebug 通用。可以学习实现反…

spring boot整合MongoDB 一

MongoDB介绍 应用场景 传统的关系型数据库&#xff08;如MySQL&#xff09;&#xff0c;在数据操作的“三高”需求以及应对Web2.0的网站需求面前&#xff0c;显得力不从心。 解释&#xff1a;“三高”需求&#xff1a; • High performance - 对数据库高并发读写的需求。 • …

制作.a静态库 (封盒)

//云库房间 1.GitHub上创建开源框架项目须包含文件&#xff1a; LICENSE:开源许可证&#xff1b;README.md:仓库说明文件&#xff1b;开源项目&#xff1b;(登录GitHub官网) 2. 云仓储库构建成功(此时云库中没有内容三方框架)&#xff01;&#xff01;&#xff01; 3. 4.5. //…

数仓建设(二)

1) 指标梳理 指标口径的不一致使得数据使用的成本极高&#xff0c;经常出现口径打架、反复核对数据的问题。在数据治理中&#xff0c;我们将需求梳理到的所有指标进行进一步梳理&#xff0c;明确其口径&#xff0c;如果存在两个指标名称相同&#xff0c;但口径不一致&#xff0…

项目管理之生命周期管理

项目生命周期管理矩阵是项目管理中一个重要的概念&#xff0c;它包括了项目从准备到收尾的各个阶段。项目生命周期管理矩阵以四个主要管理阶段为基础&#xff0c;分别为准备阶段、启动阶段、执行阶段和收尾阶段。这四个阶段在项目管理中有着明确的目标和职责&#xff0c;贯穿了…

【LeetCode】35. 搜索插入位置

1 问题 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 示例 1: 输入: nums [1,3,5,6], target 5 输出: 2 示例…

GPT4 Plugins 插件 WebPilot 生成抖音文案

1. 生成抖音文案 1.1. 准备1篇优秀的抖音文案范例 1.2. Promept公式 你是一个有1000万粉丝的抖音主播&#xff0c; 请模仿下面的抖音脚本文案&#xff0c;重新改与一篇文章改写成2分钟的抖音视频脚本&#xff0c; 要求前一部分是十分有争议性的内容&#xff0c;并且能够引发…

linux进程间通讯--信号量

1.认识信号量 方便理解&#xff1a;信号量就是一个计数器。当它大于0能用&#xff0c;小于等于0&#xff0c;用不了&#xff0c;这个值自己给。 2.特点&#xff1a; 信号量用于进程间同步&#xff0c;若要在进程间传递数据需要结合共享内存。信号量基于操作系统的 PV 操作&am…

GitHub仓库的README文件无法显示图片问题-非域名污染原因

之前上自己仓库就偶然发现图片不显示现象&#xff0c;当时以为是网络问题就没有留意这事。但是一直不显示就有问题了&#xff01;于是网上搜了一遭&#xff0c;看见大家遇到此现象的原因普遍归于DNS污染1而我的问题原来是MarkDown格式&#xff01; 在图片语法前不要加分区语法…