最近一直在学习网络编程,今天把 socket部分做一个总结。
Python 的socket库可以实现不同协议不同地址的发包和收包,无奈资料很少,官方例子有限,大神博客很少提及, 经过一番尝试后,总结以下几点用法以便大家以后使用。
client端
import socket
import time
import random
import string
# 定义一些常量,变量名可以变。自己好记就行
buffer = 1024 # 发送缓冲区大小,这里是1KB
data_size = 128 # 每个UDP数据包的大小
bandwidth = 1 # 目标带宽,1 Mbps
PACKETS_PER_SECOND = bandwidth * 1024 * 1024 / (8 * data_size) # 计算每秒需要发送的数据包数量
# 目标服务器的IP和端口
SERVER_IP = '127.0.0.1' # 请替换为实际服务器IP
SERVER_PORT = 1245 # 请替换为实际端口
def generate_random_data(size):
"""生成随机数据,生成往UDP包中放的随机值"""
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(size)).encode()
def udp_main():
# 创建UDP套接字
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(f"Starting UDP bandwidth test to {SERVER_IP}:{SERVER_PORT} aiming for {bandwidth} Mbps...")
start_time = time.time()
packets_sent = 0
try:
while True:
# 生成随机数据
data = generate_random_data(data_size)
# 发送数据包
client_socket.sendto(data, (SERVER_IP, SERVER_PORT))
#包计数
packets_sent += 1
# 控制发送速率以达到目标带宽
elapsed_time = time.time() - start_time
if elapsed_time >= 1:
#实际带宽,怎么算需要记一下
actual_bandwidth = (packets_sent * data_size * 8) / (elapsed_time * 1024 * 1024)
print(f"Actual bandwidth: {actual_bandwidth:.2f} Mbps")
#重新计算包
packets_sent = 0
start_time = time.time()
time.sleep(1 / PACKETS_PER_SECOND) # 等待,控制发送频率
except KeyboardInterrupt:
print("\nBandwidth test interrupted by user.")
finally:
client_socket.close()
def tcp_main():
client_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client_socket.connect((SERVER_IP, SERVER_PORT))
start_time = time.time()
packets_sent = 0
try:
while True:
data = generate_random_data(data_size)
client_socket.send(data)
packets_sent += 1
elapsed_time = time.time() - start_time
if elapsed_time >= 1:
client_socket.send("q".encode())
packets_sent = 0
start_time = time.time()
acutalwidth = (packets_sent * data_size * 8) / (elapsed_time * 1024 * 1024)
print(f"Actual bandwidth: {acutalwidth:.2f} Mbps")
time.sleep(1/PACKETS_PER_SECOND)
except KeyboardInterrupt:
print("\nBandwidth test interrupted by user.")
finally:
client_socket.close()
if __name__ == "__main__":
tcp_main(
server端
import socket
def Server_Socket():
serverip = "0.0.0.0"
serverport = 8080
#1、2 创建UDP的socket
server_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
server_socket.bind((serverip,serverport))
print(f"udp server {serverip}:{serverport}")
try:
while True:
#3、接收客户端发来的值
data,addr = server_socket.recvfrom(1024)
print(f"recv from {addr}:{data.decode('utf-8')}") #print的内容不用背
except KeyboardInterrupt:
print("^C received, shutting down the server") #无所谓不用背具体详情
finally:
server_socket.close()
def TCP_socket():
serverip = "127.0.0.1"
serverport = 1245
server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server_socket.bind((serverip,serverport))
print(f"start listening:")
server_socket.listen(1024)
print(f"tcp socket: {serverip}:{serverport}")
try:
dataconn,addr = server_socket.accept()
print(f"client{dataconn}:{addr}")
while True:
data = dataconn.recv(1024)
print(f"recv from {addr},recv data:{data.decode('utf-8')}")
if data.decode('utf-8') == "q":
break
dataconn.send("已送达".encode('utf-8'))
except KeyboardInterrupt:
print("^C received, shutting down the server")
finally:
server_socket.close()
if __name__ == "__main__":
TCP_socket()



















