Python编写BC260Y TCP数据收发压力测试脚本

news2024/9/21 22:52:59

Python编写BC260Y TCP数据收发压力测试脚本

使用BC260Y的TCP AT命令发送数据时,能够在数据中带有’\r\n’(回车换行),而其他模组会将’\r\n’当做AT命令处理的结束符,例如EC800E,为了验证TCP数据中带有’\r\n’时数据发收的稳定性,决定进行压力测试。

  • 编写一个Python脚本周期向TCP服务器发送1024字节数据(数据中带有不止一个’\r\n’),在超时时间内判断是否发送成功和接收成功。
  • TCP服务器同样以’\r\n’为结束符,收到结束符后将收到的所有数据原封不动回发。
  • 所有的数据收发log保存到文件,便于出错后分析。
    使用串口工具测试步骤如下:
    在这里插入图片描述

编写Python脚本:

#########################################
## AT测试脚本:根据BC260Y模组TCP模块编写TCP压力测试脚本,模组使用直吐模式连接TCP服务器,每隔5s向服务器发送1024字节数据,服务器以'\r\n'(回车换行)作为数据接结束标志,将接收到的数据返回
## pyhton版本:3.6.5 
## 需要安装的第三方库:pip install pyserial
#########################################
## -- coding: utf-8 --
import os
import sys
import threading
import time
import datetime 
import msvcrt
import serial
import logging
import signal
import json
import hmac
import base64
from hashlib import sha256

############# 宏定义(开始) ####################
ATCOM         = 'COM38'
ATBaud        = 9600
LOGNamePrefix = 'LOG-NAME'
g_ExitFlag = 0                  ## 0--未退出;1--退出
g_RecvResult = 0                ## 0--未接收;1--接收成功
g_RecvData = ""                 ## 接收数据缓存
g_SendSuccCount = 0             ## 成功次数
g_SendFailCount = 0             ## 失败次数
g_SendTotleCount = 0            ## 测试总次数
g_TimeOut = 0                   ## 0--未超时;1--超时
g_Cereg = 0                     ## 0--未驻网;1--已驻网
############# 宏定义(结束) ####################

###******************************************************************************
## 日志格式,输出到文件,同时输出到屏幕
def logInit(log_dir, logFileName) :
    ## 日志格式,输出到log_dir路径下logFileName文件
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)    
    
    log_file_path = os.path.join(log_dir, logFileName)

    logging.basicConfig(
        level=logging.DEBUG,
        format='[%(asctime)s][%(levelname)s]: %(message)s',
        filename=log_file_path,
        ## 写入模式,覆盖现有的日志文件,如果文件已经存在,它的内容将被清空
        filemode='w'
    )

    ## 日志格式,输出到屏幕
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    ## %(asctime)s:表示日志事件发生的时间。默认格式为YYYY-MM-DD HH:MM:SS,mmm,其中mmm表示毫秒。
    ## %(levelname)s:表示日志记录的级别,如DEBUG, INFO, WARNING, ERROR, CRITICAL等。
    ## %(message)s:表示日志记录的消息内容。
    formatter = logging.Formatter('[%(asctime)s][%(levelname)s]: %(message)s')
    console.setFormatter(formatter)
    logging.getLogger().addHandler(console)

###******************************************************************************
## 打开串口
## 端口,GNU/Linux上的/dev/ttyUSB0等 或 Windows上的COM3等
## 波特率,标准值之一:50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,19200,38400,57600,115200
## 超时设置,None:永远等待操作,0为立即返回请求结果,其他值为等待超时时间(单位为秒)
def openComPort(portx, bps, timeout):
    ret = False
    ser = ""
    try:
        ser = serial.Serial(portx, bps, timeout=timeout)
        if(ser.is_open):
            ret = True
    except Exception as e:
        logging.error("openComPort faild:",e)
    return ser,ret

###******************************************************************************
## 关闭串口
def closeComPort(ser):
    if(ser.is_open):
        ser.close()
        return 0
    return -1

