一、背景
最近项目在做重构,将原有的系统拆分成了多个子系统。但是有些数据表需要在不同系统中共用,同时为了解决项目的性能问题,我们采用了一个很简单的方案:把公共数据在每个系统中各存一份。
二、分析
分析这些表:与源数据表相比,这些表的表结构完全相同,只是表名加了MD前缀,而且数据也必须与源表保持一致。这里我们可以从以下几个维度进行比对:
a、表结构
b、表数据量
c、表主键最大id
d、表数据
三、编码
因为最开始是以脚本的形式来写的,所以没有做配置文件。
1、数据库相关
数据库连接信息
# coding=utf8
"""
Database connection settings.

Maps a system alias (CRM, PLM, MES, ...) to the keyword arguments used to
open a MySQL connection. Passwords are masked placeholders here; in a real
deployment they should come from the environment or a secrets store rather
than source code.
"""
mysql_data = {
    "CRM": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "PLM": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "MES": {
        "host": "127.0.0.1",
        "user": "user_name",
        # Was `********",` (missing opening quote) -> SyntaxError on import.
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "SCM": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "UC": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "GDM": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "ESM": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "LOCAL": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "78": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
    "PLM-PRE": {
        "host": "127.0.0.1",
        "user": "user_name",
        "password": "********",
        "db": "db_name",
        "port": 3306,
    },
}
数据库连接封装(注:并非连接池,每个实例只持有一个连接)
# coding=utf8
import pymysql
from pymysql import err
# NOTE(review): the original module string here read "fetch CRM data into
# PLM: 1) project data 2) contract data", which does not describe this class;
# it looks like a leftover copied from another script.


class MSP_Connect:
    """
    Thin wrapper around one pymysql connection and its cursor.

    The connection is opened in the constructor. If that fails, a message is
    printed and the exception is re-raised, so callers never get a
    half-initialised object without a cursor. Usable as a context manager.
    """

    def __init__(self, mysql_info: dict):
        """
        :param mysql_info: dict with keys host, user, password, db and an
            optional port (defaults to 3306)
        :raises KeyError: a required key is missing from mysql_info
        :raises pymysql.err.OperationalError: the connection could not be opened
        """
        # Pre-set so close() is safe even when connecting fails.
        self.con = None
        self.cur = None
        try:
            self.host = mysql_info['host']
            self.user = mysql_info['user']
            self.password = mysql_info['password']
            self.db = mysql_info['db']
            # The config defines a port; it used to be silently ignored.
            self.port = int(mysql_info.get('port', 3306))
            self.con = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                db=self.db,
                charset="utf8"
            )
            self.cur = self.con.cursor()
        except KeyError:
            print("mysql_info键名错误,key值分别【host, user, password, db】")
            raise
        except err.OperationalError as e:
            # 1045: access denied (bad user/password).
            if e.args[0] == 1045:
                print("MySql密码错误")
            else:
                print("发生了其它连接错误")
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def execute(self, sql, values=None):
        """
        Execute one statement.

        :param sql: SQL text (with %s placeholders when values is given)
        :param values: parameters for the placeholders, or None
        :return: all fetched rows when values is None; otherwise the value
            returned by cursor.execute (the affected-row count)
        """
        if values is None:
            self.cur.execute(sql)
            return self.cur.fetchall()
        return self.cur.execute(sql, values)

    def execute_many(self, sql, values):
        """Execute sql once per parameter set in values; return cursor.executemany's result."""
        return self.cur.executemany(sql, values)

    def commit(self):
        """Commit the current transaction."""
        self.con.commit()

    def close(self):
        """Close cursor and connection; safe to call repeatedly or after a failed connect."""
        cur, self.cur = getattr(self, 'cur', None), None
        con, self.con = getattr(self, 'con', None), None
        try:
            if cur is not None:
                cur.close()
        finally:
            if con is not None:
                con.close()
2、检查处理
# coding=utf8
import pymysql
import os
from datetime import datetime, timedelta
from msp_script.database.connect_config import MSP_Connect
from msp_script.database.db_info import mysql_data
from openpyxl import Workbook, load_workbook
def check_msp_table_data(target_db_name, file_name=None):
    """
    Compare every md_* table of a target system with its source table and
    write the results to an Excel report.

    :param target_db_name: target system alias (key of mysql_data)
    :param file_name: report file name without extension, passed to write_excel
    :return: list of non-empty remarks, one per mismatching table
    """
    print('******************************************* [' + target_db_name + '] *******************************************')
    delete_excel_file()
    target_db = MSP_Connect(mysql_data[target_db_name])
    write_result = []
    info = []
    try:
        # 1. md_* tables of the target system
        print('1、获取目标系统中的主数据表')
        md_tables = get_table_index_by_md(target_db)
        print('\t主数据表为' + str([table[1] for table in md_tables]))
        # 2. compare each md_* table with its source table
        print('2、对比源数据、目标数据库表结构')
        for table in md_tables:
            target_table_name = table[1]
            source_table_info = get_source_table_name(target_table_name)
            if source_table_info is None:
                # Not named md_<system>_<table>; get_source_table_name already
                # printed a warning. Previously this crashed with a TypeError.
                continue
            source_table_name, source_db_name = source_table_info
            if source_db_name not in ('CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM'):
                continue
            source_db = MSP_Connect(mysql_data[source_db_name])
            try:
                check_result = check_table_info(source_db, source_table_name, target_db, target_table_name)
            finally:
                source_db.close()
            # Indexes 9 and 10 are read by write_excel.
            check_result.append(target_db_name)
            check_result.append(source_db_name)
            write_result.append(check_result)
            if check_result[8]:
                info.append(check_result[8])
    finally:
        target_db.close()
    # 3. write the report
    print('3、写入比对结果到Excel')
    write_excel(write_result, file_name)
    return info
def get_source_table_name(target_table_name):
    """
    Derive the source table and system from an MD table name.

    An MD table is named md_<system>_<source table>, e.g. md_crm_project
    -> ('project', 'CRM'). Only the first two underscores split, so the
    source table name may itself contain underscores.

    :param target_table_name: MD table name in the target database
    :return: (source table name, upper-cased system alias), or None when the
        name has fewer than three underscore-separated parts
    """
    parts = target_table_name.split('_', 2)
    if len(parts) < 3:
        print("[" + target_table_name + "] 可能是非主数据表")
        return None
    return parts[2], parts[1].upper()
def get_table_index_by_md(target_db: "MSP_Connect"):
    """
    List the md_* tables of the schema the connection is currently using.

    :param target_db: target database connection
    :return: rows of (schema, table name, row estimate, data length, index
        length, create time, comment) from information_schema.TABLES;
        callers use index 1 (table name)
    """
    # TABLE_SCHEMA = DATABASE(): without it, md_ tables from every schema
    # visible to this user on the server were returned, and the later
    # SHOW COLUMNS (which uses the current schema) could hit the wrong table.
    # ESCAPE '!': in LIKE, '_' is a single-character wildcard, so 'md_%'
    # also matched names such as 'mdx...'; '!_' matches a literal underscore.
    get_md_table_sql = """
        SELECT
            TABLE_SCHEMA AS '数据库名',
            TABLE_NAME AS '表名',
            TABLE_ROWS AS '行数',
            DATA_LENGTH AS '数据大小',
            INDEX_LENGTH AS '索引大小',
            CREATE_TIME AS '创建时间',
            table_comment AS '表描述'
        FROM
            information_schema.TABLES
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME LIKE 'md!_%' ESCAPE '!'
    """
    return target_db.execute(get_md_table_sql, None)
# Verify MSP data synchronisation for one source/target table pair
def check_table_info(source_db: "MSP_Connect", source_table_name, target_db: "MSP_Connect", target_table_name):
    """
    Compare a source table with its MD copy.

    Checks in order: column definitions (SHOW COLUMNS), row count, max(id),
    and finally, only if all of those agree, the latest 100 rows via
    check_data(). The first mismatch found becomes the remark and is printed.

    :param source_db: source database connection
    :param source_table_name: source table name
    :param target_db: target database connection
    :param target_table_name: target (md_*) table name
    :return: [structure flag ('相同' / '不同'), target table, source table,
              target count, source count, target max id, source max id,
              result (True when every check passed), remark ('' when none)]
    """
    # Identifiers cannot be bound as query parameters, and table names can
    # come from the HTTP API, so escape embedded backticks before quoting.
    source_ident = "`" + source_table_name.replace("`", "``") + "`"
    target_ident = "`" + target_table_name.replace("`", "``") + "`"

    # Column definitions
    source_result = source_db.execute("SHOW COLUMNS FROM " + source_ident, None)
    target_result = target_db.execute("SHOW COLUMNS FROM " + target_ident, None)
    # Row counts
    source_count = source_db.execute("select count(*) from " + source_ident, None)
    target_count = target_db.execute("select count(*) from " + target_ident, None)
    # Largest id
    source_max_id = source_db.execute("select max(id) from " + source_ident, None)
    target_max_id = target_db.execute("select max(id) from " + target_ident, None)

    flag = '相同' if source_result == target_result else '不同'
    result = False
    remark = ''
    prefix = '目标表【' + target_table_name + '】与源系统表【' + source_table_name + '】'
    if source_result != target_result:
        remark = prefix + '表结构不一致'
    elif source_count != target_count:
        remark = prefix + '数据量不一致,数据量差额为:' + str(abs(source_count[0][0] - target_count[0][0])) + '条'
    elif source_max_id != target_max_id:
        remark = prefix + '表最大值不一致'
    else:
        # Row sampling reads up to 100 full rows per side, so it only runs
        # once the cheap checks agree (it used to run unconditionally).
        check_data_result = check_data(source_db, source_table_name, target_db, target_table_name)
        if not check_data_result[0] and len(check_data_result) == 2:
            remark = prefix + '最近的100条数据不相同,ID为:[' + check_data_result[1] + ']'
        elif not check_data_result[0] and len(check_data_result) == 3:
            # [False, message, 'error']: the sample query itself failed.
            remark = check_data_result[1]
        else:
            result = True
    if remark:
        print('\t' + remark)
    return [
        flag,
        target_table_name,
        source_table_name,
        target_count[0][0],
        source_count[0][0],
        target_max_id[0][0],
        source_max_id[0][0],
        result,
        remark
    ]
# Write comparison results to an Excel report
def write_excel(write_result: list, file_name):
    """
    Append comparison results to <cwd>/../file/<file_name>.xlsx.

    A new workbook with a header row is created when the file does not exist;
    otherwise rows are appended and the serial-number column continues from
    the rows already present.

    :param write_result: rows from check_table_info, each extended with the
        target system (index 9) and source system (index 10)
    :param file_name: file name without extension; when empty/None a default
        '数据比对结果_<YYYYmmddHHMMSS>' name is used
    :return: None
    """
    if not file_name:
        file_name = '数据比对结果_' + datetime.now().strftime('%Y%m%d%H%M%S')
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    file_dir = os.path.join(parent_dir, 'file')
    os.makedirs(file_dir, exist_ok=True)
    file_path = os.path.join(file_dir, file_name) + '.xlsx'
    if os.path.exists(file_path):
        work_book = load_workbook(file_path)
        work_sheet = work_book.active
        # Row 1 is the header, so there are max_row - 1 data rows and the
        # next serial is max_row. (The old code started at max_row - 1 and
        # duplicated the last serial on every append.)
        next_serial = work_sheet.max_row
    else:
        work_book = Workbook()
        work_sheet = work_book.active
        headers = ['序号', '目标系统', '目标系统表', '源系统', '源系统表', '表结构是否相同', '目标系统表数量', '原系统表数量', '目标系统最大值', '原系统最大值', '比对结果', '备注']
        for col, header in enumerate(headers, start=1):
            work_sheet.cell(row=1, column=col, value=header)
        next_serial = 1
    for serial, result in enumerate(write_result, start=next_serial):
        work_sheet.append([
            serial,     # serial number
            result[9],  # target system
            result[1],  # target table
            result[10], # source system
            result[2],  # source table
            result[0],  # structure identical?
            result[3],  # target row count
            result[4],  # source row count
            result[5],  # target max id
            result[6],  # source max id
            result[7],  # overall result
            result[8],  # remark
        ])
    work_book.save(file_path)
