前言:由于运维反馈帮忙计算云主机的费用,特编写此脚本进行运算
如图,有如下excel数据
计算过程中需用到数据库中的数据,故封装了一个读取数据库的类
import MySQLdb
from sshtunnel import SSHTunnelForwarder
class SSHMySQL(object):
def __init__(self):
self.server = self.get_server()
self.conn = self.get_conn()
self.cur = self.conn.cursor()
def __enter__(self):
return self
def get_server(self):
# 使用SSH隧道,通过跳板机连接数据库
server = SSHTunnelForwarder(
('192.xx.xx.xx', 22), # 跳板机地址
ssh_username='xxxx', # 跳板机账号
ssh_password='xxxx', # 跳板机密码
remote_bind_address=('127.0.0.1', 3306) # MySql服务器
)
return server
def get_conn(self):
# 开启隧道
self.server.start()
# 使用MySQLdb的connect()方法连接数据库
conn = MySQLdb.connect(
host='127.0.0.1', # 此处必须是127.0.0.1
port=self.server.local_bind_port,
user='root',
password='',
db='ecos',
charset='utf8'
)
return conn
def get_query_one(self, query, param=None):
try:
# 使用execute()方法执行SQL语句
self.cur.execute(query, param)
# 提交当前事务
self.conn.commit()
# 使用fetchone()方法获取第一条数据
data = self.cur.fetchone()
if data is not None:
response = dict(zip([k[0] for k in self.cur.description], data))
else:
response = data
return response
except Exception as e:
# 回滚当前事务
self.conn.rollback()
raise e
def get_query_all(self, query, param=None):
try:
# 使用execute()方法执行SQL语句
self.cur.execute(query, param)
# 提交当前事务
self.conn.commit()
# 使用fetchall()方法获取全部数据
data = self.cur.fetchall()
if data is not None:
response = [dict(zip([k[0] for k in self.cur.description], row)) for row in data]
else:
response = data
return response
except Exception as e:
# 回滚当前事务
self.conn.rollback()
raise e
def __exit__(self, exc_type, exc_val, exc_tb):
# 关闭游标
self.cur.close()
# 关闭数据库连接
self.conn.close()
# 关闭隧道
self.server.close()
def db_query(self, query, param):
res = self.get_query_one(query, param)
print(res)
if __name__ == '__main__':
with SSHMySQL() as db:
query = "SELECT * FROM user WHERE surname = %s"
param = ('yx_01',)
res = db.get_query_all(query, param)
print(res)
封装后,调试一下,可以正常读取数据库内容,使用pandas模板读取excel表中的数据,进行运算
import pandas as pd
import calendar
import re
import datetime
from sql.connect_sql import SSHMySQL
# 基础信息
file_path = r'C:\Users\阿娇啊\Desktop\主机概览.xlsx'
# 云主机和磁盘的折扣
vm_discount = 0.01
cloud_discount = 0.01
# 购买周期(按月计费)
vm_cycle = 3
c_cycle = 3
# 当前年月日
now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
cma_days = calendar.monthrange(year, month)[1]
cmr_days = cma_days - day + 1
# 读取sheet云主机数据
usecols_vm = ['名称', '规格配置', '系统盘类型']
df_vm = pd.read_excel(file_path, sheet_name='云主机', usecols=usecols_vm)
len_vm = len(df_vm.index)
print('云主机基础信息:------------')
print('总行数为:{};本月剩余天数为:{};云主机折扣为:{};系统盘折扣为:{};购买周期为:{}个月'.format(len_vm, cmr_days, vm_discount, cloud_discount, vm_cycle))
# 价格 = (单价*12个月/365天*本月剩余天数)+剩余月数*单价
# 云主机价格
vm_list = []
sc_list = []
for i in range(0, len_vm):
# 按行和列 获取表格数据
vm_name = df_vm.iloc[i]['名称']
sc_type = df_vm.iloc[i]['系统盘类型']
spec_con = df_vm.iloc[i]['规格配置']
# 正则匹配云主机规格、系统盘大小及单位,并转换为字符串
pat_vm = '\w*.\w*.\w'
pat_sc = '系统盘: \w*'
pat_sc_size = '\d.'
pat_sc_unit = 'TB|GB'
vm_spec = re.compile(pat_vm).findall(spec_con)[0]
sc = re.compile(pat_sc).findall(spec_con)[0]
sc_size = re.compile(pat_sc_size).findall(sc)[0]
sc_unit = re.compile(pat_sc_unit).findall(sc)[0]
# 从数据库获取云主机规格单价和系统盘单价
with SSHMySQL() as db:
query = "SELECT CAST(monthly as CHAR) as monthly FROM `spec` WHERE name= %s and type = 'VIRTUALMACHINE'"
vm_param = (vm_spec, )
vm_res = db.get_query_all(query, vm_param)
vm_month = float((vm_res[0])['monthly'])
# print('云主机单价为:', vm_month)
query = "SELECT CAST(monthly as CHAR) as monthly FROM `spec` WHERE name= %s and type = 'CLOUDDISK'"
sc_param = (sc_type,)
sc_res = db.get_query_all(query, sc_param)
sc_month = float((sc_res[0])['monthly'])
# print('系统盘单价为:', sc_month)
# 云主机价格
vm_price = (vm_month*12/365*cmr_days+(vm_cycle-1)*vm_month)*vm_discount
# 系统盘价格
sc_price = (sc_month*float(sc_size)*12/365*cmr_days+(vm_cycle-1)*sc_month*float(sc_size))*cloud_discount
print('{}-->云主机价格为:{}元;系统盘价格为:{}元'.format(vm_name, vm_price, sc_price))
vm_list.append(vm_price)
sc_list.append(sc_price)
print('云主机总价为:{};系统盘总价为:{}'.format(sum(vm_list), sum(sc_list)))
运算结果为: