用Python进行websocket接口测试

news2024/10/6 10:33:24

这篇文章主要介绍了用Python进行websocket接口测试,帮助大家更好的理解和使用python,感兴趣的朋友可以了解下

我们在做接口测试时,除了常见的http接口,还有一种比较多见,就是socket接口,今天讲解下怎么用Python进行websocket接口测试。

Socket

Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯。

Python 中,我们使用 socket 模块的 socket 函数来创建一个 socket 对象。语法格式如下:

socket.socket ( family ,type ,proto)

现在大多数用的都是websocket,那我们就先来安装一下websocket的安装包。

pip install websocket-client


安装完之后,我们就开始我们的websocket之旅了。

在websocket里,我们有常用的这几个方法:

on_message方法

def on_message(ws, message):
  print(message)

on_message是用来接受消息的,server发送的所有消息都可以用on_message这个方法来收取。

on_error方法:

def on_error(ws, error):
  print(error)

这个方法是用来处理错误异常的,如果一旦socket的程序出现了通信的问题,就可以被这个方法捕捉到。

on_open方法:

def on_open(ws):
  def run(*args):
    for i in range(30):
      # send the message, then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()

on_open方法是用来保持连接的,上面这样的一个例子,就是保持连接的一个过程,每隔一段时间就会来做一件事,他会在30s内一直发送hello。最后停止。

on_close方法:

def on_close(ws):
  print("### closed ###")

onclose主要就是关闭socket连接的。

如何创建一个websocket应用:

ws = websocket.WebSocketApp("wss://echo.websocket.org")

括号里面就是你要连接的socket的地址,在WebSocketApp这个实例化的方法里面还可以有其他参数,这些参数就是我们刚刚介绍的这些方法。

ws = websocket.WebSocketApp("ws://echo.websocket.org/",
              on_message=on_message,
              on_error=on_error,
              on_close=on_close)

完整代码:

import websocket
from threading import Thread
import time
import sys

def on_message(ws, message):
  print(message)

def on_error(ws, error):
  print(error)

def on_close(ws):
  print("### closed ###")

def on_open(ws):
  def run(*args):
    for i in range(3):
      # send the message, then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()


if __name__ == "__main__":

  websocket.enableTrace(True)
  host = "ws://echo.websocket.org/"
  ws = websocket.WebSocketApp(host,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close)
  ws.on_open = on_open
  ws.run_forever()

也可以使用python进行websocket的客户端压力测试。


# -*- coding:utf-8 -*-
# __author__ == 'chenmingle'
 
import websocket
import time
import threading
import json
import multiprocessing
import uuid
from threadpool import ThreadPool, makeRequests
 
# 修改成自己的websocket地址
WS_URL = "xxxx"
# 定义进程数
processes = 4
# 定义线程数(每个文件可能限制1024个,可以修改fs.file等参数)
thread_num = 700
index = 1
 
 
def on_message(ws, message):
 # print(message)
 pass
 
 
def on_error(ws, error):
 print(error)
 pass
 
 
def on_close(ws):
 # print("### closed ###")
 pass
 
 
def on_open(ws):
 global index
 index = index + 1
 
 def send_thread():
  # 设置你websocket的内容
  # 每隔10秒发送一下数据使链接不中断
  while True:
   ws.send(u'hello服务器')
   time.sleep(10)
 
 t = threading.Thread(target=send_thread)
 t.start()
 
 
def on_start(num):
 time.sleep(5)
 # websocket.enableTrace(True)
 ws = websocket.WebSocketApp(WS_URL + str(num),
        on_message=on_message,
        on_error=on_error,
        on_close=on_close)
 ws.on_open = on_open
 ws.run_forever()
 
 
def thread_web_socket():
 # 线程池
 pool_list = ThreadPool(thread_num)
 num = list()
 # 设置开启线程的数量
 for ir in range(thread_num):
  num.append(ir)
 requests = makeRequests(on_start, num)
 [pool_list.putRequest(req) for req in requests]
 pool_list.wait()
 
 
if __name__ == "__main__":
 # 进程池
 pool = multiprocessing.Pool(processes=processes)
 # 设置开启进程的数量
 for i in xrange(processes):
  pool.apply_async(thread_web_socket)
 pool.close()
 pool.join()

