性能测试 - Locust WebSocket client

news2025/1/16 15:52:25

Max.Bai

2024.10

0. 背景

Locust 是性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。

1. WebSocket test Client
“”“
Max.Bai
Websocket Client
”“”
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optional

import websocket
from locust import events

logger = logging.getLogger(__name__)


class WebSocketClient:
    def __init__(self, host: str, log_messages: bool = False):
        self._host: str = host
        self._id: str = secrets.token_hex(8)
        self._alias: Optional[str] = None
        self._ws: Optional[websocket.WebSocketApp] = None

        self.log_messages = log_messages
        self.count_recv_type = False
        self.heartbeat_auto_respond = False
        self._recv_messages: list = []
        self.messages: list = []
        self._sent_messages: list = []

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, type, value, traceback):
        self.disconnect()

    @property
    def tag(self) -> str:
        tag = f"{self._host} <{self._id}>"
        if self._alias:
            tag += f"({self._alias})"
        return tag

    def connect(
        self, alias: Optional[str] = None, headers: Optional[dict] = None, on_message: Optional[Callable] = None
    ):
        if not self._ws:
            self._alias = alias
            self._ws = websocket.WebSocketApp(
                url=self._host,
                header=headers,
                on_open=self._on_open,
                on_message=on_message if on_message else self._on_message,
                on_close=self._on_close,
            )
            thread = threading.Thread(target=self._ws.run_forever)
            thread.daemon = True
            thread.start()
            time.sleep(3)
        else:
            logger.warning("An active WebSocket connection is already established.")

    def is_connected(self) -> bool:
        return self._ws is not None

    def disconnect(self):
        if self._ws:
            self._ws.close()
            self._alias = None
        else:
            logger.warning("No active WebSocket connection established.")

    def _on_open(self, ws):
        logger.debug(f"[WebSocket] {self.tag} connected.")
        events.request.fire(
            request_type="ws_client",
            name="connect",
            response_time=0,
            response_length=0,
        )

    def _on_message(self, ws, message):
        recv_time = time.time()
        recv_time_ms = int(recv_time * 1000)
        recv_time_ns = int(recv_time * 1000000)
        logger.debug(f"[WebSocket] {self.tag} message received: {message}")
        if self.log_messages:
            self._recv_messages.append(message)
        self.messages.append(message)

        # public/respond-heartbeat
        if self.heartbeat_auto_respond:
            if "public/heartbeat" in message:
                self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))

        if self.count_recv_type:
            try:
                msg = json.loads(message)
                id = str(msg.get("id", 0))
                if len(id) == 13:
                    resp_time = recv_time_ms - int(id)
                elif len(id) == 16:
                    resp_time = (recv_time_ns - int(id)) / 1000
                elif len(id) > 13:
                    resp_time = recv_time_ms - int(id[:13])
                else:
                    resp_time = 0
                method = msg.get("method", "unknown")
                code = msg.get("code", "unknown")
                error = msg.get("message", "unknown")
                # send_time = int(msg.get("nonce", 0))
                if method in ["public/heartbeat", "private/set-cancel-on-disconnect"]:
                    events.request.fire(
                        request_type="ws_client",
                        name=f"recv {method}",
                        response_time=0,
                        response_length=len(msg),
                    )
                elif code == 0:
                    events.request.fire(
                        request_type="ws_client",
                        name=f"recv {method} {code}",
                        # response_time=recv_time - send_time,
                        response_time=resp_time,
                        response_length=len(msg),
                    )
                else:
                    events.request.fire(
                        request_type="ws_client",
                        name=f"recv {method} {code}",
                        response_time=resp_time,
                        response_length=len(msg),
                        exception=error,
                    )
            except Exception as e:
                events.request.fire(
                    request_type="ws_client",
                    name="recv error",
                    response_time=0,
                    response_length=len(msg),
                    exception=str(e),
                )

    def _on_close(self, ws, close_status_code, close_msg):
        logger.debug(f"[WebSocket] {self.tag} closed.")
        self._ws = None

        events.request.fire(
            request_type="ws_client",
            name="close",
            response_time=0,
            response_length=0,
        )

    def set_on_message(self, on_message: Callable):
        self._ws.on_message = on_message

    def send(self, message: str):
        if self._ws:
            self._ws.send(data=message)
            if self.log_messages:
                self._sent_messages.append(message)
            logger.debug(f"[WebSocket] {self.tag} message sent: {message}")
        else:
            logger.warning(f"No active [WebSocket] {self.tag} connection established.")
            raise ConnectionError("No active [WebSocket] connection established.")

    def clear(self):
        self._recv_messages = []
        self._sent_messages = []
        self.messages = []

    def expect_messages(
        self,
        matcher: Callable[..., bool],
        count: int = 1,
        timeout: int = 10,
        interval: int = 1,
    ) -> list:
        """Expect to receive one or more filtered messages.

        Args:
            matcher (Callable): A matcher function used to filter the received messages.
            count (int, optional): Number of messages to be expected before timeout. Defaults to 1.
            timeout (int, optional): Timeout in seconds. Defaults to 10.
            interval (int, optional): Interval in seconds. Defaults to 1.

        Returns:
            list: A list of messages filtered by the matcher.
        """

        deadline: float = time.time() + timeout
        result: list = []  # messages filtered by the matcher
        seen: list = []  # messages already seen by the matcher to be excluded from further matching

        while time.time() < deadline:
            snapshot: list = [*self._recv_messages]

            for element in seen:
                if element in snapshot:
                    snapshot.remove(element)

            result.extend(filter(matcher, snapshot))
            if len(result) >= count:
                break

            seen.extend(snapshot)
            time.sleep(interval)

        if len(result) < count:
            logger.warning(
                f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages."
            )

        return result
