基于Locust实现MQTT协议服务的压测脚本

news2025/2/24 6:28:31

一、背景简介

业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。

目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。

本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。

捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):

  1. 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
  2. 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
  3. 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。

其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。

二、技术选型

这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。

所以就继续编码好了,仍然首选python,想到了locust库,后来看官方文档的时候,看到locust也针对mqtt协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust+paho.mqtt的组合来实现本次的负载脚本。

三、代码编写

1. 脚本代码

暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。

脚本目前做了这些事情:

  • 从db中查询有效可用的所有测试车辆信息数据
  • 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
  • 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
  • 脚本统计连接数、请求数、响应时间等信息写到报表中
  • 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv
import datetime
import queue
import os
import sys
import time
import ssl

from paho.mqtt import client as mqtt_client

# 根据不同系统进行路径适配
if os.name == "nt":
    path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    sys.path.insert(0, path)
    from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
    sys.path.append("/app/qa_test_app/")
    from GB_test.utils.mysql_operating import DB

from locust import User, TaskSet, events, task, between, run_single_user


BROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # 超时时间
TEST_TOPIC = "test_topic"

TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用来publish的测试数据,仅示意

BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 业务需要转换成 byte 类型后再发送

# 创建队列
client_queue = queue.Queue()

# 连接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:
    # 把可用的车辆信息存到队列中去
    client_queue.put(t)


def fire_success(**kwargs):
    """请求成功时调用"""
    events.request.fire(**kwargs)


def calculate_resp_time(t1, t2):
    """计算响应时间"""
    return int((t2 - t1) * 1000)


class MQTTMessage:
    """已发送的消息实体类"""
    def __init__(self, _type, qos, topic, payload, start_time, timeout):
        self.type = _type,
        self.qos = qos,
        self.topic = topic
        self.payload = payload
        self.start_time = start_time
        self.timeout = timeout


# 统计总共发送成功的消息数量
total_published = 0
disconnect_record_list = []  # 定义存放连接断开的记录的列表容器


class PublishTask(TaskSet):

    @task
    def task_publish(self):
        self.client.loop_start()
        topic = TEST_TOPIC
        payload = BYTES_DATA
        # 记录发送的开始时间
        start_time = time.time()
        mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
        published_mid = mqtt_msg_info.mid
        # 将发送成功的消息内容,放入client实例的 published_message 字段
        self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
                                                                   0,
                                                                   topic,
                                                                   payload,
                                                                   start_time,
                                                                   PUBLISH_TIMEOUT)
        # 发送成功回调
        self.client.on_publish = self.on_publish
        # 断开连接回调
        self.client.on_disconnect = self.on_disconnect

    @staticmethod
    def on_disconnect(client, userdata, rc):
        """ broker连接断开,放入列表容器"""
        disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
        disconnect_record_list.append(disconnected_info)
        print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))

    @staticmethod
    def on_publish(client, userdata, mid):
        if mid:
            # 记录消息发送成功的时间
            end_time = time.time()
            # 从已发送的消息容器中,取出消息
            message = client.published_message.pop(mid, None)
            # 计算开始发送到发送成功的耗时
            publish_resp_time = calculate_resp_time(message.start_time, end_time)
            fire_success(
                request_type="p_success",
                name="client_id: " + str(client._client_id),
                response_time=publish_resp_time,
                response_length=len(message.payload),
                exception=None,
                context=None
            )
            global total_published
            # 成功发送累加1
            total_published += 1


class MQTTLocustUser(User):
    tasks = [PublishTask]
    wait_time = between(2, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 从队列中获取客户端 username 和 client_id
        current_client = client_queue.get()

        self.client = mqtt_client.Client(current_client[1])
        self.client.username_pw_set(current_client[0], PASSWORD)
        # self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模拟client连接报错

        # 定义一个容器,存放已发送的消息
        self.client.published_message = {}

    def on_start(self):
        # 设置tls
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        self.client.tls_set_context(context)

        self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
        self.client.on_connect = self.on_connect

    def on_stop(self):
        print("publish 成功, 当前已成功发送数量:{}".format(total_published))
        if len(disconnect_record_list) == 0:
            print("无断开连接的client")
        else:
            # 把断开记录里的信息写入csv
            with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
                for i in disconnect_record_list:
                    writer.writerow(i)
            print("断开连接的client信息已写入csv文件")

    @staticmethod
    def on_connect(client, userdata, flags, rc, props=None):
        if rc == 0:
            print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))
            fire_success(
                request_type="c_success",
                name='count_connected',
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )
        else:
            print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))
            fire_success(
                request_type="c_fail",
                name="client_id: " + str(client._client_id),
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )


if __name__ == '__main__':
    run_single_user(MQTTLocustUser)

2. 代码分析-locust库部分

并发请求能力还是使用的locust库的能力。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承UserTaskSet即可。

User