###******************************************************************************
## 串口发送AT命令并接收返回值
def ATSend(atcmd):
    global g_RecvData

    ## 发送AT命令前先将接收缓存g_RecvData清空
    g_RecvData = ""

    if(len(atcmd) > 0):
        logging.info(">>>>>>Send AT cmd:[%s]" %(atcmd+'\r\n'))
        bytes_written  = ser.write((atcmd+'\r\n').encode('utf-8'))

    return bytes_written 

###******************************************************************************
## 接收数据线程
def threadFunComRecv():
    global g_RecvResult, g_RecvData
    rxDataTmp = ""
    rxData = ""

    while(0 == g_ExitFlag):
        count = ser.inWaiting()
        if(count > 0):
            tmp = ser.read(ser.in_waiting)
            
            try:
                rxDataTmp = tmp.decode('utf-8')
            except:
                logging.info(tmp)
            else:
                rxData = rxData + rxDataTmp
                if(ser.in_waiting > 0):
                    continue
                
                ## 以'\n'为结束符将数据存入接收缓存g_RecvResult
                if('\n' == rxData[-1]):
                    g_RecvData = g_RecvData + rxData
                    rxData = ""
                    rxDataTmp = ""
                    ## 将接收标志为置位
                    g_RecvResult = 1

###******************************************************************************
## 按键终止
def signal_Handler(sig, frame):
    global g_ExitFlag

    g_ExitFlag = 1
    logging.info("!!!Exit!!!")
    sys.exit(0)

###******************************************************************************
## 定时器超时函数
def timeOut():
    global g_TimeOut

    g_TimeOut = 1
    logging.info("!!!ERROR:Time out!!!\r\n")

###******************************************************************************
## 开启定时器
def start_timer(delay):

    timer = threading.Timer(delay, timeOut)  
    timer.start()  

    return timer

###******************************************************************************
## 检查接收数据函数
def checkRecvData(strings, timeout):
    global g_TimeOut

    ## 启动定时器,在超时时间timeout内判断是否接收到指定数据strings
    timer = start_timer(timeout)
    while(1 != g_TimeOut):
        if(str(g_RecvData).find(strings) >= 0):
            g_TimeOut = 0
            timer.cancel()
            ## 波特率9600时,1024字节数据传输需要1.06s,加2s延时确保在收到指定数据strings后的其他数据都能接收完全
            time.sleep(2)
            logging.info("<<<<<<g_RecvData:[%s] \r\n" %(g_RecvData))

            return 1

    if(1 == g_TimeOut):
            g_TimeOut = 0
            timer.cancel()
            time.sleep(2)
            logging.info("!!!ERROR: Time out!!!")

            return -1
    

