目录
部署服务端程序
主服务端控制程序main.py
子目录的计算程序 jisuan.py
读取数据
读取csv数据读取
读取excel
时间格式转换
时间戳转datetime并且生成时间序列最后格式化时间
常用函数
拟合预测
服务端程序控制与维护
部署服务端程序
主服务端控制程序main.py
from flask import Flask, request, jsonify, make_response
from flask_cors import *
from pydantic import BaseModel
# utils为本目录的一个文件夹名,utils文件夹中有jisuan.py文件,搞个简称js
import utils.jisuan as js
import sys
# 支持并发访问,保证计算过程不报错的一些定义,不必纠结
sys.setrecursionlimit(100000) #设置递归深度
app = Flask(__name__)
class Data(BaseModel):
path: str
data: list
app = Flask(__name__)
CORS(app)
# 发生错误,返回给前端的格式,需要与前端设计一起考虑
def wrong(cause):
msg = {}
msg['msg'] = cause
msg['code'] = 500
data = {}
data['data'] = "0"
data['msg'] = msg
return data
# 前端发送请求的接口名,需要与函数名一致
@app.post("/jisuan1")
async def jisuan1():
# 接受文件
try:
file_obj = request.files['file']
except:
return wrong("文件上传失败")
# 接受对象数据
try:
receive = request.json
except:
return wrong("数据接受失败")
#给子js文件的daoru函数发送接收到的文件,接受返回内容
try:
result = js.daoru(file_obj)
except Exception as e:
print(e)
return wrong("文件处理出错")
#发送对象数据
try:
result = js.yunsuan(receive)
except Exception as e:
print(e)
return wrong(e)
#成功的返回格式
msg = {}
msg['msg'] = "成功"
msg['code'] = 200
data = {}
data['data'] = result
data['msg'] = msg
return data
# 在8765端口等候,0.0.0.0 处理所有地址来的请求
if __name__ == '__main__':
app.run(host='0.0.0.0',port=8765)
子目录的计算程序 jisuan.py
def daoru(file_obj):
return 1
def yunsuan(data):
return 1
读取数据
读取csv数据读取
import csv
def get_data():
file_path = 'C:\\Users\\1.csv'
result ={}
result['时间'] = [] #对象中创建名为name,值为空列表的键值对
result['值'] = []
with open(file_path, 'r') as file:
reader = csv.reader(file)
for values in reader:
values[0].split(" ") //读取csv的第一行
# 假设第一列为时间,进行格式转换
time_obj = datetime.strptime(values[0][0], " %Y/%m/%d-%H:%M")
result['时间'].append(time_obj)
result['值'].append(float(values[0][1]))
return result
读取excel
import pandas as pd
def daoru(file_obj):
#忽略异常,不必纠结
warnings.simplefilter(action='ignore', category=FutureWarning)
data = pd.read_excel(file_obj)
# 用0填充空值
data.fillna(0, inplace=True)
#选择第六列的所有元素,转化成数列
数值1 = list(data.iloc[:, 5])
result = {}
result['数值1'] = 数值1[:]
#转化成对象返回
return result
时间格式转换
时间戳转datetime并且生成时间序列最后格式化时间
import time
from datetime import datetime
import datetime as dt
时间戳 = 1710080000
time_list = []
# 把时间戳转成datetime格式
date = time.localtime(int(时间戳))
date_time = datetime(*date[:6])
time_list.append(date_time)
后一个时间比前一个时间晚一个小时
for i in range(10):
time_list.append(time_list[i] + dt.timedelta(minutes=60))
for i in range(11):
#格式化时间
time_list[i] = time_list[i].strftime('%Y-%m-%d %H')
print(time_list)
常用函数
四舍五入 round(2.7026,1)
平方 pow(10,2) 更可靠方式 numpy.power(10, 0.32)
添加数列元素 list_a.append(1)
生成pai math.pi
拟合预测
import numpy as np
拟合列表 = [1,2,2.5,3,2.8,2.6,2.4,2.3]
# 生成自然数列表
x_拟合 = list(range(0,8))
x_预测 = list(range(0,20))
# 拟合度预告,拟合效果越好,预测效果越差
拟合度 = 2
# x与y要长度一致
p1 = np.poly1d(np.polyfit(x_拟合,拟合列表, 拟合度))
预测列表 = list(p1(x_预测))
服务端程序控制与维护
进入到main文件目录
netstat -lnp|grep 8765 查看占用8765端口的进程
sudo kill -9 13990 杀死这个进程
python3 main.py 重启服务端main程序
由于flask框架长时间运行会自己中断,我写了一个程序,让他每隔一段时间重新启动,实现永远不会掉线,在此分享
nohup python3 -u keep.py > nohup.log 2>&1 &
目录中创建一个keep.py文件,再创建一个写入日志的nohup.log文件
# 只是重复了上面的手动关闭,开启过程,不必修改
import time
import subprocess
import re
import os
import psutil
def job():
# 进程名称python3
# 当前进程pid
# print("dance:", os.getpid())
with open('keep.txt', 'w') as f:
f.write(str(os.getpid()))
# 当前进程名称
# print(psutil.Process().name())
os.system("nohup python3 -u main.py > /root/1.log 2>&1 &")
print("开启")
time.sleep(60*60*3)
command = "netstat -lnp|grep 8765"
output = subprocess.check_output(command, shell=True, universal_newlines=True)
print(output)
match = re.search(r'LISTEN\s+(\d+)/python', output)
if match:
number = int(match.group(1))
# print("获取pid: " + str(number))
os.system("sudo kill -9 %d" % number)
print("杀死了")
# time.sleep(7)
while True:
job()