首先是先定义User类,这里就是用来生成我要用来测试的车辆。

类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的文档使用就行。

红色框里,是User类的2个重要熟悉属性:

  • tasks: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask类下面定义的内容。
  • wait_time: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。

绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。

蓝色框里有2个方法,也是locust提供的能力:

  • on_start:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。

  • on_stop:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。

TaskSet

定义好User类,就需要来定义TaskSet类,你得告诉产生出来的用户,要干点啥。

我这根据业务需要,就是让车辆不停的像broker发送数据即可。

红色部分,同样是paho.mqtt提供的能力,会启动新的线程去执行你定义的事情。

黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo类。

注意返回的2个属性:

  • mid: 返回这个消息发送的顺序
  • rc: 表示发送的响应状态,0 就是成功

绿色部分,还记得我在上面的User类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。

2. 代码分析-paho.mqtt库部分

上面的代码已经用到了不少paho.mqtt的能力,这里再进行整体梳理下。

  • client.Client():声明一个client
  • client.username_pw_set(): 设置客户端的用户名,密码
  • client.tls_set_context: 设置ssl模式
  • client.connect(): 连接代理
  • client.publish:向代理推送消息

还用到了一些回调函数:

  • on_connect:连接操作成功时回调
  • on_publish:发布成功时回调
  • on_disconnect:客户端与代理断开连接时回调

另外还用到了一个事件函数events.request

当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event

查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。

比如我在发送回调函数里调用了该方法。

所以最后在控制台显示的报告里就有我定义的内容了。

由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。

后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。

三、小结

后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。

现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。

另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/619350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PMP考试到底难在哪里?

那么,PMP考试到底难在哪? 01涉及面广 目前PMP考试内容大部分来源于教材《PMBOK指南》和《敏捷实践指南》。 作为考试出题的知识基础《PMBOK指南》,总共有700多页,所覆盖的知识面很广。 另一方面,根据最新版考纲&am…

使用大型语言模(LLM)构建系统(二):内容审核、预防Prompt注入

今天我学习了DeepLearning.AI的 Building Systems with LLM 的在线课程,我想和大家一起分享一下该门课程的一些主要内容。 下面是我们访问大型语言模(LLM)的主要代码: import openai#您的openai的api key openai.api_key YOUR-OPENAI-API-KEY def get_…

媒体邀约分步详解

传媒如春雨,润物细无声,大家好,我是51媒体网 胡老师。 邀请媒体参加活动的有哪些步骤: 活动落地执行:在整个活动方案中,邀请媒体来报道活动,往往会安排在整个活动的中期去做,因此在…

spring-boot集成spring-brick实现动态插件

spring-boot集成spring-brick实现动态插件 spring-boot集成spring-brick实现动态插件 项目结构 & 需求实现spring-boot集成spring-brick 环境说明1. 主程序集成spring-brick 第一步:引入相关依赖第二步:修改程序入口方法第三步:编写配置…

软件测试太卷了,我实在是做不到啊~

前言 本科计算机专业,做了四年软件测试工作,从一开始一脸懵的点点点,到现在会自动化测试了,浅谈一下计算机专业人员从事软件测试的一点点心得体会,仅供参考交流。 如果你本科学的是计算机专业,觉得开发那…

1092 To Buy or Not to Buy (PAT甲级)

1092. To Buy or Not to Buy (20)-PAT甲级真题_柳婼的博客-CSDN博客 柳婼的解法要更清晰一些。 下面是我的解法…… #include <iostream> #include <string>int main(){std::string a, b;bool flag true;int extra, missing;int cntA[62] {0};int cntB[62] {…

银行项目:如何大规模、高效率的做自动化测试

背景 近几年&#xff0c;各家商业银行均在大力发展自动化测试。在这一进程中&#xff0c;自动化测试的设计理念不断完善&#xff0c;新的技术不断应用&#xff0c;使得自动化测试资产的积累代价和维护代价不断降低&#xff0c;自动化测试资产的数量不断增长。 在短短几年间&…

【STM32CubeMX项目】DHT11模块

前言 在我的另一篇里文章里已经介绍过DHT11的时序理论了&#xff0c;这里介绍下&#xff0c;我写DHT11的数据获取的思路和调用。程序验证后&#xff0c;发现下述问题&#xff0c;暂时解决不了&#xff0c;但是还是会把个人的代码流程&#xff0c;函数的编写思路和工程写下&…

通过Python封装接口商品ID获取京东商品历史价格数据,京东历史价格数据,京东API接口

京东商品历史价格数据展示了该商品在一段时间内的价格变化情况&#xff0c;可作为购物决策的重要参考因素。用户可以根据历史价格数据来判断当前商品的价格是否处于一个合理水平&#xff0c;并对接下来的价格趋势进行预测。 京东商品历史价格数据可以在商品详情页面中查看&…

