Locust单机多核压测,以及主从节点的数据通信处理

news2024/11/29 22:55:14

一、背景

这还是2个月前做的一次接口性能测试,关于locust脚本的单机多核运行,以及主从节点之间的数据通信。

先简单交代下背景,在APP上线之前,需要对登录接口进行性能测试。经过评估,我还是优先选择了locust来进行脚本开发,本次用到了locust的单机多核运行能力,只不过这里还涉及到主从节点之间数据通信。现成的可参考的有效文档甚少,所以还是自己摸着官方文档过河比较靠谱。

顺带提一下,学习框架这种东西最好的教程其实还得是官方文档以及框架源码了,这里贴上locust官方文档链接,需要的可以自行学习:https://docs.locust.io/en/stable/what-is-locust.html

二、代码编写

其实脚本代码的编写一大重点就是如何处理测试数据,不同的测试需求对于测试数据的处理是不同的。比如这次的需求,手机号不能重复。另外考虑到长时间的负载压力,数据量还得足够。

最后测试数据还需要处理,那么我使用的测试号段是非真实号码段,测试结束后可以查询对应号段内的手机号,进行相关业务数据的清理。

1. 代码概览

还是老样子,先附上全部代码,然后对其结构进行拆分讲解。

import random
import time
from collections import deque

from locust import HttpUser, task, run_single_user, TaskSet, events
from locust.runners import WorkerRunner, MasterRunner

CURRENT_TIMESTAMP = str(round(time.time() * 1000))
RANDOM = str(random.randint(10000000, 99999999))
MOBILE_HEADER = {
    "skip-request-expired": "true",
    "skip-auth": "true",
    "skip-sign": "true",
    "os": "IOS",
    "device-id": "198EA6A4677649018708B400F3DF69FB",
    "nonce": RANDOM,
    "sign": "12333",
    "version": "1.2.0",
    "timestamp": CURRENT_TIMESTAMP,
    "Content-Type": "application/json"
}

last_mobile = ""
worker_mobile_deque = deque()


# 13300120000, 13300160000 新用户注册号段

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        mobile_list = []
        for i in range(13300120000, 13300160000):
            mobile_list.append(i)
        mobile_list_length = len(mobile_list)
        print("列表已生成,总计数量:", mobile_list_length)
        worker_count = environment.runner.worker_count
        chunk_size = int(mobile_list_length / worker_count)
        print(f"平均每个worker分得的手机号数量:{chunk_size}")

        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size
            if i + 1 < worker_count:
                end_index = start_index + chunk_size
            else:
                end_index = len(mobile_list)
            data = mobile_list[start_index:end_index]
            environment.runner.send_message("mobile_list", data, worker)