2. 如何使用
class PrivateWsUser(User):

    def on_start(self):
        self.ws_client=WebSocketClient("wss://abc.pp.com/chat", log_message=True)
        self.ws_client.connect()


    @task
    def send_hello()
        self.ws_client.send("hello world")
    
    
            
3. 扩展

可自行扩展on_message 方法,上面的on_message 方法是json 格式的信息处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2277605.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自动化办公|xlwings简介

xlwings 是一个开源的 Python 库&#xff0c;旨在实现 Python 与 Microsoft Excel 的无缝集成。它允许用户使用 Python 脚本自动化 Excel 操作&#xff0c;读取和写入数据&#xff0c;执行宏&#xff0c;甚至调用 VBA 脚本。这使得数据分析、报告生成和其他与 Excel 相关的任务…

Dify应用-工作流

目录 DIFY 工作流参考 DIFY 工作流 2025-1-15 老规矩感谢参考文章的作者,避免走弯路。 2025-1-15 方便容易上手 在dify的一个桌面上,添加多个节点来完成一个任务。 每个工作流必须有一个开始和结束节点。 节点之间用线连接即可。 每个节点可以有输入和输出 输出类型有,字符串,…

《C++11》并发库:简介与应用

在C11之前&#xff0c;C并没有提供原生的并发支持。开发者通常需要依赖于操作系统的API&#xff08;如Windows的CreateThread或POSIX的pthread_create&#xff09;或者第三方库&#xff08;如Boost.Thread&#xff09;来创建和管理线程。这些方式存在以下几个问题&#xff1a; …

建筑综合布线可视化管理

随着数字化转型的加速&#xff0c;越来越多的业务应用依赖网络来实现&#xff0c;综合布线系统作为网络基础设施&#xff0c;加强对综合布线系统的管理维护是业务安全稳定运行的重要保障。传统的表格CAD图纸的综合布线管理模式&#xff0c;易造成综合布线系统线缆混乱、随意变更…

ESXi 切换硬盘直通后无法恢复的解决办法

起因&#xff1a;近日&#xff0c;准备了一块SATA固态硬盘&#xff0c;计划对现有的ESXI虚拟机上新增扩容。因为只增加一块固态&#xff0c;也不打算做raid&#xff0c;就打算把它当作单独的存储来用。在网上搜了一些方法&#xff0c;脑子一热&#xff0c;通过ESXI控制台程序&a…

计算机网络 (43)万维网WWW

前言 万维网&#xff08;World Wide Web&#xff0c;WWW&#xff09;是Internet上集文本、声音、动画、视频等多种媒体信息于一身的信息服务系统。 一、基本概念与组成 定义&#xff1a;万维网是一个分布式、联机式的信息存储空间&#xff0c;通过超文本链接的方式将分散的信息…

汽车免拆诊断案例 | 2007 款法拉利 599 GTB 车发动机故障灯异常点亮

故障现象  一辆2007款法拉利599 GTB车&#xff0c;搭载6.0 L V12自然吸气发动机&#xff08;图1&#xff09;&#xff0c;累计行驶里程约为6万km。该车因发动机故障灯异常点亮进厂检修。 图1 发动机的布置 故障诊断 接车后试车&#xff0c;发动机怠速轻微抖动&#xff0c;…

ChatGPT正在朝着全面个人助手迈出重要一步,推出了一个名为“Tasks”的新功能

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

微软震撼发布:Phi-4语言模型登陆Hugging Face

近日&#xff0c;微软公司在Hugging Face平台上正式发布了其最新的语言模型Phi-4&#xff0c;这一发布标志着人工智能技术的又一重要进步。Phi-4模型以其140亿参数的高效配置&#xff0c;在复杂推理任务中表现出色&#xff0c;特别是在数学领域&#xff0c;更是展现出了卓越的能…