# Delete the Excel reports produced on the previous day
def delete_excel_file():
    """
    Remove yesterday's reports from <cwd>/../file.

    A file is removed when it ends with .xlsx and its name contains
    '数据比对结果_<yesterday as YYYYmmdd>'. Does nothing if the directory
    does not exist yet (previously os.listdir raised FileNotFoundError).
    """
    format_time = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    file_dir = os.path.join(parent_dir, 'file')
    if not os.path.isdir(file_dir):
        return
    marker = "数据比对结果_" + format_time
    for file in os.listdir(file_dir):
        if file.endswith('.xlsx') and marker in file:
            os.remove(os.path.join(file_dir, file))
def check_data(source_db: "MSP_Connect", source_table_name, target_db: "MSP_Connect", target_table_name):
    """
    Compare the 100 most recently updated rows of two tables.

    Both sides are ordered by update_at desc, id desc and compared as whole
    row tuples.

    :param source_db: source database connection
    :param source_table_name: source table
    :param target_db: target database connection
    :param target_table_name: target table
    :return: [True] when the samples are identical;
             [False, 'id,id,...'] when they differ, listing the first column of
             every row present on only one side (target-only rows first, then
             source-only rows, without duplicates);
             [False, message, 'error'] when the table has no update_at column
    :raises pymysql.err.OperationalError: any other MySQL error
    """
    # Table names may come from the HTTP API: escape backticks before quoting.
    source_ident = "`" + source_table_name.replace("`", "``") + "`"
    target_ident = "`" + target_table_name.replace("`", "``") + "`"
    try:
        source_result = source_db.execute("select * from " + source_ident + " order by update_at desc, id desc limit 0, 100")
        target_result = target_db.execute("select * from " + target_ident + " order by update_at desc, id desc limit 0, 100")
    except pymysql.err.OperationalError as e:
        # 1054 = unknown column: no update_at to order by.
        if e.args[0] != 1054:
            # Previously swallowed, so this returned None and the caller
            # later failed with a TypeError.
            raise
        print("\t源系统表【" + source_table_name + '】表缺少update_at字段')
        return [False, "源系统表【" + source_table_name + "】表缺少update_at字段", 'error']

    if target_result == source_result:
        return [True]
    # Report both directions; reporting only target-only rows could produce
    # an empty ID list when the target was merely missing rows.
    target_only = [row for row in target_result if row not in source_result]
    source_only = [row for row in source_result if row not in target_result]
    ids = []
    for row in target_only + source_only:
        row_id = str(row[0])
        if row_id not in ids:
            ids.append(row_id)
    return [False, ','.join(ids)]