【华为自研】| 国产数据库 GaussDB崛起

目录 GaussDBGaussDB 简介产品优势GaussDB(for openGauss)GaussDB(for MySQL)GaussDB(for Cassandra)GaussDB(for Mongo)GaussDB(for Redis)GaussDB(for Influx) GaussDB GaussDB采用一体化架构&#xff0c;同时支持关系型和非关系型数据库引擎&#xff0c;能够满足政企全方位…

2023面试题合集(建议收藏)

写在前面 个人强烈感觉面试因人而异&#xff0c;对于简历上有具体项目经历的同学&#xff0c;个人感觉面试官会着重让你介绍自己的项目&#xff0c;包括但不限于介绍一次真实攻防/渗透/挖洞/CTF/代码审计的经历 > 因此对于自己的项目&#xff0c;面试前建议做一次复盘&…

125760-30-7,Fmoc-Ser(Ac4Galβ1-3Ac2GalNAcα)-OH,由不同糖的混合物组成,包括单糖和双糖

●常用名&#xff1a;O-[4,6-二-O-乙酰基-2-(乙酰氨基)-2-脱氧-3-O-(2,3,4,6-四-O-乙酰基-BETA-D-吡喃半乳糖基)-ALPHA-D-吡喃半乳糖基]-N-[芴甲氧羰基]-L-丝氨酸 ●英文名&#xff1a;Fmoc-Ser(Ac4Galβ1-3Ac2GalNAcα)-OH●外观以及性质&#xff1a; 陕西新研博美生物科技有限…

基于simulink仿真机械手将负载从一个灰色圆柱形平台移动到另一个平台

一、前言 此示例说明了在 Simulink 3D 动画™模型中使用全局坐标。全局坐标可以在模型中以多种方式用于对象跟踪和操作、简单的碰撞检测、触觉效果模拟等。 二、示例 虚拟世界中对象的全局坐标可通过VR源块获得。对于场景中的每个变换&#xff0c;VR 源块的“参数”对话框中的树…

第一次做SDK测试,做个笔记

一、认识SDK 1、含义 SDK是为客户端提供的特定的软件包、软件框架、硬件平台、操作系统等建立应用软件时的开发工具的集合。如拨打电话&#xff0c;摄像机&#xff0c;视频播放/录制&#xff0c;图片保存&#xff0c;预览图片&#xff0c;刷新窗口&#xff0c;显示成功状态页面…

CnOpenData短视频播主排名数据

一、数据简介 短视频即短片视频&#xff0c;是一种互联网内容传播方式&#xff0c;一般是指在互联网新媒体上传播的时长在5分钟以内的视频。随着网红经济的出现&#xff0c;视频行业逐渐崛起一批优质UGC内容制作者&#xff0c;微博、秒拍、快手、今日头条纷纷入局短视频行业&am…

DevOps实践:持续交付和自动化部署的最佳实践

引言 今天给大家分享一篇有关DevOps实践的文章。 在当今快节奏的软件开发环境中&#xff0c;为了保持竞争力&#xff0c;加速交付和提高质量已经成为必须要做到的事情。而DevOps方法论则是解决这些问题的一种综合性方案。 本文将为您介绍DevOps的最佳实践&#xff0c;包括持…

项目管理过程中常见的错误,您都知道吗?

在项目管理过程中&#xff0c;出现错误是很常见的。这些错误可能会导致项目延误、超支、质量下降&#xff0c;甚至会让整个项目失败。因此&#xff0c;了解这些错误&#xff0c;及时采取措施避免和纠正&#xff0c;是每个项目经理必须掌握的技能。 以下是一些常见的项目管理错…

Java001——认识dos和使用基本的dos命令

围绕以下4点来学习&#xff1a; 1、什么是dos? 2、dos的作用? 3、电脑中怎么操作dos? 4、操作dos有哪些基本的dos命令? 一、什么是dos&#xff1f; DOS是磁盘操作系统Disk Operating System&#xff09;的简称。 二、dos的主要作用&#xff1f; DOS 操作系统的主要功能…

Linux tomcat 8 配置访问本地文件,并且配置https

一 、Linux Tomcat 配置访问本地文件 1. 将需要被访问的文件上传至 /opt/datafile 目录下 2. 修改tomcat server.xml文件&#xff0c;增加配置 docBase:文件所在目录 path: 代理访问路劲<Context docBase"/opt/datafile/" path"files" debug"0&q…

maven私服搭建详细教程

目录 1 为什么需要私服 2 Nexus私服 2.1 Nexus下载及登录 2.2 maven仓库 2.2.1 代理仓库 2.2.2 宿主仓库 2.2.3 仓库组 3 本地Maven下载构建 3.1 pom.xml方式 3.2 镜像方式 4 本地依赖发布到私服 4.1 maven部署到nexus私服 4.1.1 快照版本 ​4.1.2 release版本 4.2 …