python socket编程
- 概念说明
- 心跳包的作用
- 基于以上知识的基础上我们来实现一个代码
- socket server
- socket client
- 执行结果
概念说明
socket传输数据是基于字节流的,默认情况下是无边界的字节流。 一般情况下数据里中包含心跳包和数据包。数据包就是我们实际需要发送给server端的数据。
心跳包的作用
有人可能会有疑问,tcp协不是长连接么,只要建立了连接在不出意外的情况下不是可以一直维持连接吗?为什么还需要心跳包。
在TCP/IP网络通信中,尽管TCP连接本身是面向连接的、可靠的传输层协议,并且TCP协议自身包含了许多机制来确保数据的可靠传输(如序列号、确认应答、超时重传等),但这并不意味着TCP连接就是“永远在线”或“长连接”就一定不会出现问题。实际上,长连接(也称为持久连接)指的是TCP连接在数据传输完成后不会被立即关闭,而是保持一段时间以便后续的数据传输,这样可以减少因为频繁建立和关闭连接所带来的开销。
然而,即使使用了长连接,也仍然可能遇到以下问题:
- 网络问题:网络中的设备(如路由器、交换机、防火墙等)可能会因为各种原因(如负载均衡、故障恢复、安全策略等)而中断或重置空闲的TCP连接。这些设备通常会有超时时间设置,用于清理长时间没有数据交换的连接。
- NAT(网络地址转换)超时:在使用NAT的私有网络中,NAT设备可能会因为超时而丢弃空闲的TCP连接。这是因为NAT表项资源有限,需要回收长时间未使用的表项以释放资源。
- 应用程序故障:应用程序本身也可能因为异常、崩溃或重启等原因而失去对TCP连接的跟踪,导致连接处于“挂起”状态。
- 负载均衡器或代理服务器:在客户端和服务器之间部署负载均衡器或代理服务器时,这些中间设备也可能因为超时或负载均衡策略而关闭空闲连接。
为了解决这些问题,心跳包(Heartbeat Packet)被引入。心跳包是一种定期发送的小数据包,用于在客户端和服务器之间保持连接的活跃状态,并通知对方自己仍然在线且愿意继续通信。通过发送心跳包,可以:
检测死连接:如果一方长时间没有收到对方的心跳包,则可以认为对方已经不可用,从而关闭连接并释放资源。
绕过NAT超时:定期的心跳包可以确保NAT表项不被超时删除,从而保持连接的活性。
保持负载均衡器或代理服务器的连接状态:类似地,心跳包也可以帮助维持负载均衡器或代理服务器中的连接状态,防止它们因为超时而关闭连接。
因此,尽管TCP连接本身是面向连接的,但在长连接场景中,心跳包仍然是保持连接活跃性和可靠性的重要手段之一。
基于以上知识的基础上我们来实现一个代码
socket server
import socket
import struct
import json
import time
import threading
"""
socket_server.py
~~~~~~~~~~~~~~~~
socket server
"""
import logging
logger = logging.getLogger("[SOCKET]")
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
streamHandler.setLevel(logging.DEBUG)
streamHandler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(streamHandler)
lock = threading.RLock
class SocketServer(object):
def __init__(self, host='127.0.0.1', port=9090):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def _t(self):
while True:
time.sleep(0.01)
# 读取4字节头长
header_length = self._read_4()
# 读取头
header = json.loads(self._read_header(header_length).decode("utf-8"))
msg_type = header["msg_type"]
if msg_type == "heart":
logger.info("header=%s,是心跳包" % header)
elif msg_type == "data":
body_length = header["body_length"]
body = json.loads(self._read_body(body_length).decode("utf-8"))
logger.info(f"header=%s, 数据包 data=%s" %(header, body))
else:
logger.error(f"Received Unknown msg_type from {addr}")
def start(self):
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
# 接受客户端连接
self.client_socket, addr = self.server_socket.accept()
print(f"Connected by {addr}")
thread = threading.Thread(target=self._t)
thread.start()
def _read_4(self):
data = b''
while True:
data += self.client_socket.recv(4 - len(data))
if len(data) == 4:
break
header_length = struct.unpack('>I', data)[0] # 将 4 字节数据解析为整数
return header_length
def _read_header(self, header_length):
data = b''
while len(data) < header_length: # 确保读取完整的 header_length 字节
data_chunk = self.client_socket.recv(header_length - len(data))
data += data_chunk
return data
def _read_body(self, body_length):
data = b''
while len(data) < body_length: # 确保读取完整的 body_length 字节
data_chunk = self.client_socket.recv(body_length - len(data))
data += data_chunk
return data
# 启动server
if __name__ == "__main__":
server = SocketServer()
server.start()
socket client
import socket
import struct
import json
import time
import threading
class SocketClient(object):
def __init__(self, host='127.0.0.1', port=9090):
self.host = host
self.port = port
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.lock=threading.RLock()
def connect(self):
self.client_socket.connect((self.host, self.port))
def _send_heartbeat(self):
while True:
self.lock.acquire()
header_dict = {
"msg_type": "heart",
"body_length": 0 # 因为心跳包没有body
}
header = json.dumps(header_dict)
header_length = struct.pack('>I', len(header)) # 打包header长度为4字节的整数
self.client_socket.send(header_length)
self.client_socket.send(header.encode('utf-8'))
logger.info("发送头。。。。。。")
self.lock.release()
time.sleep(5)
def send_data(self, data):
while True:
self.lock.acquire()
body = json.dumps(data)
header_dict = {
"msg_type": "data",
"body_length": len(body)
}
header = json.dumps(header_dict)
header_length_packed = struct.pack('>I', len(header)) # 打包header长度的长度为4字节的整数
# 先发送4字节长度的header长度
self.client_socket.send(header_length_packed)
# 再发送header
self.client_socket.send(header.encode('utf-8'))
# 最后发送body
self.client_socket.send(body.encode('utf-8'))
logger.info("发送数据。。。。。。成功")
self.lock.release()
time.sleep(3)
def close(self):
self.client_socket.close()
# 启动客户端
if __name__ == "__main__":
client = SocketClient()
client.connect()
# 发送心跳包
t_heart = threading.Thread(target=client._send_heartbeat)
t_heart.start()
# 发送数据包
t_data = threading.Thread(target=client.send_data, args=({"key": "value", "number": 123},))
t_data.start()
t_heart.join()
t_data.join()