1、问题背景
最近使用python编写的一个蓝牙应用程序,在使用过程中发现传输大量数据时会产生丢包,导致无法接收到完整的数据包。蓝牙接收程序的代码如下。
#蓝牙数据接收处理线程
def bt_recv_thread(self):
recv_time = time.time()
while(self.thread_run):
try:
recv_data = self.client_socket.recv(1024)
except Exception as e:
self.log.logprint(e)
self.client_socket.close()
self.client_socket = None
self.thread_run = False
break
#print("recv data:", recv_data.hex())
#接收2秒超时,清除原来的接收数据
if(time.time() - recv_time) > 2 and self.recv_index != 0:
self.log.logprint("recv data timeout, index:%d, msg_len:%d"%(self.recv_index, self.msg_len))
self.recv_index = 0
self.msg_len = 0
recv_time = time.time()
for d in recv_data:
if d.to_bytes(1, "little") == T95_BT_HEAD_1 and self.recv_index == 0:
self.msg_len = 0
self.recv_index = 1
self.recv_msg = d.to_bytes(1, "little")
elif d.to_bytes(1, "little") == T95_BT_HEAD_2 and self.recv_index == 1:
self.recv_index = 2
self.recv_msg += d.to_bytes(1, "little")
elif d.to_bytes(1, "little") == T95_BT_HEAD_3 and self.recv_index == 2:
self.recv_index = 3
self.recv_msg += d.to_bytes(1, "little")
elif d.to_bytes(1, "little") == T95_BT_HEAD_4 and self.recv_index == 3:
self.recv_index = 4
self.recv_msg += d.to_bytes(1, "little")
#8-15是报文总长度, 占用8个字节
elif self.recv_index == 15:
self.recv_msg += d.to_bytes(1, "little")
self.recv_index += 1
self.msg_len = int.from_bytes(self.recv_msg[8:16], byteorder="big")
#接收到帧尾,并且长度等于帧头
elif self.recv_index == (self.msg_len - 1) and d.to_bytes(1, "little") == T95_BT_TAIL:
self.recv_msg += d.to_bytes(1, "little")
self.recv_index = 0
#处理消费一个整帧数据,计算数据CRC
crc = int.from_bytes(self.recv_msg[-5:-1], byteorder="big")
if crc != self.CRC32_cacl(self.recv_msg[:-5]):
self.log.logprint("CRC 32 error!")
continue
#判断数据帧类型
msg_type = int.from_bytes(self.recv_msg[16:20], byteorder="big")
seq = int.from_bytes(self.recv_msg[5:7], byteorder="big")
if msg_type == T95_BT_MSG_TYPE_CON_REQ:
#局放主动发送的心跳数据,回复应答
self.send_t95(T95_BT_MSG_TYPE_CON_ACK, T95_BT_ACK_FLAG, seq, b"", b"")
elif msg_type == T95_BT_MSG_TYPE_DATA_UPLOAD:
#局放主动发送的检测数据,回复应答
self.send_t95(T95_BT_MSG_TYPE_DATA_ACK, T95_BT_ACK_FLAG, seq, b"", b"")
msg_len = int.from_bytes(self.recv_msg[39:47], "big")
msg_data = self.recv_msg[47:47+msg_len]
file_len = int.from_bytes(self.recv_msg[47+msg_len:55+msg_len], "big")
file_data = self.recv_msg[msg_len+55: msg_len+55+file_len]
#通过回调函数来处理业务数据与文件
if self.check_file_notify_call != None:
self.check_file_notify_call(msg_data, file_data)
else:
#发送给其他线程处理
self.rcv_sem.release()
else:
self.recv_msg += d.to_bytes(1, "little")
self.recv_index += 1
首先怀疑的是代码逻辑有问题,经过阅读代码与测试,发现代码逻辑正确。其次怀疑设备的蓝牙驱动程序有问题,使用了手机与设备进行蓝牙文件传输也正常。把这份代码移植到笔记本电脑上面运行,测试大数量时依然为丢包。
最后怀疑代码执行慢了,导致未能及时从蓝牙设备中读取到接收的数据。
2、解决办法
分析代码,代码中变量self.recv_msg使用bytes字节串来存储接收到的数据,程序首先从蓝牙设备中读取数据存储在recv_data字节串中,之后遍历整个字节串,把数据放到self.recv_msg中去。
理论分析:由于bytes是不可变字节串,self.recv_msg += d.to_bytes(1, "little")这名代码相当把字节串与一个新的字节串d.to_bytes(1, "little")连接形成一个新的字节串,这此操作中一共要处理3个字节串,都需要在内存中创建这三个字节串,随着接收数据量的变大,会耗费更多的时间处理。
理论验证:编写python程序测试list, bytes, bytearray三种数据类型在大数据量时的处理速度,测试的数据量为100万个数据。
#! /usr/bin/python3.8
# -*- coding: utf-8 -*-
import time
num = 1000000
a = []
b = bytearray()
bb = b"\x02" * num
d = b""
start_time = time.time()
for i in range(num):
a.append(i&0xff)
stop_time = time.time()
print("list start time:%d, end time:%ds, pass:%fs"%(start_time, stop_time, stop_time-start_time))
start_time = time.time()
for i in bb:
b.extend((i&0xff).to_bytes(1, "little"))
stop_time = time.time()
print("bytearray start time:%d, end time:%ds, pass:%fs"%(start_time, stop_time, stop_time-start_time))
start_time = time.time()
for i in bb:
d +=(i&0xff).to_bytes(1, "little")
stop_time = time.time()
print("byte start time:%d, end time:%ds, pass:%fs"%(start_time, stop_time, stop_time-start_time))
c = bytes(b)
print("bytearray",b[0:5])
print("int",int.from_bytes(b[0:5], byteorder="big"))
for i in c:
d = i
print("bytes", c[0:23])
代码的运行结果如下:
从以上结果可以看出,处理100万个数据时,list速度最快,用时0.126秒,bytearray,居中,用时0.270秒, bytes最慢,用于98秒。根据程序的处理数据的需要,代码改动少的情况下使用bytearray来处理,可以提高处理速度。改进后的代码如下:
#蓝牙数据接收处理线程
#由于bytes是不可变序列,因此在做增加操作时,会新生成一个bytes导致程序处理速度很慢,因此使用可变序列bytearray
def bt_recv_thread(self):
recv_time = 0
while(self.thread_run):
try:
recv_data = self.client_socket.recv(1024)
except Exception as e:
self.log.logprint(e)
self.thread_run = False
break
#print("recv data:", recv_data.hex())
#接收2秒超时,清除原来的接收数据
if self.recv_index != 0 and (time.time() - recv_time) > 2:
self.log.logprint("recv data timeout, index:%d, msg_len:%d"%(self.recv_index, self.msg_len))
self.recv_index = 0
self.msg_len = 0
recv_time = time.time()
for d in recv_data:
d_bytes = d.to_bytes(1, "little")
if d_bytes == T95_BT_HEAD_1 and self.recv_index == 0:
self.msg_len = 0
self.recv_index = 1
self.recv_msg = bytearray(d_bytes)
elif d_bytes == T95_BT_HEAD_2 and self.recv_index == 1:
self.recv_index = 2
self.recv_msg.extend(d_bytes)
elif d_bytes == T95_BT_HEAD_3 and self.recv_index == 2:
self.recv_index = 3
self.recv_msg.extend(d_bytes)
elif d_bytes == T95_BT_HEAD_4 and self.recv_index == 3:
self.recv_index = 4
self.recv_msg.extend(d_bytes)
#8-15是报文总长度, 占用8个字节
elif self.recv_index == 15:
self.recv_msg.extend(d_bytes)
self.recv_index += 1
self.msg_len = int.from_bytes(self.recv_msg[8:16], byteorder="big")
#接收到帧尾,并且长度等于帧头
elif self.recv_index == (self.msg_len - 1) and d_bytes == T95_BT_TAIL:
self.recv_msg.extend(d_bytes)
self.recv_index = 0
#处理消费一个整帧数据,计算数据CRC
crc = int.from_bytes(self.recv_msg[-5:-1], byteorder="big")
if crc != self.CRC32_cacl(self.recv_msg[:-5]):
self.log.logprint("CRC 32 error!")
continue
self.log.logprint("receive the data len:%d"%(self.msg_len))
#判断数据帧类型
msg_type = int.from_bytes(self.recv_msg[16:20], byteorder="big")
seq = int.from_bytes(self.recv_msg[5:7], byteorder="big")
if msg_type == T95_BT_MSG_TYPE_CON_REQ:
#局放主动发送的心跳数据,回复应答
self.send_t95(T95_BT_MSG_TYPE_CON_ACK, T95_BT_ACK_FLAG, seq, b"", b"")
elif msg_type == T95_BT_MSG_TYPE_DATA_UPLOAD:
#局放主动发送的检测数据,回复应答
self.send_t95(T95_BT_MSG_TYPE_DATA_ACK, T95_BT_ACK_FLAG, seq, b"", b"")
msg_len = int.from_bytes(self.recv_msg[39:47], "big")
msg_data = self.recv_msg[47:47+msg_len]
file_len = int.from_bytes(self.recv_msg[47+msg_len:55+msg_len], "big")
file_data = self.recv_msg[msg_len+55: msg_len+55+file_len]
#通过回调函数来处理业务数据与文件
if self.check_file_notify_call != None:
self.check_file_notify_call(msg_data, file_data)
else:
#发送给其他线程处理
self.rcv_sem.release()
else:
self.recv_msg.extend(d_bytes)
self.recv_index += 1
self.client_socket.close()
self.client_socket = None
经过以上的代码优化和测试后,蓝牙传输大量数据时不再丢包,完美解决问题。