def check_data_custom(source_db, target_db: list, tables: list):
    """
    Check selected source tables against their md_<source>_<table> copies in
    each target system.

    :param source_db: source system alias (key of mysql_data; case-insensitive)
    :param target_db: target system aliases
    :param tables: source table names to check
    :return: one {TARGET_ALIAS: [{'mark': remark, 'table': table}, ...]} dict
        per target system; the list holds only failing tables and may be empty
    """
    source_alias = source_db.upper()
    result = []
    # One source connection is shared by all checks and always closed
    # (previously a new one was opened per table and never closed).
    source_conn = MSP_Connect(mysql_data[source_alias])
    try:
        for db in target_db:
            target_alias = db.upper()
            tar_db = MSP_Connect(mysql_data[target_alias])
            try:
                mismatches = []
                for table in tables:
                    res = check_table_info(source_conn, table, tar_db, 'md_' + source_alias.lower() + '_' + table)
                    if not res[7]:
                        mismatches.append({
                            'mark': res[8],
                            'table': table
                        })
            finally:
                tar_db.close()
            result.append({target_alias: mismatches})
    finally:
        source_conn.close()
    return result
if __name__ == '__main__':
    # Systems to verify in this run (full set: PLM, CRM, GDM, MES, SCM, ESM).
    systems = ['PLM', 'CRM']
    # Every system appends to the same timestamped report.
    report_name = '数据比对结果_' + datetime.now().strftime('%Y%m%d%H%M%S')
    banner = '****************************************************************************************'
    for system in systems:
        print()
        print(banner)
        print(system + "系统数据开始检验...")
        check_msp_table_data(system, report_name)
3、api
# coding=utf8
import logging
from flask import Flask, jsonify, request, send_file
from msp_script.public.check_msp_syn import check_msp_table_data, check_data_custom
from datetime import datetime, timedelta
import os
app = Flask(__name__)
# Debug mode turns on the Werkzeug interactive debugger, which allows
# arbitrary code execution from a browser; keep it opt-in via the
# environment instead of hard-coding it on for a network-facing service.
app.debug = os.environ.get('MSP_API_DEBUG', '').lower() in ('1', 'true', 'yes')
@app.route('/msp/api/check_syn_data', methods=['GET'])
def check_syn_data():
    """
    Run the full MD-table comparison for one or more target systems.

    Query parameters:
    :param target_db_name: target system alias, or several separated by
        commas (CRM, PLM, MES, SCM, ESM, GDM; case-insensitive, surrounding
        spaces ignored). Required.
    :param file_name: report name prefix; '_<YYYYmmddHHMMSS>' is appended.
        Required, must not contain path separators.
    :return: JSON with code 200, the report download path under 'file' and,
        per requested system, its list of mismatch remarks; or code 500 with
        a message
    """
    allowed = ('CRM', 'PLM', 'MES', 'SCM', 'ESM', 'GDM')
    target_db_name = request.args.get('target_db_name')
    file_name = request.args.get('file_name')
    info = {}
    if not file_name:
        info['code'] = 500
        info['message'] = '[file_name]文件名称不能为空!'
        return jsonify(info)
    # file_name becomes a path component on disk; refuse directory parts.
    if '/' in file_name or '\\' in file_name:
        info['code'] = 500
        info['message'] = '[file_name]文件名称不能包含路径分隔符!'
        return jsonify(info)
    file_name = file_name + '_' + datetime.now().strftime('%Y%m%d%H%M%S')
    if not target_db_name:
        info['code'] = 500
        info['message'] = '[target_db_name]数据库别名不能为空!'
        return jsonify(info)
    # A single alias is just a one-element list; strip so "PLM, CRM" works.
    db_names = [name.strip() for name in target_db_name.split(',')]
    for name in db_names:
        if name.upper() not in allowed:
            info['code'] = 500
            info['message'] = '数据库别名只能是[CRM, PLM, MES, SCM, ESM, GDM]!'
            return jsonify(info)
    for name in db_names:
        info[name] = check_msp_table_data(name.upper(), file_name)
    info['code'] = 200
    info['file'] = '/msp/api/download/' + file_name + '.xlsx'
    return jsonify(info)
