写c++程序时经常会有发送私有化协议的过程,比如头结构+数据包,数据包往往是一个结构体,有时为了方便调试会用python写一些测试程序。
发送的包的结构图示例如下:
接收包的结构图如下:
当然接收的RespBody会很多, iCount为1时即为图上所示,如果为2,则还要再画一个RespBody
想法很简单,将要发送的数据结构用python写出类:
# 构造要发送的二进制数据
class ReqHead: # 12 byte
def __init__(self):
self.iCheck = 64 # u16
self.iLen = 0 # u32
def pack(self):
return struct.pack('<HI',self.iCheck, self.iLen)
class ReqBody:
def __init__(self):
self.m_lTime = int(datetime.now().timestamp()) - (int(datetime.now().timestamp()) % 86400) - 86400 # 获取当天前1天时间戳
def pack(self):
return struct.pack('<I', self.m_lTime)
class RespHead: # 8
def __init__(self):
self.m_lTime = 0 # long
self.m_uCount = 0 # u32 ///< 回包结构体条目数量
def unpack(self, data):
self.m_lTime, self.m_uCount = struct.unpack('<iI', data)
class RespBody: # 26
def __init__(self):
self.m_uiMarket = 0 # 市场 u16
self.m_szCode = "" # 代码 s8
def unpack(self, data):
self.m_uiMarket, code = struct.unpack('<H8s', data)
self.m_szCode = code.decode('utf-8')
对于请求的,将数据打包成二进制数,对于返回的将二进制数拆解成类结构,在这里pack即打包,unpack为拆解。
然后按照先发送请求,再按结构读取的过程即可。
req_head = ReqHead()
req_body = ReqBody()
req_head.iLen = 4 # req_body的长度
# 建立 TCP 连接并发送数据
with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s):
s.connect((TCP_IP, TCP_PORT))
s.sendall(req_head.pack() + req_body.pack()) # 发送数据
data = s.recv(8) # 返回头是8字节
resp_head = RespHead()
resp_head.unpack(data)
for i in range(resp_head.m_uCount):
data = s.recv(10)
resp_body = RespBody()
resp_body.unpack(data)
print(resp_body.m_uiMarket, resp_body.m_szCode)
当然,最后要加上对应的头文件:
import socket import struct from datetime import datetime
小结:
这个过程看起来挺简单,并且也实现了二进制结构的测试功能。代码也很清晰易懂。
不过这是一个玩具程序,实际的工程会异常复杂,如果按这种逻辑来写的话会很累,且后期维护也会非常不易,比如巨大的结构中增加或删除某些变量,某些类型有所改变,改动的过程会很辛苦,因为拆解包时需要一个个核对,稍有不慎就会错误一大堆。