###******************************************************************************
## 主函数
if __name__ == "__main__":
    ## 将当前本地时间格式化为'%Y%m%d%H%M%S'这样的格式
    t = time.strftime('%Y%m%d%H%M%S', time.localtime())
    logFileName = "%s-%s.log" %(LOGNamePrefix, t)
    log_dir = ".\\log"
    # log_dir = "C:\\Users\\Administrator\\Desktop\\TCPRecvTest\\log"

    ## 初始化log输出
    logInit(log_dir, logFileName)

    ## ctrl+c终止程序
    signal.signal(signal.SIGTERM, signal_Handler)
    signal.signal(signal.SIGINT, signal_Handler)

    ## 打开串口
    ser,ret = openComPort(ATCOM, ATBaud, 5)
    if(False == ret):
        exit
    else:
        logging.info("ATCOM:%s Open Success!" %ATCOM)
    
    ## 开启接收线程
    trdComRecv = threading.Thread(target=threadFunComRecv)
    trdComRecv.start()

    ## 唤醒模组,模组在睡眠模式中时,第一条命令会唤醒模组,但是无法响应该AT命令,因此先发一条AT命令唤醒模组
    ser.write(('AT\r\n').encode('utf-8'))
    logging.info("Wake up module!")
    time.sleep(1)

    ## 退出睡眠模式 AT+QSCLK=0
    ATCMD = "AT+QSCLK=0"
    bytes_written = ATSend(ATCMD)
    ## 长度加2是因为多了'\r\n'
    if(bytes_written == len(ATCMD) + 2):
        ## 5s内检查是否收到'OK'
        ret = checkRecvData("OK", 5)
        if(1 == ret):
            logging.info(">>>>>>[%s] OK \r\n" %(ATCMD))
        else:
            logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
            logging.info("!!!Exit!!!")
            sys.exit(0)

    ## 等待模组驻网 AT+CEREG?
    while(0 == g_Cereg):
        ATCMD = "AT+CEREG?"
        bytes_written = ATSend(ATCMD)
        if(bytes_written == len(ATCMD) + 2):
            ## 5s内检查是否收到'+CEREG: 0,1'
            ret = checkRecvData("+CEREG: 0,1", 5)
            if(1 == ret):
                logging.info(">>>>>>[%s] OK \r\n" %(ATCMD))
                g_Cereg = 1
            else:
                logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
                logging.info("!!!Exit!!!")
                sys.exit(0)

    ## 连接TCP服务器 AT+QIOPEN=0,0,"TCP","112.91.141.248",8000,0,1
    ATCMD = "AT+QIOPEN=0,0,\"TCP\",\"112.91.141.248\",8000,0,1"
    bytes_written = ATSend(ATCMD)
    if(bytes_written == len(ATCMD) + 2):
        ## 5s内检查是否收到'+QIOPEN: 0'
        ret = checkRecvData("+QIOPEN: 0,", 5)
        if(1 == ret):
            ## 模组上电第一次连接TCP服务器时返回+QIOPEN: 0,0
            if(str(g_RecvData).find("+QIOPEN: 0,0") >= 0):
                logging.info(">>>>>>[%s] OK Firts connection \r\n" %(ATCMD))
            ## 若该TCP连接已存在则回复+QIOPEN: 0,563
            elif(str(g_RecvData).find("+QIOPEN: 0,563") >= 0):
                logging.info(">>>>>>[%s] OK  Existing connection \r\n" %(ATCMD))
            else:
                logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
                logging.info("!!!Exit!!!")
                sys.exit(0)                
        else:
            logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
            logging.info("!!!Exit!!!")
            sys.exit(0)

    ## 主线程
    while(0 == g_ExitFlag):
        time.sleep(5)

        ## 退出睡眠模式 AT+QISEND=0,1024,"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789A\r\nA456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234B\r\nB9012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901C\r\nC678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123D\r\nD89012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456E\r\nE12345678901234567890123456789012345678901234567890123456789012345678901\r\n"
        ATCMD="AT+QISEND=0,1024,\"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789A\r\nA456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234B\r\nB9012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901C\r\nC678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123D\r\nD89012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456E\r\nE12345678901234567890123456789012345678901234567890123456789012345678901\r\n\""
        bytes_written = ATSend(ATCMD)

        if(bytes_written == len(ATCMD) + 2):
            ## 5s内检查是否收到'"recv"',表示模组接收到TCP服务器回复的数据
            ret = checkRecvData("+QIURC: \"recv\",", 5)
            ## 记录测试测试、成功次数、失败次数
            if(1 == ret):
                g_SendSuccCount = g_SendSuccCount + 1
                g_SendTotleCount = g_SendTotleCount + 1
                logging.info("######SEND SUCC! TOTAL COUNT:%d PASS COUNT:%d FAIL COUNT:%d\r\n" %(g_SendTotleCount, g_SendSuccCount, g_SendFailCount))
            else:
                g_SendFailCount = g_SendFailCount + 1
                g_SendTotleCount = g_SendTotleCount + 1
                logging.info("######SEND SUCC! TOTAL COUNT:%d PASS COUNT:%d FAIL COUNT:%d\r\n" %(g_SendTotleCount, g_SendSuccCount, g_SendFailCount))

            time.sleep(5)

使用Python脚本测试:
在这里插入图片描述

保存的log:
在这里插入图片描述

VScode包(包含运行时环境配置)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2079048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Zoom iOS 转录例子