@app.route('/msp/api/download/<string:file_name>', methods=['GET'])
def download_file(file_name):
    """
    Download a generated report from <cwd>/../file.

    :param file_name: report file name including its extension
    :return: the file as an attachment, or JSON {code: 500, message} when the
        name is empty, resolves outside the report directory, or does not exist
    """
    info = {}
    if not file_name:
        info['code'] = 500
        info['message'] = '文件名称不能为空!'
        return jsonify(info)
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    file_dir = os.path.realpath(os.path.join(parent_dir, 'file'))
    file_path = os.path.realpath(os.path.join(file_dir, file_name))
    # Only serve direct children of the report directory. The route blocks
    # '/', but on Windows '\\' is also a separator (e.g. '..\\..\\secret').
    if os.path.dirname(file_path) == file_dir and os.path.isfile(file_path):
        return send_file(file_path, as_attachment=True)
    info['code'] = 500
    info['message'] = '文件不存在!'
    return jsonify(info)
@app.route('/msp/api/check_table_data', methods=['POST'])
def check_tables():
    """
    Check selected tables of one source system against their md_* copies in
    one or more target systems.

    JSON body:
    :param source_db: source system alias. Required.
    :param target_db: list of target system aliases (a comma-separated
        string is also accepted). Required; must not contain source_db.
    :param tables: list of source table names (a comma-separated string is
        also accepted). Required.
    :return: JSON with code 200 and 'no_match_data' from check_data_custom,
        or code 500 with a message
    """
    allowed = ['CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM']

    def as_list(value):
        # JSON list as-is; comma-separated string split; anything else -> [].
        if isinstance(value, str):
            return [item.strip() for item in value.split(',') if item.strip()]
        if isinstance(value, list):
            return value
        return []

    # silent=True yields None instead of raising on a missing/invalid JSON
    # body; treat that as an empty body so validation reports it.
    payload = request.get_json(silent=True) or {}
    result = {}

    source_db = payload.get('source_db')
    # Validate before calling .upper(): a missing source_db used to raise
    # AttributeError, making the "empty" check below unreachable.
    if source_db is None or source_db == '':
        result['code'] = 500
        result['message'] = 'source_db原数据库名称不能为空'
        return jsonify(result)
    source_db = str(source_db).upper()
    if source_db not in allowed:
        result['code'] = 500
        result['message'] = '原数据库名称[' + source_db + ']不存在'
        return jsonify(result)

    target_db = [str(db).upper() for db in as_list(payload.get('target_db'))]
    if not target_db:
        result['code'] = 500
        result['message'] = 'target_db目标数据库名称不能为空'
        return jsonify(result)
    for t_db in target_db:
        if t_db not in allowed:
            result['code'] = 500
            result['message'] = '目标数据库名称[' + t_db + ']不存在'
            return jsonify(result)
    if source_db in target_db:
        result['code'] = 500
        result['message'] = 'source_db源数据库' + source_db + '不允许同时存在目标数据库列表' + str(target_db) + '中'
        return jsonify(result)

    tables = [str(table) for table in as_list(payload.get('tables'))]
    if not tables:
        result['code'] = 500
        result['message'] = 'tables需要验证的表不能为空'
        return jsonify(result)

    res = check_data_custom(source_db, target_db, tables)
    result['code'] = 200
    result['message'] = '成功'
    result['source_db'] = source_db
    result['target_db'] = target_db
    result['tables'] = tables
    result['no_match_data'] = res
    return jsonify(result)
@app.route("/demo", methods=['GET'])
def demo():
    """Sample endpoint that returns a fixed JSON object."""
    return jsonify({"a": 1, "v": 2})
if __name__ == "__main__":
    # Start Flask's built-in development server with its default settings.
    app.run()
四、测试
这里一共提供三个接口
1、全量检查数据
/msp/api/check_syn_data GET
参数:target_db_name 需要检查的数据库名称(多个时用英文逗号分隔)
file_name 执行完输出的报告名称
2、自定义检查系统表数据
/msp/api/check_table_data POST
参数:source_db 源数据库名称
target_db 目标数据库名称
tables 需要检查的表
3、下载检查结果
/msp/api/download/<文件名称> GET