结合单元测试框架pytest+数据驱动模型+allure
目录
api: 存储测试接口 conftest.py :设置前置操作 目前前置操作:1、获取token并传入headers,2、获取命令行参数给到环境变量,指定运行环境 commmon:存储封装的公共方法 connect_mysql.py:连接数据库 http_requests.py: 封装自己的请求方法 logger.py: 封装输出日志文件 read_yaml.py:读取yaml文件测试用例数据 read_save_data.py:读取保存的数据文件 case: 存放所有的测试用例 data:存放测试需要的数据 save_data: 存放接口返回数据、接口下载文件 test_data: 存放测试用例依赖数据 upload_data: 存放上传接口文件 logs: 存放输出的日志文件 report: 存放测试输出报告 getpathinfo.py :封装项目测试路径 pytest.int :配置文件 requirement.txt: 本地python包(pip install -r requirements.txt 安装项目中所有python包) run_main.py: 项目运行文件
结构设计
1.每一个接口用例组合在一个测试类里面生成一个py文件 2.将每个用例调用的接口封装在一个测试类里面生成一个py文件 3.将测试数据存放在yml文件中通过parametrize进行参数化,实现数据驱动 4.通过allure生成测试报告
代码展示
api/api_service.py #需要测试的一类接口
'''
Code description:服务相关接口
Create time: 2020/12/3
Developer: 叶修
'''
import os
from common.http_requests import HttpRequests
class Api_Auth_Service(object):
def __init__(self):
self.headers = HttpRequests().headers
def api_home_service_list(self):
# 首页服务列表
# url = "http://192.168.2.199:9092/v1/auth/auth_service/findAuthService"
url = os.environ["host"] + "/v1/auth/auth_service/findAuthService" # 读取conftest.py文件地址进行拼接
response = HttpRequests().get(url, headers=self.headers, verify=False)
# print(response.json())
return response
def get_service_id(self):
#获取银行卡三要素认证服务id
url = "http://192.168.2.199:9092/v1/auth/auth_service/findAuthService"
#url = os.environ["host"] + "/v1/auth/auth_service/findAuthService" # 读取conftest.py文件地址进行拼接
response = HttpRequests().get(url,headers=self.headers)
#print(response.json()['data'][0]['service_list'][0]['id'])
service_id = response.json()['data'][0]['service_list'][1]['id']
return service_id
def api_service_info(self,serviceId='0b6cf45bec757afa7ee7209d30012ce1',developerId=''):
#服务详情
body = {
"serviceId" :serviceId,
"developerId":developerId
}
url = "http://192.168.2.199:9092/v1/auth/auth_service/findServiceDetail"
#url = os.environ["host"] + "/v1/auth/auth_service/findServiceDetail"#读取conftest.py文件地址进行拼接
response = HttpRequests().get(url,headers=self.headers,params = body,verify=False)
#print(response.json())
return response
def api_add_or_update_service(self,api_param_req,api_param_res,description,error_code,icon,id,interface_remarks,
name,product_info,request_method,sample_code,sort,type,url):
#服务添加或者更新
body={
"api_param_req": api_param_req,
"api_param_res": api_param_res,
"description": description,
"error_code": error_code,
"icon": icon,
"id": id,
"interface_remarks": interface_remarks,
"name": name,
"product_info": product_info,
"request_method": request_method,
"sample_code": sample_code,
"sort": sort,
"type": type,
"url": url,
}
#url = "http://192.168.2.199:9092/v1/auth/auth_service/insertOrUpdateService"
url = os.environ["host"] + "/v1/auth/auth_service/insertOrUpdateService" # 读取conftest.py文件地址进行拼接
response = HttpRequests().post(url,json=body,headers=self.headers,verify=False)
return response
def api_add_service_price(self,id,max_number,money,service_id,small_number):
#服务价格添加
body = {
"id": id,
"max_number": max_number,
"money": money,
"service_id": service_id,
"small_number": small_number
}
# url = "http://192.168.2.199:9092/v1/auth/auth_service/insertServicePrice"
url = os.environ["host"] + "/v1/auth/auth_service/insertServicePrice" # 读取conftest.py文件地址进行拼接
response = HttpRequests().post(url, json=body, headers=self.headers, verify=False)
return response
def api_apply_service(self,developer_id,service_id):
#申请服务
body ={
"developer_id": developer_id,
"service_id": service_id
}
# url = "http://192.168.2.199:9092/v1/auth/auth_service/applyService"
url = os.environ["host"] + "/v1/auth/auth_service/applyService" # 读取conftest.py文件地址进行拼接
response = HttpRequests().post(url, json=body, headers=self.headers, verify=False)
return response
if __name__ == '__main__':
#Auth_Service().api_home_service_list()
Api_Auth_Service().get_service_id()
#Auth_Service().api_service_info()
api/get_token.py#获取登录token
'''
Code description:获取token
Create time:2020-12-03
Developer:叶修
'''
import os
import urllib3
from common.http_requests import HttpRequests
class Get_Token(object):
def get_token(self,account='****',password='****'):
#url = "http://192.168.2.199:9092/v1/auth/developer/accountLogin"
url = os.environ["host"]+"/v1/auth/developer/accountLogin"
body = {
"account": account,
"password": password,
}
urllib3.disable_warnings()
r = HttpRequests().post(url, json=body,verify=False)
#print(r.json())
token = r.json()['data']['token']
params = {
"access_token": token
}
HttpRequests().params.update(params)#更新token到session
return token
if __name__ == '__main__':
print(Get_Token().get_token())
case/test_service_info.py #上面接口某一测试用例
'''
Code description: 服务详情
Create time: 2020/12/3
Developer: 叶修
'''
import sys
import allure
import pytest
from common.logger import Log
from common.read_yaml import ReadYaml
from api.api_auth_service.api_auth_service import Api_Auth_Service
testdata = ReadYaml("auth_service.yml").get_yaml_data() # 读取数据
@allure.feature('服务详情')
class Test_Service_Info(object):
log = Log()
@pytest.mark.process
@pytest.mark.parametrize('serviceId,developerId,expect', testdata['service_info'],
ids=['服务详情'])
def test_service_info(self,serviceId,developerId,expect):
self.log.info('%s{%s}' % ((sys._getframe().f_code.co_name,'------服务详情接口-----')))
with allure.step('获取服务id'):
serviceId = Api_Auth_Service().get_service_id()
with allure.step('服务详情'):
msg = Api_Auth_Service().api_service_info(serviceId,developerId)
self.log.info('%s:%s' % ((sys._getframe().f_code.co_name, '获取请求结果:%s' % msg.json())))
# 断言
assert msg.json()["result_message"] == expect['result_message']
assert msg.json()['result_code'] == expect['result_code']
assert 'url' in msg.json()['data']
conftest.py
'''
Code description:配置信息
Create time: 2020/12/3
Developer: 叶修
'''
import os
import pytest
from api.get_token import Get_Token
from common.http_requests import HttpRequests
@pytest.fixture(scope="session")
def get_token():
'''前置操作获取token并传入headers'''
Get_Token().get_token()
if not HttpRequests().params.get("access_token", ""):#没有get到token,跳出用例
pytest.skip("未获取token跳过用例")
yield HttpRequests().req
HttpRequests().req.close()
def pytest_addoption(parser):
parser.addoption(
"--cmdhost", action="store", default="http://192.168.1.54:32099",
help="my option: type1 or type2"
)
@pytest.fixture(scope="session",autouse=True)
def host(request):
'''获取命令行参数'''
#获取命令行参数给到环境变量
#pytest --cmdhost 运行指定环境
os.environ["host"] = request.config.getoption("--cmdhost")
print("当前用例运行测试环境:%s" % os.environ["host"])
common/connect_mysql.py
'''
Code description: 配置连接数据库
Create time: 2020/12/3
Developer: 叶修
'''
import pymysql
dbinfo = {
"host":"******",
"user":"root",
"password":"******",
"port":31855
}
class DbConnect():
def __init__(self,db_conf,database=""):
self.db_conf = db_conf
#打开数据库
self.db = pymysql.connect(database = database,
cursorclass = pymysql.cursors.DictCursor,
**db_conf)
#使用cursor()方式获取操作游标
self.cursor = self.db.cursor()
def select(self,sql):
#sql查询
self.cursor.execute(sql)#执行sql
results = self.cursor.fetchall()
return results
def execute(self,sql):
#sql 删除 提示 修改
try:
self.cursor.execute(sql)#执行sql
self.db.commit()#提交修改
except:
#发生错误时回滚
self.db.rollback()
def close(self):
self.db.close()#关闭连接
def select_sql(select_sql):
'''查询数据库'''
db = DbConnect(dbinfo,database='auth_platform')
result = db.select(select_sql)
db.close()
return result
def execute_sql(sql):
'''执行SQL'''
db = DbConnect(dbinfo,database='auth_platform')
db.execute(sql)
db.close()
if __name__ == '__main__':
sql = "SELECT * FROM auth_platform.auth_service where name='四要素认证'"
sel = select_sql(sql)[0]['name']
print(sel)
common/http_requests.py
'''
Code description: 封装自己的请求类型
Create time: 2020/12/3
Developer: 叶修
'''
import requests
# 定义一个HttpRequests的类
class HttpRequests(object):
req = requests.session()#定义session会话
# 定义公共请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36',
'cookie':''
}
params = {
'access_token':''
}
# 封装自己的get请求,获取资源
def get(self, url='', params='', data='', headers=None, cookies=None,stream=None,verify=None):
response = self.req.get(url,params=params,data=data,headers=headers,cookies=cookies,stream=stream,verify=verify)
return response
# 封装自己的post方法,创建资源
def post(self, url='', params='',data='', json='', headers=None, cookies=None,stream=None,verify=None):
response = self.req.post(url,params=params,data=data,json=json,headers=headers,cookies=cookies,stream=stream,verify=verify)
return response
# 封装自己的put方法,更新资源
def put(self, url='', params='', data='', headers=None, cookies=None,verify=None):
response = self.req.put(url, params=params, data=data, headers=headers, cookies=cookies,verify=verify)
return response
# 封装自己的delete方法,删除资源
def delete(self, url='', params='', data='', headers=None, cookies=None,verify=None):
response = self.req.delete(url, params=params, data=data, headers=headers, cookies=cookies,verify=verify)
return response
common/logger.py
'''
Code description: 封装输出日志文件
Create time: 2020/12/3
Developer: 叶修
'''
import logging,time
import os
import getpathinfo
path = getpathinfo.get_path()#获取本地路径
log_path = os.path.join(path,'logs')# log_path是存放日志的路径
# 如果不存在这个logs文件夹,就自动创建一个
if not os.path.exists(log_path):os.mkdir(log_path)
class Log():
def __init__(self):
#文件的命名
self.logname = os.path.join(log_path,'%s.log'%time.strftime('%Y_%m_%d'))
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)
#日志输出格式
self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def __console(self,level,message):
#创建一个fileHander,用于写入本地
fh = logging.FileHandler(self.logname,'a',encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(self.formatter)
self.logger.addHandler(fh)
#创建一个StreamHandler,用于输入到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(self.formatter)
self.logger.addHandler(ch)
if level == 'info':
self.logger.info(message)
elif level == 'debug':
self.logger.debug(message)
elif level == 'warning':
self.logger.warning(message)
elif level == 'error':
self.logger.error(message)
#避免日志重复
self.logger.removeHandler(fh)
self.logger.removeHandler(ch)
#关闭打开文件
fh.close()
def debug(self,message):
self.__console('debug',message)
def info(self,message):
self.__console('info',message)
def warning(self,message):
self.__console('warning',message)
def error(self,message):
self.__console('error',message)
if __name__ == '__main__':
log = Log()
log.info('测试')
log.debug('测试')
log.warning('测试')
log.error('测试')
common/read_save_data.py
'''
Code description: 读取保存数据
Create time: 2020/12/8
Developer: 叶修
'''
import os
import yaml
import getpathinfo
class Read_Save_Date():
def __init__(self):
path = getpathinfo.get_path()#获取本地路径
self.head_img_path = os.path.join(path,'data','save_data')+"/"+'head_img_path.txt'# head_img_path文件地址
self.order_id_path = os.path.join(path, 'data', 'save_data') + "/" + 'order_id.txt' # order_id.txt文件地址
def get_head_img_path(self):
# 获取head_img_path
with open(self.head_img_path, "r", encoding="utf-8")as f:
return f.read()
def get_order_id(self):
# 获取order_id
with open(self.order_id_path, "r", encoding="utf-8")as f:
return f.read()
if __name__ == '__main__':
print(Read_Save_Date().get_head_img_path())
print(Read_Save_Date().get_order_id())
common/read_yaml.py
'''
Code description: 读取yml文件测试数据
Create time: 2020/12/3
Developer: 叶修
'''
import os
import yaml
import getpathinfo
class ReadYaml():
def __init__(self,filename):
path = getpathinfo.get_path()#获取本地路径
self.filepath = os.path.join(path,'data','test_data')+"/"+filename#拼接定位到data文件夹
def get_yaml_data(self):
with open(self.filepath, "r", encoding="utf-8")as f:
# 调用load方法加载文件流
return yaml.load(f,Loader=yaml.FullLoader)
if __name__ == '__main__':
data = ReadYaml("auth_service.yml").get_yaml_data()['add_or_update_service']
print(data)
data/
data/test_data/auth_service.yml
home_service_list:
- [{'result_code': '0', 'result_message': '处理成功'}]
service_info:
- ['','',{'result_code': '0', 'result_message': '处理成功'}]
add_or_update_service:
- [['1','1','1','1','1','123456','1','测试','1','1','1','1','1','1'],{'result_code': '0', 'result_message': '处理成功'}]
add_service_price:
- ['123456789','10','0','','0',{'result_code': '0', 'result_message': '处理成功'}]
apply_service:
- ['','',{'result_code': '0', 'result_message': '处理成功'}]
logs/
report/
getpathinfo.py
'''
Code description:配置文件路径
Create time: 2020/12/3
Developer: 叶修
'''
import os
def get_path():
# 获取当前路径
curpath = os.path.dirname(os.path.realpath(__file__))
return curpath
if __name__ == '__main__':# 执行该文件,测试下是否OK
print('测试路径是否OK,路径为:', get_path())
pytest.ini
#pytest.ini
[pytest]
markers = process
addopts = -p no:warnings
#addopts = -v --reruns 1 --html=./report/report.html --self-contained-html
#addopts = -v --reruns 1 --alluredir ./report/allure_raw
#addopts = -v -s -p no:warnings --reruns 1 --pytest_report ./report/Pytest_Report.html
requirements.txt
allure-pytest==2.8.18
allure-python-commons==2.8.18
BeautifulReport==0.1.3
beautifulsoup4==4.9.3
ddt==1.4.1
Faker==4.18.0
Flask==1.1.1
httpie==1.0.3
httplib2==0.9.2
HttpRunner==1.5.8
py==1.9.0
PyMySQL==0.10.1
pytest==6.1.1
pytest-base-url==1.4.2
pytest-cov==2.10.1
pytest-forked==1.3.0
pytest-html==2.1.1
pytest-instafail==0.4.2
pytest-metadata==1.10.0
pytest-mock==3.3.1
pywin32==228
PyYAML==5.3.1
requests==2.22.0
requests-oauthlib==1.3.0
requests-toolbelt==0.9.1
run_main.py
'''
Code description: 运行主流程测试用例
Create time: 2020/11/5
Developer: 叶修
'''
import os
import pytest
if __name__ == '__main__':
pytest.main(['-m','process', '-s','--alluredir', 'report/tmp'])#-m运行mark标记文件
os.system('allure generate report/tmp -o report/html --clean') # /report/tmp 为存放报告的源文件目录