一、在zoom marketplace创建通用app&#xff0c;zoom-recall 详见Zoom会议机器人转写例子-CSDN博客 二、mac下按照Xcode&#xff0c;创建APP项目meetingbot4ios 三、本实用的SDK为MobileRTC&#xff0c;即Meeting SDK的iOS版本 四、依赖如下&#xff1a; MobileRTC和Crypto…

Swift-UITableView列表动态设置高度,根据不同的内容长度,设置heightForRowAt

此篇文章主要阐述如何利用swift语言&#xff0c;实现返回内容不同长度文本的高度&#xff0c;比如第一个列表文字1行&#xff0c;只需要50像素高度&#xff0c;第二个列表文字超出了1行&#xff0c;如2行&#xff0c;那么就自动调整这个单元文本的高度&#xff1b; 用MVC实现&…

给房子“养老”,你准备好了吗?

文&#xff5c;琥珀食酒社 作者 | 积溪 真崩不住了啊 一觉醒来 朋友圈被房屋养老金刷屏了 有人说买房如买“爹”&#xff0c;真的好费钱 有的说咱自己的养老还没着落呢 未来还得给房子养老&#xff1f; 当然&#xff0c;这事已经被辟谣了 说公共账户不需要咱老百姓额外…

力扣经典题目之->相同的树(递归判断两颗二叉树是否相同)

一&#xff1a;题目 二&#xff1a;代码 三&#xff1a;递归展开 第一种模型&#xff1a; 递归展开图&#xff1a; 左&#xff1a; 右&#xff1a; 第二种模型及其递归展开图&#xff1a; 解释&#xff1a; 递归思路即&#xff1a;根相同&#xff0c;左子树相同&#xff0c;…

cdga|制造业、工程设计行业、创投行业的数据治理痛点与解决方案

在当今数字化时代&#xff0c;数据已成为企业核心资产&#xff0c;数据治理成为推动企业数字化转型和高质量发展的关键。然而&#xff0c;不同行业在数据治理过程中面临着不同的痛点与挑战。 今天小编来和大家聊聊制造业、工程设计行业、创投行业的数据治理痛点进行详细分析&a…

【java计算机毕设】网上商城MySQL springcloud vue HTML maven项目设计源码带项目报告PPT 前后端可分离也可不分离

目录 1项目功能 2项目介绍 3项目地址 1项目功能 【java计算机毕设】网上商城MySQL springcloud vue HTML maven项目设计源码带项目报告PPT 前后端可分离也可不分离 2项目介绍 系统功能&#xff1a; 网上商城包括管理员、用户两种角色。 管理员功能包括个人中心模块用于修改…

Java | Leetcode Java题解之第375题猜数字大小II

题目&#xff1a; 题解&#xff1a; class Solution {public int getMoneyAmount(int n) {int[][] f new int[n 1][n 1];for (int i n - 1; i > 1; i--) {for (int j i 1; j < n; j) {f[i][j] j f[i][j - 1];for (int k i; k < j; k) {f[i][j] Math.min(f[…

leetcode 560 和为k 的子数组

leetcode 560 和为k 的子数组 正文一般解法字典方法 正文 一般解法 class Solution:def subarraySum(self, nums: List[int], k: int) -> int:number 0for i in range(len(nums)):for j in range(i , len(nums)):if sum(nums[i:j 1]) k:number 1return number上述方法虽…

北斗卫星导航系统的应用落地,改变未来出行方式

近年来&#xff0c;随着北斗卫星导航系统在全球范围内的部署和完善&#xff0c;它的应用范围不断扩大&#xff0c;正逐渐成为数字化出行时代的重要基础设施。从智能导航到车联网&#xff0c;从航空导航到陆地测绘&#xff0c;北斗卫星导航系统的应用正在深入各个领域&#xff0…

视频单条剪、脚本靠手写?云微客开启海量视频时代

老板们注意了&#xff0c;现在已不再是视频单条剪&#xff0c;脚本靠手写的时代&#xff01;在这个信息爆炸的时代&#xff0c;短视频已经成为了现代信息传播和娱乐消费的重要载体&#xff0c;那么我们该如何高效、快速地制作出大量高质量的短视频内容呢&#xff1f;这就需要云…