以上就是用Python进行websocket接口测试的详细内容。


最后

如果你想学习自动化测试,那么下面这套视频应该会帮到你很多

如何逼自己1个月学完自动化测试,学完即就业,小白也能信手拈来,拿走不谢,允许白嫖....

最后我这里给你们分享一下我所积累和整理的一些文档和学习资料,有需要直接领取就可以了!


以上内容,对于软件测试的朋友来说应该是最全面最完整的备战仓库了,为了更好地整理每个模块,我也参考了很多网上的优质博文和项目,力求不漏掉每一个知识点,很多朋友靠着这些内容进行复习,拿到了BATJ等大厂的offer,这个仓库也已经帮助了很多的软件测试的学习者,希望也能帮助到你。

​​

​​​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1118499.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(一)docker:建立oracle数据库

前言,整个安装过程主要根据docker-images/OracleDatabase/SingleInstance /README.md ,里边对如何制作容器讲的比较清楚,唯一问题就是都是英文,可以使用谷歌浏览器自动翻译成中文,自己再对照英文相互参照来制作提前准备…

P1 缓冲池管理

文章目录 Task1 LRU-K 替换策略Task2 缓冲池管理Task3 读/写页面保护 Task1 LRU-K 替换策略 LRU-K算法:当访问次数达到K次后,将数据索引从历史队列移到缓存队列中(缓存队列时间降序);缓存数据队列中被访问后重新排序&…

Python--循环中的两大关键词 break 与 continue

在Python循环中,经常会遇到两个常见的关键词:break 与 continue break:代表终止整个循环结构 continue:代表中止当前本次循环,继续下一次循环 break: 英 /breɪk/ v. 打破,打碎&#xff0c…

c语言练习95:练习使用双向链表(实现增删改查)

练习使用双向链表&#xff08;实现增删改查&#xff09; 是指针指向了一块被释放的空间 解决方案&#xff1a; plistNULL List.h #pragma once #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> #include<assert.h> #include<…

java实现多线程下载器

前言&#xff1a; &#x1f44f;作者简介&#xff1a;我是笑霸final&#xff0c;一名热爱技术的在校学生。 &#x1f4dd;个人主页&#xff1a;个人主页1 || 笑霸final的主页2 &#x1f4d5;系列专栏&#xff1a;项目专栏 &#x1f4e7;如果文章知识点有错误的地方&#xff0c;…

2316. 统计无向图中无法互相到达点对数(leetcode)并查集-------------------Java实现

2316. 统计无向图中无法互相到达点对数&#xff08;leetcode&#xff09;并查集-------------------Java实现 题目表述 给你一个整数 n &#xff0c;表示一张 无向图 中有 n 个节点&#xff0c;编号为 0 到 n - 1 。同时给你一个二维整数数组 edges &#xff0c;其中 edges[i…

使用IDEA时遇到java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver报错的解决方案

目录 一、项目环境二、可能原因解决方案1. 没有导入mysql的jar包2. mysql的jar包版本问题 一、项目环境 二、可能原因解决方案 1. 没有导入mysql的jar包 先检查项目lib文件夹下有没有mysql的jar包&#xff0c;没有就把jar包复制到该目录下 再检查项目结构中有没有导入mysql…

使用vscode搭建虚拟机

首先vscode插件安装 名称: Remote - SSH ID: ms-vscode-remote.remote-ssh 说明: Open any folder on a remote machine using SSH and take advantage of VS Codes full feature set. 版本: 0.51.0 VS Marketplace 链接: https://marketplace.visualstudio.com/items?it…

掌握 C++ 中 static 关键字的多种使用场景

static是什么 在最开始C中引入了static关键字可以用于修饰变量和函数&#xff0c;后来由于C引入了class的概念&#xff0c;现在static可以修饰的对象分为以下5种&#xff1a; 成员变量&#xff0c;成员函数&#xff0c;普通函数&#xff0c;局部变量&#xff0c; 全局变量 s…

vmware安装 Rocky9(自定义分区安装)

