前言
由于mysql链接超时波动,导致数据缺失,需要根据日志填补数据
流程
获取确实数据的订单列表
搜索日志,获取请求日志
根据请求日志拼装sql
打印sql供修复数据
代码
因为我们日志打印的有问题,所以这里用字符串截取获取入参。如果日志打印的是标准json,直接搞json即可
from elasticsearch import Elasticsearch
import json
class MyUtils:
pass
def getValue(fullStr, beginStr, endStr):
start = fullStr.find(beginStr) + len(beginStr)
end = fullStr.find(endStr)
value = fullStr[start:end]
return value
def setValue(orderInfoExt, columnName, fullStr, beginStr, endStr):
value = MyUtils.getValue(fullStr, beginStr, endStr)
if value != 'null':
orderInfoExt[columnName] = value
es = Elasticsearch(hosts="http://xxx:9200/", http_auth=('xxx', 'xxx'))
scroll_id = None
fileName = "create-order-info" + ".txt"
orderIdList = [74xxxx574,
74xxxx822
]
orderExtInfoList = []
for orderId in orderIdList:
query_json = {
"_source": ["message", "logger_name", "@timestamp"],
"query": {
"bool": {
"filter":
[
{
"bool":
{
"filter":
[
{
"multi_match":
{
"lenient": True,
"query": "order/v1/createOrder",
"type": "phrase"
}
},
{
"multi_match":
{
"lenient": True,
"query": orderId,
"type": "phrase"
}
}
]
}
},
{
"range":
{
"@timestamp":
{
"format": "strict_date_optional_time",
"gte": "2024-11-01T00:00:00.000Z",
"lte": "2024-11-02T10:00:00.000Z"
}
}
}
],
"must":
[
],
"must_not":
[
],
"should":
[
]
}
}
}
query = es.search(index='xxxx-pro*', body=query_json, scroll='25m', size=5000,
request_timeout=2000000)
for k in query['hits']['hits']:
timestr = k['_source']['@timestamp']
request = k['_source']['message']
orderInfoExt = {}
#beancopy的字段
MyUtils.setValue(orderInfoExt, 'user_device_mac', request, "userDeviceMac=", ", userDeviceImei")
MyUtils.setValue(orderInfoExt, 'user_device_imei', request, "userDeviceImei=", ", userDeviceImsi")
#特殊的字段
MyUtils.setValue(orderInfoExt, 'order_id', request, "orderId=", ", oid")
MyUtils.setValue(orderInfoExt, 'user_order_ip', request, "userIpAddr=", ", userPort")
#print(orderInfoExt)
orderExtInfoList.append(orderInfoExt)
# 假设表名为 orders
table_name = 'order_info_ext'
for orderInfoExt in orderExtInfoList:
# 提取列名
columns = ', '.join(orderInfoExt.keys())
# 提取值,并处理为适当的格式
values = []
for key, value in orderInfoExt.items():
if value == 'null':
values.append('NULL')
elif isinstance(value, (int, float)):
values.append(str(value))
elif isinstance(value, str):
values.append("'"+value+"'")
else:
values.append('NULL')
# 构建 INSERT 语句
sql = f"INSERT INTO {table_name} ({columns}) VALUES ({', '.join(values)});"
print(sql)