def setup_mobile_list(environment, msg, **kwargs):
    len_msg_data = len(msg.data)
    print(f"worker收到的master传来的数据号段:{msg.data[0]} ~ {msg.data[len_msg_data-1]}")
    global worker_mobile_deque
    worker_mobile_deque = deque(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('mobile_list', setup_mobile_list)


class VcodeLoginUser(TaskSet):
    # wait_time = between(5, 5)

    @task
    def vcode_login(self):
        test_mobile = worker_mobile_deque.popleft()
        print("当前获取的手机号:", test_mobile)
        # print("当前队列大小:", len(worker_mobile_deque))
        global last_mobile
        last_mobile = test_mobile
        with self.client.post("/g/sendMobileVcode",
                              headers=MOBILE_HEADER,
                              json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:
            try:
                send_response_json = send_response.json()
                if send_response_json["message"] == "success":
                    params = {"mobile": str(test_mobile), "vcode": "111111"}
                    # print(test_mobile, "登录请求参数:", params)
                    with self.client.post("/g/vcodeLogin",
                                          json=params,
                                          headers=MOBILE_HEADER,
                                          catch_response=True) as login_response:
                        # print(login_response.json)
                        login_response_json = login_response.json()
                        if login_response_json["message"] != "success":
                            login_response.failure("message not equal success")
                        elif login_response_json["code"] != 0:
                            login_response.failure("code not equal 0")
                        elif login_response_json["data"]["rId"] == "":
                            login_response.failure("rid is null")
                        elif login_response_json["data"]["mobile"] != str(test_mobile):
                            login_response.failure("mobile is error,入参手机号{},返回的手机号{}"
                                                   .format(test_mobile, login_response.json()["data"]["mobile"]))
                        # print(test_mobile, "请求结果:", login_response.json())
                else:
                    send_response.failure("{} send code fail".format(test_mobile))
            except Exception as e:
                send_response.failure("send code fail {}".format(e))

    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        print("脚本结束")
        print("当前队列大小:", len(worker_mobile_deque))
        print("最后的手机号:", last_mobile)


class LocustLogin(HttpUser):
    tasks = [VcodeLoginUser]
    host = "https://qa.test.com"


if __name__ == '__main__':
    run_single_user(LocustLogin)

2. 代码拆解-要加必要的断言

首先是基于locust开发的http请求的脚本大结构是不变的,依旧是两大块:HttpUserTaskSet,这里不再对其讲解了,大伙看下官方文档就明白了。

接下来就是类VcodeLoginUser,可以看到在这里面是定义了单个用户的详细动作。注意这里要加上必要的断言。否则仅靠框架的非200外的错误断言还是不够的。

比如我这里关注登录成功后的几个必要字段:coderIdmobile,这些一定是要符合断言的才可以。

果不其然,压测过程中就发现了并发情况下会出现的问题:入参手机号是a,接口返回的手机号是b。并发量越大错误越多。如果我只断言code=0,那么这个问题就不容易发现了,虽然接口返回的code都是成功的,但是业务上已经存在错误了。

...
        with self.client.post("/g/sendMobileVcode",
                              headers=MOBILE_HEADER,
                              json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:
            try:
                send_response_json = send_response.json()
                if send_response_json["message"] == "success":
                    params = {"mobile": str(test_mobile), "vcode": "111111"}
                    # print(test_mobile, "登录请求参数:", params)
                    with self.client.post("/g/vcodeLogin",
                                          json=params,
                                          headers=MOBILE_HEADER,
                                          catch_response=True) as login_response:
                        # print(login_response.json)
                        login_response_json = login_response.json()
                        if login_response_json["message"] != "success":
                            login_response.failure("message not equal success")
                        elif login_response_json["code"] != 0:
                            login_response.failure("code not equal 0")
                        elif login_response_json["data"]["rId"] == "":
                            login_response.failure("rid is null")
                        elif login_response_json["data"]["mobile"] != str(test_mobile):
                            login_response.failure("mobile is error,入参手机号{},返回的手机号{}"
                                                   .format(test_mobile, login_response.json()["data"]["mobile"]))
                        # print(test_mobile, "请求结果:", login_response.json())
                else:
                    send_response.failure("{} send code fail".format(test_mobile))
            except Exception as e:
                send_response.failure("send code fail {}".format(e))
...
3. 代码拆解-单机多核处理

接下来就是重点了,如何在单台机器上用到多cpu。最开始的时候我忽略了这点,后来发现负载上不去,一打开资源监视器才发现只有1个cpu在满负载运行。

这里示意图仅供参考,我的win笔记本是12c的。

因为Locust是单进程的,不能充分利用多核CPU,于是需要我们压力机上开启一个master进程,然后再开启多个slave进程,组成一个单机分布式系统即可。

开启的方式也很简单:

# 开启 master 
locust -f locustfile.py --master

# 开启 slave
locust -f locustfile.py --slave

这里我们开启 slave 节点的时候可以开启对应多个命令行窗口,当时没截图,借用网上的图片示意一下:

开启后,你的web界面就可以实时看到当前启动的节点数了。

4. 代码拆解-处理主从节点数据通信

开启主从节点倒是很容易,测试数据就需要针对性进行处理了。

因为我的测试登录用的手机号不可以重复,所以要保证不同 slave 节点上同时运行的代码产生的手机号都不可以重复。

继续扒了下官方文档,发现可以通过增加事件监听器来实现我的需求。

这里我加了三个监听器分别来处理不同的事情:

  • @events.init.add_listener:在locust运行初始化的时候执行
  • @events.test_start.add_listener: 在测试代码开始运行的时候执行
  • @events.test_stop.add_listener: 在测试代码结束运行的时候执行

@events.test_start.add_listener
首先,在@events.test_start.add_listener里,我主要处理全量数据的生成,以及把这些手机号平均分配给生成的 slave 节点。

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        mobile_list = []
        for i in range(13300120000, 13300160000):
            mobile_list.append(i)
        mobile_list_length = len(mobile_list)
        print("列表已生成,总计数量:", mobile_list_length)
        worker_count = environment.runner.worker_count
        chunk_size = int(mobile_list_length / worker_count)
        print(f"平均每个worker分得的手机号数量:{chunk_size}")

        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size
            if i + 1 < worker_count:
                end_index = start_index + chunk_size
            else:
                end_index = len(mobile_list)
            data = mobile_list[start_index:end_index]
            environment.runner.send_message("mobile_list", data, worker)

注意这里最后一行中定义的mobile_list,需要定义一个对应函数来接收这个数据。

def setup_mobile_list(environment, msg, **kwargs):
    len_msg_data = len(msg.data)
    print(f"worker收到的master传来的数据号段:{msg.data[0]} ~ {msg.data[len_msg_data-1]}")
    global worker_mobile_deque
    worker_mobile_deque = deque(msg.data)

这样,不同的 slave 节点脚步分配到的手机号段就是不同的了,解决测试数据重复的问题。

另外,我定义另一个全局变量worker_mobile_deque,这样不同的 slave 节点接收的数据就可以放到队列里,运行的时候从队列里面取,用一个少一个,直到队列里的数据用完。

@events.init.add_listener
接着就是在@events.init.add_listener里要注册上面定义的数据字段和处理函数。

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('mobile_list', setup_mobile_list)

@events.test_stop.add_listener
最后,在@events.test_stop.add_listener这里可以做一些后置处理,我是简单起见,只是记录输出了本次测试用到了哪个号码段,这样我下次运行脚本的时候可以从后面的数据开始,最大化测试数据的使用,不浪费。

    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        print("脚本结束")
        print("当前队列大小:", len(worker_mobile_deque))
        print("最后的手机号:", last_mobile)

三、小结

脚本调试完后可以稳定运行,接下来就是测试的过程了,进行了服务器单节点、多节点负载能力的测试,水平拓展能力的测试,以及服务动态扩容、长时间高负载测试。测试的角度观察测试报告,服务各项指标的情况。只不过涉及到开发端,调优分析的工作并未能参与很多。不过大概还是那些常见问题,后续有机会可以再单独分享了。

从使用角度来看,locust深得我爱,比起 jemter真的太轻便了,代码灵活度也非常高,单机负载能力也是响当当的,这点比jemeter强太多了。我这个项目不需要非常高的量,所以单机只用了8c就够了。如果有小伙伴需要非常高的并发,locust 也支持多机器分布式,进一步扩大并发能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1159723.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023我和云栖有个约会

时间&#xff1a;2023.11.1 地点&#xff1a;云栖小镇 事件&#xff1a;约会 昨天刚在网上看到了有阿姨在云栖大会给自己女儿相亲的照片&#xff0c;今天直接就赶了过去。约会了一整天&#xff0c;虽然很累&#xff0c;但真的很值得。由于是第一次和云栖约会&#xff0c;那就…

“深入理解Nginx的负载均衡与动静分离“

目录 引言一、Nginx简介1. Nginx的基本概念2. Nginx的特点3. Nginx的安装配置 二、Nginx搭载负载均衡三、前端项目打包四、Nginx部署前后端分离项目&#xff0c;同时实现负载均衡和动静分离总结 引言 在现代互联网应用中&#xff0c;高性能和可扩展性是至关重要的。Nginx作为一…

初出茅庐的小李博客之STCW15408AS单片机串口1使用记录

STCW15408AS单片机串口1使用记录 资源介绍&#xff1a; STC15W401AS系列单片机是STC生产的单时钟/机器周期(1T)的单片机&#xff0c;是宽电压/高可靠/低功耗/超强抗干扰的新一代8051单片机&#xff0c;采用STC第九代加密技术&#xff0c;无法解密&#xff0c; 代码完全兼容传…

node使用fs模块(四)—— 文件夹的使用(创建、读取、删除)

文章目录 前言一、文件的创建1.参数2.基本使用 二、文件的读取1.参数2.读取文件的基本使用3.读取文件的递归使用&#xff08;option中添加recursive为true&#xff09; 三、文件的删除1. 参数2. 基本使用&#xff08;删除文件夹&#xff09;3. 递归删除文件夹 前言 创建、读取…

学习时遇到的错误

1. pycharm中使用ssh远程连接的jupyter时&#xff0c;出现***端口已经被占用的情况 办法一&#xff1a;更换端口&#xff0c;将端口更换为其他 办法二&#xff1a;重启远程终端服务器 2. 关于wandb&#xff0c;在pycharm中调用了wandb.init()初始化函数&#xff0c;中途关闭…

MolFormer分子预训练模型

Large-scale chemical language representations capture molecular structure and properties&#xff08;2022&#xff0c;NMI&#xff09; 和原本transformer encoder的不同&#xff1a; 采用linear attention mechanismrotary positional embedding 模型 transformer e…

x3daudio17.dll丢失是什么原因?x3daudio1_7.dll怎么解决?(修复教程分享)

在我多年的电脑使用经历中&#xff0c;我曾经遇到过x3daudio1_7.dll缺失的问题。这个问题让我苦恼了很久&#xff0c;但最终我还是找到了4种有效的修复方法。今天&#xff0c;我想和大家分享一下这些方法&#xff0c;希望对你们有所帮助。 首先&#xff0c;我要讲述一下我遇到…

跟着Nature Communications学作图:纹理柱状图+添加显著性标签!

&#x1f4cb;文章目录 复现图片设置工作路径和加载相关R包读取数据集数据可视化计算均值和标准差 计算均值和标准差方差分析组间t-test 图a可视化过程图b可视化过程合并图ab 跟着「Nature Communications」学作图&#xff0c;今天主要通过复刻NC文章中的一张主图来巩固先前分享…

基础课15——语音标注

语音数据标注是对语音数据进行处理和分析的过程&#xff0c;目的是让人工智能系统能够理解和识别语音中的信息。这个过程包括了对语音信号的预处理、特征提取、标注等步骤。 在语音数据标注中&#xff0c;标注员需要对语音数据进行分类、切分、转写等操作&#xff0c;让人工智…

C++ 多线程之OpenMP并行编程使用详解

C 多线程之OpenMP并行编程使用详解 总结OpenMP使用详解本文转载自&#xff1a;https://blog.csdn.net/AAAA202012/article/details/123665617?spm1001.2014.3001.5506 1.总览 OpenMP(Open Multi-Processing)是一种用于共享内存并行系统的多线程程序设计方案&#xff0c;支持…

软件测试之用例篇(万能公式、具体方法)

目录 1. 概念 2. 万能公式 3.具体设计测试用例的方法 &#xff08;1&#xff09;等价类 &#xff08;2&#xff09;边界值 &#xff08;3&#xff09;判定表(因果图) &#xff08;4&#xff09;场景设计法 &#xff08;5&#xff09;正交法 如何使用 allparis 生成正交…

绕开网站反爬虫原理及实战

1.摘要 在本文中,我首先对网站常用的反爬虫和反自动化技术做了一个梳理, 并对可能能够绕过这些反爬技术的开源库chromedp所使用的技术分拆做一个介绍, 最后利用chromedp库对一个测试网站做了爬虫测试, 并利用chromedp库绕开了爬虫限制,成功通过程序自动获取到信息。在测试过程…

基站/手机是怎么知道信道情况的?

在无线通信系统中&#xff0c;信道的情况对信号的发送起到至关重要的作用&#xff0c;基站和手机根据信道的情况选择合适的资源配置和发送方式进行通信&#xff0c;那么基站或者手机是怎么知道信道的情况呢&#xff1f; 我们先来看生活中的一个例子&#xff0c;从A地发货到B地…

小程序如何设置自动预约快递

小程序通过设置自动预约功能&#xff0c;可以实现自动将订单信息发送给快递公司&#xff0c;快递公司可以自动上门取件。下面具体介绍如何设置。 在小程序管理员后台->配送设置处&#xff0c;选择首选配送公司。为了能够支持自动预约快递&#xff0c;请选择正常的快递公司&…

vue3写nav滚动事件中悬停在顶部

1. 防抖类Animate, 使用requestAnimationFrame代替setTimeout 也可以使用节流函数, lodash有现成的防抖和节流方法 _.debounce防抖 _.throttle节流 export default class Animate {constructor() {this.timer null;}start (fn) > {if (!fn) {throw new Error(需要执行…

力扣:147. 对链表进行插入排序(Python3)

题目&#xff1a; 给定单个链表的头 head &#xff0c;使用 插入排序 对链表进行排序&#xff0c;并返回 排序后链表的头 。 插入排序 算法的步骤: 插入排序是迭代的&#xff0c;每次只移动一个元素&#xff0c;直到所有元素可以形成一个有序的输出列表。每次迭代中&#xff0c…

解决在Win7下运行一些老游戏花屏或色彩异常问题的方法

有一些喜欢回顾经典老游戏的玩家们&#xff0c;在目前最新的windows7的操作系统下&#xff0c;运行某些游戏会出现花屏&#xff0c;问题的原因是因为win7对这些游戏的DirectDraw不兼容&#xff0c;一种方法是改游戏配置文件&#xff0c;把游戏色彩8bit改成16bit&#xff0c;当然…

安装pytorch报错torch.cuda.is_available()=false的解决方法

参考文章&#xff1a; https://blog.csdn.net/qq_46126258/article/details/112708781 https://blog.csdn.net/Andy_Luke/article/details/122503884 https://blog.csdn.net/anmin8888/article/details/127910084 https://blog.csdn.net/zcs2632008/article/details/127025294 …

为什么Facebook运营需使用IP代理?有哪些美国IP代理好用?

随着互联网的快速发展和全球用户规模的不断增长&#xff0c;Facebook已成为了全球最大的社交媒体平台之一。然而&#xff0c;大批量地运营Facebook账号往往需要借助IP代理这一工具&#xff0c;提高账号的安全性和可靠性&#xff0c;使得运营Facebook更加流畅。那么Facebook为什…

嵌入式到底如何理解呢?

今日话题&#xff0c;嵌入式到底如何理解呢&#xff1f;以我个人的理解&#xff0c;可以用一个客观的比喻来描述&#xff0c;就是将某个系统嵌入到特定的环境中&#xff0c;以实现特定的功能。这个过程包括将现实世界中的人、物的意图和逻辑关系&#xff0c;通过计算和运算的方…