一、下载镜像 访问官网&#xff0c;下载dvd的镜像 Download Rocky | Rocky Linuxhttps://rockylinux.org/download 二、新建vmware虚拟机 1、vmware尽量选择vmware17 2、下一步 3、稍后安装 4、选择系统类型&#xff1a;red hat9 5、自定义安装位置 6、根据电脑配置&#…

动画系统的前世今生(一)

掐指一算&#xff0c;五年没更新过我的CSDN账号啦&#xff0c;方向也从人工智能变成了计算机图形学&#xff0c;当然也依旧会关注AI的发展&#xff0c;之前在知乎上写了一些文章[传送门]&#xff0c;后续也会逐渐同步到CSDN上&#xff5e; 这个系列将包含五篇文章&#xff0c;内…

allegro中shape的一些基本操作(二)——铜皮添加网络、合并shape

给铜皮&#xff08;shape&#xff09;添加网络 例如下图&#xff0c;想要给这个新添加的shape添加到GND的网络&#xff0c;可以先选中这个shape&#xff0c;让其进入shape编辑模式&#xff0c;然后再右键点击&#xff0c;最后再PCB上点击GND网络 选中铜皮后在铜皮上右键&…

字典树学习笔记

trie 树&#xff0c;即字典树&#xff0c;是一种可以实现 O ( S ) O(S) O(S) 的预处理&#xff08; S S S 为所有字符串的长度和&#xff09;&#xff0c; O ( N ) O(N) O(N)&#xff08; N N N 为查询的字符串的长度&#xff09;的查询的数据结构。 举个栗子&#xff0c;对于…

Rust-后端服务调试入坑记

这篇文章收录于Rust 实战专栏。这个专栏中的相关代码来自于我开发的笔记系统。它启动于是2023年的9月14日。相关技术栈目前包括&#xff1a;Rust&#xff0c;Javascript。关注我&#xff0c;我会通过这个项目的开发给大家带来相关实战技术的分享。 如果你关注过我的Rust 实战里…

Baize_h1mini六足机器人零件准备

导航在这里&#xff1a; Baize_H1mini六足机器人制作教程&#xff08;开源&#xff09;_ros 六足机器人教程-CSDN博客 你现在在地图的红色字体位置&#xff08;走到终点就制作完成了&#xff09;&#xff1a; 重要提示&#xff1a;自己使用打印机打印零件时&#xff0c;对于新…

Pandas数据处理分析系列5-数据如何提取

Pandas-数据提取 ①通过索引提取数据 # 提取前10行数据 df.head(10) # 提取末尾10行数据 df.tail(10) # 通过列名提取数据 df[列名’] # 通过布尔条件提取数据 df[df[列名] > 10] # 多条件过滤 df[(df[列名1] > 10) & (df[列名2] < 20)] df1=pd.read_excel(&quo…

14.Tensor Product:Covector-Covector Pairs

该文张量积仍使用一些非标准的符号。 左边的将是本文使用的 &#xff0c; 右边的是标准。 Covector-Covector Pairs 是Bilinear Forms 在上一节中&#xff0c; Linear Maps linear combinations of vector-covector pairs 这种将向量和协向量组合在一起的过程叫 张量积 这…

基于springboot实现java学习平台项目【项目源码+论文说明】计算机毕业设计

基于springboot实现java学习平台演示 摘要 在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括学习平台的网络应用&#xff0c;在外国学习平台已经是很普遍的方式&#xff0c;不过国内的管理平台可能还处于起步阶段。学习平台具…

Leetcode 第 360 场周赛题解

Leetcode 第 360 场周赛题解 Leetcode 第 360 场周赛题解题目1&#xff1a;2833. 距离原点最远的点思路代码复杂度分析 题目2&#xff1a;2834. 找出美丽数组的最小和思路代码复杂度分析 题目3&#xff1a;2835. 使子序列的和等于目标的最少操作次数思路代码复杂度分析 题目4&a…

使用GoogleNet网络实现花朵分类

一.数据集准备 新建一个项目文件夹GoogleNet&#xff0c;并在里面建立data_set文件夹用来保存数据集&#xff0c;在data_set文件夹下创建新文件夹"flower_data"&#xff0c;点击链接下载花分类数据集https://storage.googleapis.com/download.tensorflow.org/exampl…