README.TXT
2023/6/15
V1.0 实现了单个点位数据通信、数据解析、数据存储
2023/6/17
V2.0 实现了多个点位数据通信、数据解析、数据存储
2023/6/19
V2.1 完善log存储,仅保留近3天的log记录,避免不必要的存储;限制log大小,2MB。
架构介绍
使用Python开发服务器程序,实现以下功能:
- 采用问询的方式读取各类传感器数据
- 正确高速解析各类传感器的数据
- 存储解析后的各类传感器数据
- 存储程序运行过程中的log
- 管理log,超过一定量、一定时间自动删除log
打包发布 或者在后台运行py服务器程序
硬件介绍 传感器 串口服务器 采集模块 5G通信模块
主要传感器
485电子水尺传感器
该传感器支持485通信
其他硬件:
485漏液侦测传感器
485采集模块
串口服务器
5G通信模块
调试工具
采集模块调试工具
485电子水尺调试工具
串口调试助手,主要用于 漏液传感器
其他工具
CRC计算器,不过我一般都是自己写CRC校验程序,CRC校验程序放在后面完整代码了
WIN10自带的计算器
调试过程
完整代码 LOG存储功能、数据读取解析存储功能
#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File : main0620.py
# @Author:Romulushe
# @Time : 2023/6/20 13:53
# @Software: PyCharm
# @Use: PyCharm
import os
import sys
import logging
import time
from logging.handlers import RotatingFileHandler
import shutil
import socket
import threading
import pymysql
import binascii
import time
import datetime
base_path = os.path.dirname(os.path.realpath(sys.argv[0]))
def get_log_path():
return os.path.join(base_path, 'logs')
def cleanup_logs():
log_path = get_log_path()
current_time = time.time()
for file_name in os.listdir(log_path):
file_path = os.path.join(log_path, file_name)
if os.path.isfile(file_path):
creation_time = os.path.getctime(file_path)
if current_time - creation_time > (3 * 24 * 60 * 60):
os.remove(file_path)
def configure_logging():
log_path = get_log_path()
os.makedirs(log_path, exist_ok=True)
log_filename = get_log_filename()
log_file = os.path.join(log_path, log_filename)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', filename=log_file)
def get_log_filename():
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d_%H-%M.log")
def create_new_log():
log_path = get_log_path()
log_files = os.listdir(log_path)
if len(log_files) >= 20:
oldest_file = min(log_files)
os.remove(os.path.join(log_path, oldest_file))
log_filename = get_log_filename()
log_filepath = os.path.join(log_path, log_filename)
return log_filepath
def check_log_size(log_filepath):
log_size = os.path.getsize(log_filepath)
if log_size > 2 * 1024 * 1024:
# 创建新的日志文件
new_log_filepath = create_new_log()
try:
shutil.move(log_filepath, new_log_filepath)
return new_log_filepath
except PermissionError:
insert_log(logger, f'{log_filepath} {PermissionError}', log_filepath)
time.sleep(0.1)
return log_filepath
return log_filepath
def insert_log(logger, log_message, log_filepath):
log_filepath = check_log_size(log_filepath)
# 创建文件处理器
file_handler = RotatingFileHandler(log_filepath, maxBytes=2 * 1024 * 1024, backupCount=1)
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# 添加文件处理器到日志记录器
logger.addHandler(file_handler)
try:
logger.debug(log_message)
except PermissionError:
insert_log(logger, f'{log_message} {PermissionError}', log_filepath)
time.sleep(0.1) # 延迟0.1秒
# 移除文件处理器
logger.removeHandler(file_handler)
# 定义个函数,使其专门重复处理客户端的请求数据(也就是重复接受一个用户的消息并且重复回答,直到用户选择下线)
def client_request_1(tcp_client_1, tcp_client_address):
#print(tcp_client_1)
# 循环接收和发送数据
while True:
# #print(datetime.datetime.today())
# msg=["010300000005859C","02020000000879FF"]
# msg=['010300010001D5CA',"02020000000879FF"]
# msg=["010300000001840A","02020000000879FF"]#8点位
msg=msg_address(tcp_client_address)
# interval =60
interval = int(60 / len(msg))
# print(interval,type(interval))
for i in msg:
try:
tcp_client_1.send(binascii.unhexlify(i))
recv_data = tcp_client_1.recv(4096)
# 有消息就回复数据,消息长度为0就是说明客户端下线了
if recv_data:
# #print(f"recv_data:{recv_data}")
bistr = binascii.hexlify(recv_data).decode('utf8')
# print("==================================================")
# print(f"读数:【{bistr}】,{len(bistr)}")
try:
if (bistr.startswith('010302') or bistr.startswith('030302') ) and len(bistr) >=14:
bistr=bistr[:15]
# # 输入数据
input_data = bistr[:-4]
# # 计算带有校验码的结果
result = calculate_checksum(input_data)
if bistr == result:
waterLevel = int(bistr[6:10], 16)
print("水深", waterLevel, "cm", bistr, bistr[6:10], bistr[:-4], bistr[-4:], "input_data:",
{input_data}, " result:", {result})
# #print(bistr==result)
ip_address(tcp_client_address[0], tcp_client_address[1], waterLevel, 'water')
if bistr.startswith("020201") and len(bistr) >= 12:
bistr = bistr[:13]
# 输入数据
input_data = bistr[:-4]
# 计算带有校验码的结果
result = calculate_checksum(input_data)
# #print(result==bistr)
if result == bistr:
data = data_address(bistr)
#print(f"水泵采集模块状态:{data}")
ip_address(tcp_client_address[0], tcp_client_address[1], data, 'status')
ip_address(tcp_client_address[0], tcp_client_address[1], data, 'fault')
if bistr.startswith("0104") and len(bistr) >= 14:
bistr = bistr[:15]
# 输入数据
input_data = bistr[:-4]
# # 计算带有校验码的结果
result = calculate_checksum(input_data)
if result == bistr:
# #print(f"bistr[6:10]:{bistr[6:10]}")
data =int(bistr[6:10], 16)
#print(f"漏液检测状态:{data}")
ip_address(tcp_client_address[0], tcp_client_address[1], data, 'liquid')
except Exception as e:
insert_log(logger, f'{tcp_client_address} {e}', log_filepath)
#print(e)
else:
#print("%s 客户端下线了..." % tcp_client_address[1])
insert_log(logger, f'{tcp_client_address[1]} OFFLINE', log_filepath)
tcp_client_1.close()
break
# time.sleep(interval)
time.sleep(2)
except Exception as e:
insert_log(logger, f'{tcp_client_address} {i} {e}', log_filepath)
#print(f"{tcp_client_address} {i} {e}")
# 连接数据库
def connect_database(host, port, user, password, db_name):
try:
conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db_name)
# print("成功连接到数据库")
return conn
except pymysql.Error as e:
insert_log(logger, f'DATABASE CONNECT FAILED', log_filepath)
# print(f"数据库连接失败: {e}")
# 插入数据
def insert_data(conn, types,table_name, create_time, location_no, parameter_desc, location, param_value):
try:
cursor = conn.cursor()
sql = f"INSERT INTO {table_name} (TYPE, CREATE_TIME, LOCATION_NO, PARAMETER_DESC, LOCATION, PARAMVALUE) " \
f"VALUES (%s, %s, %s, %s, %s, %s)"
cursor.execute(sql, (types,create_time, location_no, parameter_desc, location, param_value))
conn.commit()
# print(f"数据插入成功: {create_time}, {parameter_desc}: {param_value}")
cursor.close()
except pymysql.Error as e:
insert_log(logger, f'DATABASE INSERT DATA FAILED', log_filepath)
# print(f"数据插入失败: {e}")
def msg_address(tcp_client_address):
if tcp_client_address[0] == '根据实际IP修改' or tcp_client_address[0] == '根据实际IP修改' or tcp_client_address[0] == '根据实际IP修改' :
msg = ["010300000001840A", "02020000000879FF"] # 8点位无其他,水位01,状态02
return msg
if (tcp_client_address[0] == '根据实际IP修改' and tcp_client_address[1] == 10010 )or tcp_client_address[0] == '根据实际IP修改' or (tcp_client_address[0] == '根据实际IP修改' and tcp_client_address[1] == 10000 ) or tcp_client_address[0] == '根据实际IP修改' or tcp_client_address[0] == '根据实际IP修改' :
msg =["010300000001840A", "02020000000879FF"] # 4点位非漏液,水位01,状态02
return msg
if (tcp_client_address[0] =='根据实际IP修改' and tcp_client_address[1] == 10000 )or (tcp_client_address[0] == '根据实际IP修改' and tcp_client_address[1] == 10010):
msg = ["01040000000131CA"] # 4点位漏液
return msg
if tcp_client_address[0] == '根据实际IP修改' :
msg = ["010300010001D5CA","01040000000131CA"] # 2点位漏液,漏液01,水位02
return msg
if tcp_client_address[0] == '根据实际IP修改' :
msg = ["03030000000185E8","02020000000879FF"] # 展厅漏液01,水位03,状态02
# msg = ["03030000000185E8", "02020000000879FF", "01040000000131CA"] # 展厅漏液01,水位03,状态02
return msg
#ip判断
def ip_address(ip,port,param_value,notes):
host = ''#根据实际情况自定义
port = 3306
user =''#根据实际情况自定义
password = ''#根据实际情况自定义
db_name = ''#根据实际情况自定义
table_name =''#根据实际情况自定义
# 连接数据库
conn = connect_database(host, port, user, password, db_name)
#8点位采集+水尺
if str(ip)==''#根据实际情况自定义:
#只有1号端口
#水位尺数据
#采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根据实际情况自定义
# 模拟数据
if notes=='water':
location_no =''#根据实际情况自定义
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes=='status':
data=param_value
# print("data:",data)
status1=data[0]#1
status2=data[1]#2
status3=data[4]#5
status4=data[6]#7
# print("status1,status2,status3,status4:",status1,status2,status3,status4)
# # 插入1号数据
location_no = 'D04-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D04-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
# 插入3号数据
location_no = 'D04-03'
parameter_desc = '3号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status3)
# 插入4号数据
location_no = 'D04-04'
parameter_desc = '4号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status4)
if notes=='fault':
data = param_value
fault1 = data[2]#3
fault2 = data[3]#4
fault3 = data[5]#6
fault4 = data[7]#8
# print("fault1,fault2,fault3,fault4:",fault1,fault2,fault3,fault4)
# 插入1号数据
location_no = 'D04-05'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D04-06'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 插入3号数据
location_no = 'D04-07'
parameter_desc = '3号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault3)
# 插入4号数据
location_no = 'D04-08'
parameter_desc = '4号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault4)
# 8点位采集+水尺
if str(ip)==''#根据实际情况自定义:
#只有1号端口
#水位尺数据
#采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根据实际情况自定义
# 模拟数据
if notes=='water':
location_no = 'D07'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes=='status':
data=param_value
status1=data[0]
status2=data[1]
status3=data[2]
status4=data[3]
#print("status1,status2,status3,status4:",status1,status2,status3,status4)
# 插入1号数据
location_no = 'D07-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D07-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
# 插入3号数据
location_no = 'D07-03'
parameter_desc = '3号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status3)
# 插入4号数据
location_no = 'D07-04'
parameter_desc = '4号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status4)
if notes=='fault':
data=param_value
fault1 = data[4]
fault2 = data[5]
fault3 = data[6]
fault4 = data[7]
#print("fault1,fault2,fault3,fault4:",fault1,fault2,fault3,fault4)
# 插入1号数据
location_no = 'D07-05'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D07-06'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 插入3号数据
location_no = 'D07-07'
parameter_desc = '3号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault3)
# 插入4号数据
location_no = 'D07-08'
parameter_desc = '4号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault4)
# 8点位采集+水尺
if str(ip) == ''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D08'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
status3 = data[2]
status4 = data[3]
#print("status1,status2,status3,status4:", status1, status2, status3, status4)
# 插入1号数据
location_no = 'D08-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D08-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
# 插入3号数据
location_no = 'D08-03'
parameter_desc = '3号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status3)
# 插入4号数据
location_no = 'D08-04'
parameter_desc = '4号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status4)
if notes == 'fault':
data=param_value
fault1 = data[4]
fault2 = data[5]
fault3 = data[6]
fault4 = data[7]
#print("fault1,fault2,fault3,fault4:", fault1, fault2, fault3, fault4)
# 插入1号数据
location_no = 'D08-05'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D08-06'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 插入3号数据
location_no = 'D08-07'
parameter_desc = '3号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault3)
# 插入4号数据
location_no = 'D08-08'
parameter_desc = '4号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault4)
# 4点位采集+水尺
if str(ip) ==''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D09'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1号数据
location_no = 'D09-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D09-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1号数据
location_no = 'D09-03'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D09-04'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
if notes == 'liquid':
data_liquid = param_value
# 插入漏液数据
location_no = 'F03'
parameter_desc = ''#根据实际情况自定义
location=''#根据实际情况自定义
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
# 4点位采集+水尺
if str(ip) ==''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D06'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1号数据
location_no = 'D06-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D06-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1号数据
location_no = 'D06-03'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D06-04'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 4点位采集+水尺
if str(ip) ==''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D05'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
# 插入1号数据
location_no = 'D05-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D05-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
# 插入1号数据
location_no = 'D05-03'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D05-04'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
if notes == 'liquid':
data_liquid = param_value
# 插入漏液数据
location_no = 'F02'
parameter_desc =''#根据实际情况自定义
location=''#根据实际情况自定义
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
# 4点位采集+水尺
if str(ip) == ''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D03'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1号数据
location_no = 'D03-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D03-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1号数据
location_no = 'D03-03'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D03-04'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 4点位采集+水尺
if str(ip) == ''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D01'
parameter_desc = '水位值'
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1号数据
location_no = 'D01-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D01-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1号数据
location_no = 'D01-03'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D01-04'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 4点位采集+水尺
if str(ip) == ''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根据实际情况自定义
# 模拟数据
if notes == 'water':
location_no = 'D02'
parameter_desc = '水位值'
print( 'water:',param_value)
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
print("status1,status2:", status1, status2)
# 插入1号数据
location_no = 'D02-01'
parameter_desc = '1号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2号数据
location_no = 'D02-02'
parameter_desc = '2号泵浦状态'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
print("fault1,fault2:", fault1, fault2)
# 插入1号数据
location_no = 'D02-03'
parameter_desc = '1号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2号数据
location_no = 'D02-04'
parameter_desc = '2号泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
if notes == 'liquid':
data_liquid = param_value
print('liquid:'.data_liquid)
# 插入漏液数据
location_no = 'F01'
parameter_desc =''#根据实际情况自定义
location=''#根据实际情况自定义
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
# 2点位采集+水尺
if str(ip) == ''#根据实际情况自定义:
# 只有1号端口
# 水位尺数据
# 采集模块数据
# 数据库连接信息
types = '防汛'
create_time = datetime.datetime.now()
# 模拟数据
if notes == 'water':
location_no = 'D10'
parameter_desc = '水位值'
location =''#根据实际情况自定义
# 插入数据
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'liquid':
data_liquid = param_value
# 插入漏液数据
location_no = 'F04'
parameter_desc = ''#根据实际情况自定义
location = ''#根据实际情况自定义
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
def data_address(data):
extracted_data = data[6:8]
binary_data = bin(int(extracted_data, 16))[2:].zfill(8)
reversed_data = binary_data[::-1]
return reversed_data
#CRC校验
def calculate_crc(data):
crc = 0xFFFF
polynomial = 0xA001
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= polynomial
else:
crc >>= 1
return crc
def calculate_checksum(data):
# 将输入数据转换为字节列表
data_bytes = bytes.fromhex(data)
# 计算校验码
crc_value = calculate_crc(data_bytes)
crc_bytes = crc_value.to_bytes(2, 'big') # 将校验码转换为2字节的大端字节序
checksum = ''.join(format(byte, '02x') for byte in reversed(crc_bytes)) # 转换为十六进制字符串,并反转字节顺序
# 将校验码插入到原数据之后
result = data + checksum
return result
if __name__ == '__main__':
configure_logging()
cleanup_logs()
log_filepath = create_new_log()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
oldtime = datetime.datetime.today()
# 创建服务端套接字对象
tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置端口复用,使程序退出后端口马上释放
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# 绑定端口
tcp_server.bind(("", 8050))
# 设置监听
tcp_server.listen(128)
# 循环等待客户端连接请求
while True:
tcp_client_1, tcp_client_address = tcp_server.accept()
tcp_client_1.settimeout(10)
# 创建多线程对象
thd = threading.Thread(target=client_request_1, args=(tcp_client_1, tcp_client_address))
# #print(thd)
# 设置守护主线程
thd.setDaemon(True)
# 启动子线程对象
thd.start()
# 关闭服务器套接字
# tcp_server.close()`