TRIZ理论在手术机器人功能区设计中的应用

TRIZ&#xff0c;全称为“Theory of Inventive Problem Solving”&#xff0c;是一套系统化地分析和解决复杂问题的理论工具。它不仅能够预测技术系统的发展趋势&#xff0c;还能提供一套高效的问题解决策略&#xff0c;帮助设计师突破思维定势&#xff0c;实现突破性创新。在手…

视频转换成MP3怎么转?这里有快速转换通道

视频转换成MP3怎么转&#xff1f;在数字化时代&#xff0c;视频和音频内容无处不在&#xff0c;我们时常需要将视频中的音频提取出来&#xff0c;以便在特定场合下单独播放或编辑。将视频转换成MP3音频格式是一种常见且实用的需求。为了帮助你轻松实现这一操作&#xff0c;本文…

openlayers10+vue3+ts

在 Vue 3 应用程序中使用 Vite 工具链和 OpenLayers10 创建一个简单的地图实例&#xff0c;并实现一些基础的地图交互功能。 Demo 地图初始化 底图加载与切换 GeoJSON 数据的加载与导出 绘制功能 轨迹回放功能 使用 VectorLayer postrender 实现丝滑的轨迹运动效果。

基于Java的音乐网站与分享平台

你好&#xff0c;我是音乐技术爱好者&#xff0c;专注于音乐与技术的结合。如需交流或合作&#xff0c;请联系我。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;Java语言&#xff0c;Spring Boot框架&#xff0c;B/S架构 工具&#xff1a;Eclipse&a…

007、架构_MDS

​架构 什么是元数据 什么是元数据 元数据又称中介数据、中继数据,为描述数据的数据,主要是描述数据属性的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能;GoldenDB 数据库元数据大致分为两类: 数据字典:库、表、字段属性信息、视图、函数、存储过程属…

【数据结构-前缀异或】力扣1310. 子数组异或查询

有一个正整数数组 arr&#xff0c;现给你一个对应的查询数组 queries&#xff0c;其中 queries[i] [Li, Ri]。 对于每个查询 i&#xff0c;请你计算从 Li 到 Ri 的 XOR 值&#xff08;即 arr[Li] xor arr[Li1] xor … xor arr[Ri]&#xff09;作为本次查询的结果。 并返回一…

SpringBoot2:依赖管理与自动配置

一、依赖管理 什么叫依赖管理&#xff1f; 我们做过Maven项目的&#xff0c;都知道pom.xml的作用。里面配置了一大堆的包依赖。 所以&#xff0c;SpringBoot的依赖管理&#xff0c;意思就是&#xff0c;我们用SpringBoot开发web应用&#xff0c;那么&#xff0c;相关的依赖包&…

添加专辑失败,获取 userId 为空

文章目录 1、debug发现问题2、发现 userId 为空3、GuiGuAspect4、GuiGuLogin 1、debug发现问题 2、发现 userId 为空 点击 getUserId() 进入方法内部 package com.atguigu.tingshu.common.util;/*** 获取当前用户信息帮助类*/ public class AuthContextHolder {private stati…

【Python】copy()浅拷贝与深拷贝

前言 由于python关于传参的方面和C语言有些出入&#xff0c;对于先学C在学Python的可能需要做些笔记&#xff0c;比如Python中def传参是直接传址&#xff0c;而不是传值创建局部变量等等...而到了copy函数感觉又是个新概念... 1. copy介绍 1.1 copy() 浅拷贝 Python 的 cop…

如何解决 Compute 节点上的内存溢出(OOM)问题

内存溢出&#xff08;Out-of-memory&#xff09;是数据处理系统中常见的问题&#xff0c;本文将分析 OOM 的各种原因并提供有效的解决方法。 RisingWave 使用像 AWS S3 这样的共享存储&#xff0c;并将 Compute 节点的内存用作缓存以增强流处理性能。缓存以 Least Recently Us…