Pyhon:串口应用及数据解析过程
串口通信是一种常用的通信协议,本文重点记录在Python中使用串口,并且以一款电源保护板的串口数据协议为例,对其进行解析,记录收发过程中对16进制数据进行转换的过程。
1. 调用串口
在Python中进行串口通信时,需要serial
包的支持,通过安装
pip install pyserial
安装包,然后可以对串口进行调用,在Linux系统中和在windows系统中的调用方式基本相同,只是端口的名称有所差别
- 打开串口:可以通过以下命令打开端口,这里打开
COM7
端口,设置波特率为115200
serial_port = serial.Serial("COM7", 115200, timeout=0.5) # windows 中打开方式
在Linux系统中可以采用如下方法打开端口,大同小异
serial_port = serial.Serial(
port='/dev/ttyUSB0', # Linux中打开方式
baudrate=115200,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
)
# Wait a second to let the port initialize
time.sleep(1)
- 关闭串口:在关闭串口的时候可以采用如下指令,就完成了对打开的端口进行关闭
serial_port.close()
- 发送数据:利用串口最主要的过程就是对数据的发送和接收功能,发送数据可以用如下函数
serial_port.write()
- 接收数据:接收过程用到的函数相对于发送会多几个,有如下函数需要使用
serial_port.flush() # 清空缓存
data_num = serial_port.inWaiting() # 这个数据显示一次读取回来多少个数据
data = serial_port.read() # 读串口数据
在有了这些基本函数后,需要做的是针对自己的任务来设计发送和接收的具体过程,根据所要传输的内容设计过程。在下面的测试过程中针对实际的串口数据解码过程,对电源保护板的串口数据进行解码,传输16进制的HEX数据。下面配合代码详细描述解码和测试过程。
2 测试工具准备
为了方便测试,可以采用虚拟串口,Virtual Serial Port Driver工具,工具可以有15天的试用期,有足够时间使用,不用破解,可以去下载地址下载,利用该工具可以生成一对虚拟串口,然后利用串口调试助手打开其中一个端口,Python程序中打开里一个端口,完成对于数据发送接收及解析的测试过程。
在生成了一对串口之后,还需要准备串口调试助手,如下UartAssist小软件,这个工具还挺好用,在5.0版本上具有很多其他的功能,可以用这个地址下载。
具有很多的便捷功能,
3 程序编写及测试
程序如下,为了方便说明分成4个部分(这个4个部分在一个.py文件中)。
-
- 导入包,打开关闭串口函数
import time
import serial
# import math
import binascii
# 在Linux系统中调用串口
# serial_port = serial.Serial(
# port='/dev/ttyUSB0',
# baudrate=115200,
# bytesize=serial.EIGHTBITS,
# parity=serial.PARITY_NONE,
# stopbits=serial.STOPBITS_ONE,
# )
# # Wait a second to let the port initialize
# time.sleep(1)
serial_port = serial.Serial("COM7", 115200, timeout=0.5)
# 打开串口函数
def serial_open():
if serial_port.isOpen(): # 判断串口是否成功打开
print("打开串口成功。")
print(serial_port.name) # 输出串口号
else:
print("打开串口失败。")
# 关闭串口函数
def serial_close():
serial_port.close()
pass
-
- 读串口数据函数并解析数据
data_all = []
data_hex_str = []
# 读串口数据
def serial_read():
serial_port.flush()
while True:
# time.sleep(100)
data_num = serial_port.inWaiting()# 这个数据显示一次读取回来多少个数据
# print("接收到", data_num, "个数据")
if data_num > 0:
time.sleep(0.1)
print('第一次读取当前接收字节数', data_num)
data_num = serial_port.inWaiting() # 这个数据显示一次读取回来多少个数据
data_all = []
print('第二次读取当前接收字节数', data_num)
for i in range(data_num):
data = serial_port.read()
data_all.append(data.hex()) # 这个地方是字符串的操作 这个操作的过程把接收到的数据都以16进制字符形式存储在这个变量中
# data_hex_str += binascii.hexlify(data) # 这个转换成hex字符串
# print(data.hex()) # 这个就可以取出16进制字符的形式 如'FF'
print('总的接收的字符串列表', data_all)
# 在这里就有数据了 接下来是如何解析数据
if data_all[0] == '4e' and data_all[1] == '57':
print('解析到起始帧。。。。。。。。。')
frame_len_str = bytes.fromhex(data_all[2] + data_all[3]) # 转成字符
frame_len = int(frame_len_str.hex(), 16)
print('帧长:', frame_len)
frame_id = data_all[4] + data_all[5] + data_all[6] + data_all[7]
print('BMS终端ID号:', frame_id)
frame_ordword = data_all[8]
print('命令字:', frame_ordword)
frame_source = data_all[9]
print('数据帧来源:', frame_source) # 0 BMS 1 蓝牙 2 GPS 3 PC上位机
frame_trantype = data_all[10]
print('传输类型:', frame_trantype) # 0 BMS 1 蓝牙 2 GPS 3 PC上位机
if data_all[11] == '79': ## 这里面涉及到电池数量,这个用多少设置成多少 测试用20节电池, 3个字节表示一个电池电压
print('开始解析单电池电压...')
len_0x79 = int(bytes.fromhex(data_all[12]).hex(), 16) # 这个字节是这个表示电压的字节的长度
# print('共', len_0x79, '个字节')
battery_num = round(len_0x79/3)
print('共连接了', battery_num, '节电池')
battery_mv_str = bytes.fromhex(data_all[14] + data_all[15]) # 转成字符
battery_mv = int(battery_mv_str.hex(), 16)
print('第一节电池电压值:', battery_mv, 'mv')
if data_all[13+len_0x79] == '80':
print('开始解析功率管温度...')
power_tube_temp_str = bytes.fromhex(data_all[74] + data_all[75]) # 转成字符
power_tube_temp = int(power_tube_temp_str.hex(), 16)
# 对温度进行转换,超过一百为负值
if power_tube_temp > 100:
power_tube_temp = 0-(power_tube_temp-100)
print('功率管温度为:', power_tube_temp)
if data_all[16+len_0x79] == '81':
print('开始解析电池箱内温度...')
battery_case_temp_str = bytes.fromhex(data_all[77] + data_all[78]) # 转成字符
battery_case_temp = int(battery_case_temp_str.hex(), 16)
# 对温度进行转换,超过一百为负值
if battery_case_temp > 100:
battery_case_temp = 0 - (battery_case_temp - 100)
print('电池箱内温度为:', battery_case_temp)
if data_all[19+len_0x79] == '82':
print('开始解析电池温度...')
battery_temp_str = bytes.fromhex(data_all[80] + data_all[81]) # 转成字符
battery_temp = int(battery_temp_str.hex(), 16)
# 对温度进行转换,超过一百为负值
if battery_temp > 100:
battery_temp = 0 - (battery_temp - 100)
print('电池温度为:', battery_temp)
if data_all[22+len_0x79] == '83':
print('开始电池总电压...')
battery_v_sum_str = bytes.fromhex(data_all[83] + data_all[84]) # 转成字符
battery_v_sum = int(battery_v_sum_str.hex(), 16)
# 对总电压进行转换, *0.01V
battery_v_sum = battery_v_sum * 0.01
print('电池总电压为:', battery_v_sum, 'V')
if data_all[25+len_0x79] == '84':
print('开始电流数据...')
battery_c_sum_str = bytes.fromhex(data_all[86] + data_all[87]) # 转成字符
battery_c_sum = int(battery_c_sum_str.hex(), 16)
if battery_c_sum < 32768:
print('放电电流为:', battery_c_sum, 'mA')
else:
battery_c_sum = battery_c_sum - 32768
print('充电电电流为:', battery_c_sum, 'mA')
if data_all[28+len_0x79] == '85':
print('开始解析电池剩余容量...')
battery_left_str = bytes.fromhex(data_all[89]) # 转成字符
battery_left = int(battery_left_str.hex(), 16)
battery_left = battery_left/255
print('电池剩余容量为:', battery_left) # 这个是一个百分比
if data_all[30+len_0x79] == '86':
print('开始解析电池温度传感器数量...')
temp_sensor_num_str = bytes.fromhex(data_all[91]) # 转成字符
temp_sensor_num = int(temp_sensor_num_str.hex(), 16)
print('电池温度传感器数量为:', temp_sensor_num)
if data_all[32+len_0x79] == '87':
print('开始解析电池循环使用次数...')
battery_ues_time_str = bytes.fromhex(data_all[93] + data_all[94]) # 转成字符
battery_ues_time = int(battery_ues_time_str.hex(), 16)
print('电池循环使用次数:', battery_ues_time)
if data_all[35+len_0x79] == '89':
print('开始解析电池循环总容量...')
battery_ues_sumtime_str = bytes.fromhex(data_all[96]+data_all[97]+data_all[98]+data_all[99]) # 转成字符
battery_ues_sumtime = int(battery_ues_sumtime_str.hex(), 16)
print('电池循环使用次数:', battery_ues_sumtime)
if data_all[40+len_0x79] == '8a':
print('开始解析电池总串数...')
battery_sum_num_str = bytes.fromhex(data_all[101] + data_all[102]) # 转成字符
battery_sum_num = int(battery_sum_num_str.hex(), 16)
print('电池总串数:', battery_sum_num)
if data_all[43+len_0x79] == '8b':
print('开始解析电池警告信息...')
battery_alarm_sum_str = bytes.fromhex(data_all[104] + data_all[105]) # 转成字符
battery_alarm_sum = int(battery_alarm_sum_str.hex(), 16)
print('电池警告信息:', battery_alarm_sum)
if data_all[46+len_0x79] == '8c':
print('开始解析电池状态信息...')
battery_state_str = bytes.fromhex(data_all[107] + data_all[108]) # 转成字符
battery_state = int(battery_state_str.hex(), 16)
print('电池状态信息:', battery_state)
if data_all[49+len_0x79] == '8e':
print('开始解析总电压过压保护...')
voltage_high_prot_str = bytes.fromhex(data_all[110] + data_all[111]) # 转成字符
voltage_high_prot = int(voltage_high_prot_str.hex(), 16)
voltage_high_prot = voltage_high_prot*10 # 最小单位10mv
print('总电压过压保护:', voltage_high_prot)
if data_all[52+len_0x79] == '8f':
print('开始解析总电压欠压保护...')
voltage_low_prot_str = bytes.fromhex(data_all[113] + data_all[114]) # 转成字符
voltage_low_prot = int(voltage_low_prot_str.hex(), 16)
voltage_low_prot = voltage_low_prot*10 # 最小单位10mv
print('总电压欠压保护:', voltage_low_prot, 'mV')
if data_all[55+len_0x79] == '90':
print('开始解析单体过压保护电压...')
voltage_single_high_prot_str = bytes.fromhex(data_all[116] + data_all[117]) # 转成字符
voltage_single_high_prot = int(voltage_single_high_prot_str.hex(), 16)
print('单体过压保护电压:', voltage_single_high_prot, 'mV')
if data_all[58+len_0x79] == '91':
print('开始解析单体过压恢复电压...')
voltage_single_high_recover_str = bytes.fromhex(data_all[119] + data_all[120]) # 转成字符
voltage_single_high_recover = int(voltage_single_high_recover_str.hex(), 16)
print('单体过压恢复电压:', voltage_single_high_recover, 'mV')
if data_all[61+len_0x79] == '92':
print('开始解析单体过压保护延时...')
voltage_single_high_pro_delay_str = bytes.fromhex(data_all[122] + data_all[123]) # 转成字符
voltage_single_high_pro_delay = int(voltage_single_high_pro_delay_str.hex(), 16)
print('单体过压保护延时:', voltage_single_high_pro_delay, 's')
if data_all[64+len_0x79] == '93':
print('开始解析单体欠压保护电压...')
voltage_single_low_pro_str = bytes.fromhex(data_all[125] + data_all[126]) # 转成字符
voltage_single_low_pro = int(voltage_single_low_pro_str.hex(), 16)
print('单体欠压保护电压:', voltage_single_low_pro, 'mV')
if data_all[67+len_0x79] == '94':
print('开始解析单体欠压恢复电压...')
voltage_single_low_recover_str = bytes.fromhex(data_all[128] + data_all[129]) # 转成字符
voltage_single_low_recover = int(voltage_single_low_recover_str.hex(), 16)
print('单体欠压恢复电压:', voltage_single_low_recover, 'mV')
if data_all[70+len_0x79] == '95':
print('开始解析单体欠压保护延时...')
voltage_single_low_pro_delay_str = bytes.fromhex(data_all[131] + data_all[132]) # 转成字符
voltage_single_low_pro_delay = int(voltage_single_low_pro_delay_str.hex(), 16)
print('单体欠压保护延时:', voltage_single_low_pro_delay, 's')
if data_all[73+len_0x79] == '96':
print('开始解析电芯压差保护值...')
battery_vol_diff_pro_str = bytes.fromhex(data_all[134] + data_all[135]) # 转成字符
battery_vol_diff_pro = int(battery_vol_diff_pro_str.hex(), 16)
print('电芯压差保护值:', battery_vol_diff_pro, 'mV')
else:
print('未找到起始帧')
# serial_port.write(data)
# if data == "\r".encode():
# For Windows boxen on the other end
# serial_port.write("\n".encode())
else:
# print(data_all)
a = []
在读数据的时候,先清楚掉缓存中的数据,防止缓存中的数据产生影响,然后进行了两次serial_port.inWaiting()
的操作,是为了保证一次串口传输的数据都接收完全,这个保护板一次传输回来的字节可以达到267。
接收回来的数据需要按照HEX形式读取,转换成10进制数据。在这里利用的函数byets.fromhex()
把bytes
转成str
,然后利用int
把str转成10进制,得到实际的结果。可以参考注释。
-
- 写串口数据函数
# 写串口数据
def serial_write():
data_write_1_1 = '4E' # 起始帧
data_write_1_2 = '57'
data_write_1 = data_write_1_1 + data_write_1_2
data_write_2_1 = '00' # 帧长度
data_write_2_2 = '13'
data_write_2 = data_write_2_1 + data_write_2_2
data_write_3_1 = '00' # BMS终端号
data_write_3_2 = '00'
data_write_3_3 = '00'
data_write_3_4 = '00'
data_write_3 = data_write_3_1 + data_write_3_2 + data_write_3_3 + data_write_3_4
data_write_4 = '06' # 命令字
data_write_5 = '03' # 帧来源
data_write_6 = '00' # 传输类型
data_write_7 = '00' # 数据标识码 '00'代表全部数据类型
data_write_8_1 = '00' # 记录号
data_write_8_2 = '00'
data_write_8_3 = '00'
data_write_8_4 = '00'
data_write_8 = data_write_8_1 + data_write_8_2 + data_write_8_3 + data_write_8_4
data_write_9 = '68' # 结束标识
# 计算校验和 这个是累加求和得到的校验和
data_temp = (int(bytes.fromhex(data_write_1_1).hex(), 16) + int(bytes.fromhex(data_write_1_2).hex(), 16) +
int(bytes.fromhex(data_write_2_1).hex(), 16) + int(bytes.fromhex(data_write_2_2).hex(), 16) +
int(bytes.fromhex(data_write_3_1).hex(), 16) + int(bytes.fromhex(data_write_3_2).hex(), 16) +
int(bytes.fromhex(data_write_3_3).hex(), 16) + int(bytes.fromhex(data_write_3_4).hex(), 16) +
int(bytes.fromhex(data_write_4).hex(), 16) + int(bytes.fromhex(data_write_5).hex(), 16) +
int(bytes.fromhex(data_write_6).hex(), 16) + int(bytes.fromhex(data_write_7).hex(), 16) +
int(bytes.fromhex(data_write_8_1).hex(), 16) + int(bytes.fromhex(data_write_8_2).hex(), 16) +
int(bytes.fromhex(data_write_8_3).hex(), 16) + int(bytes.fromhex(data_write_8_4).hex(), 16) +
int(bytes.fromhex(data_write_9).hex(), 16))
# print(type(data_temp), data_temp)
# data_test1 = str(hex(data_temp))
data_test = "{:04x}".format(data_temp) # 这是格式化输出
# print(type(data_test), data_test)
# print(type(data_test_213), data_test_213)
# data_test_byte = data_test_213.encode('utf-8')
# data_test_byte = str.encode(data_test_213)
# print(type(data_test_byte.decode()), data_test_byte.decode())
data_write_10_1 = '00' # 校验和,data_write_1 ~ data_write_9 的累加和,这两面前两个字节不用
data_write_10_2 = '00'
data_write_10_3 = data_test[0:2] # ‘01’
data_write_10_4 = data_test[2:4] # '29'
data_write_10 = data_write_10_1 + data_write_10_2 + data_write_10_3 + data_write_10_4
data_write = data_write_1 + data_write_2 + data_write_3 + data_write_4 + data_write_5 + data_write_6 + data_write_7 + data_write_8 + data_write_9 + data_write_10
data_write_b = bytes.fromhex(data_write)
print('写串口数据')
serial_port.write(data_write_b)
在写串口数据的时候,因为指令基本不变,所以直接用字符串赋值了,可以参考注释单独复制出来测试。
-
- mian函数
if __name__ == '__main__':
serial_open()
try:
serial_write()
serial_read()
except KeyboardInterrupt:
print("Exiting Program")
finally:
serial_close()
运行程序,可以看到程序先发送数据,在接收返回的数据进行解析