RTC(Real_Time Clock)

RTC概述&#xff1a; RTC&#xff08;实时时钟&#xff0c;Real-Time Clock&#xff09;是一种用于跟踪当前日期和时间的计时设备。RTC可以是独立的芯片&#xff0c;也可以是集成在微控制器或处理器中的一个模块。RTC是现代电子设备中不可或缺的一部分&#xff0c;为各种应用提…

[leetcode]链表基础回顾

一.创建带头节点的链表 #include <iostream> #include <string> #include <algorithm> using namespace std; typedef struct Node { char ch; Node* next; }*LinkList,ListNode; void printLinkList(LinkList& head) { LinkList p head…

rclone,云存储备份和迁移的瑞士军刀,千字常文解析,附下载链接和安装操作步骤...

一、什么是rclone&#xff1f; rclone是一个命令行程序&#xff0c;全称&#xff1a;rsync for cloud storage。是用于将文件和目录同步到云存储提供商的工具。因其支持多种云存储服务的备份&#xff0c;如Google Drive、Amazon S3、Dropbox、Backblaze B2、One Drive、Swift、…

JAVA:利用 RabbitMQ 死信队列实现支付超时场景的技术指南

1、简述 在支付系统中&#xff0c;订单支付的超时自动撤销是一个非常常见的业务场景。通常用户未在规定时间内完成支付&#xff0c;系统会自动取消订单&#xff0c;释放相应的资源。本文将通过利用 RabbitMQ 的 死信队列&#xff08;Dead Letter Queue, DLQ&#xff09;来实现…

favor的本质

英文单词 favor&#xff0c;通常指一个的“喜好或偏爱”&#xff1a; favor n.赞成&#xff1b;喜爱&#xff0c;宠爱&#xff0c;好感&#xff0c;赞同&#xff1b;偏袒&#xff0c;偏爱&#xff1b;善行&#xff0c;恩惠 v.赞同&#xff1b;喜爱&#xff0c;偏爱&#xff1b…

[青基解读一] 2025年国家自然科学基金---指南解读

指南解读 1 需要2个高级专业技术职称推荐&#xff08;2个正教授&#xff09; 2 国自然、国社科只能申请一个 3 资助类别 亚类说明 附注说明 自由探索or目标导向 4 申请代码到二级 申请代码、研究方向、关键词 主要参与者不写学生仅写人数 主要参与者 在线采集、填写简历、生成…

Open FPV VTX开源之ardupilot配置

Open FPV VTX开源之ardupilot配置 1. 源由2. 配置3. 总结4. 参考资料5. 补充5.1 飞控固件版本5.2 配置Ardupilot的BF OSD5.3 OSD偏左问题 1. 源由 飞控嵌入式OSD - ardupilot配置使用ardupliot配套OSD图片。 Choose correct font depending on Flight Controller SW. ──>…

HarmonyOS应用开发者初级认证最新版– 2025/1/13号题库新版

1.欢迎各位读者&#xff0c;本文档来自鸿蒙开发学员亲测&#xff0c;最新版。&#xff08;考试时直接Ctrlf进行搜索&#xff0c;一定要认真比对答案&#xff0c;有的答案相似度很高&#xff09;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 欢迎…

视觉多模态大模型---MiniMax-vl-01---以闪电般的注意力缩放基础模型

简介 MiniMax-VL-01 是与今年1月15日由上海稀宇科技有限公司&#xff08;MiniMax&#xff09;发布并开源的一款视觉多模态大模型&#xff0c;它与基础语言大模型 MiniMax-Text-01 一同构成了 MiniMax-01 系列。这款模型的设计初衷是为了应对日益增长的长上下文处理需求&#x…

CF 230A.Dragons(Java实现)

题目分析 &#xff08;桐老爷&#xff0c;泪目&#xff09;题目讲很多字&#xff0c;其实就是打怪升级&#xff0c;初始战斗力>龙的战斗力就能击败龙并炼化经验增加战斗力&#xff0c;然后打下一条龙&#xff0c;如果打不过了就寄 思路分析 首先我还是想到键值对&#xff0…

【落羽的落羽 C语言篇】文件操作

文章目录 一、文件的概念和分类1. 概念和分类2. 文件名3. 数据文件 三、文件操作1. 文件的打开和关闭1.1 流1.2 文件指针1.3 文件的打开和关闭 2. 文件的顺序读写3. 文件的随机读写4. 文件读取的判定5. 文件缓冲区 一、文件的概念和分类 1. 概念和分类 文件是用来保存数据的。…