Python套接字综合应用(UDP篇)
1、 主要功能
- UDP客户端实现
- UDP服务端实现
- 输出字体颜色控制
- 响应捕获键盘
Ctrl+C
信号 - 套接字异常捕获及处理
- 通信报文16进制格式化输出
2、 Python UDP套接字应用
Windows程序在WinServer2022上验证运行,Linux程序在银河麒麟V10上验证运行。
通过《网络调试助手》进行调试验证,工具运行界面如下:
①、 Linux服务端
server.py
# -*- coding: gbk -*-
import socket
import datetime
#服务端参数设置
listen_addr = "192.168.58.145"
listen_port = 1281
#输出字体颜色控制
TEXT_COLOR_WHITE = '\033[97m'
TEXT_COLOR_MAGENTA = '\033[95m'
TEXT_COLOR_BLUE = '\033[94m'
TEXT_COLOR_YELLOW = '\033[93m'
TEXT_COLOR_GREEN = '\033[92m'
TEXT_COLOR_RED = '\033[91m'
TXET_COLOR_DEFAULT = '\033[0m'
#输出字体大小控制
TEXT_FONT_WEIGHT_DEFAULT = '\033[0m'
TEXT_FONT_WEIGHT_BOLD = '\033[1m\033[5m'
TEXT_FONT_WEIGHT_THIN = '\033[2m\033[3m'
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定地址
sock.bind((listen_addr, listen_port))
#3. 接收数据
while True:
rx_msg, peer_endpoint = sock.recvfrom(2048)
print('\n')
# 获取当前时间
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
#打印客户地址
str_peer = "[{}]".format(peer_endpoint)
str_rx = "[RX]"
#打印头部信息
print(f"%s" % str_rx + str_now + str_peer)
#接收内容,格式化16进制
line = ['%02X' % i for i in rx_msg]
str_rx_msg = " ".join(line)
print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg)
#发送信息给对方
tx_msg = rx_msg
sock.sendto(tx_msg,peer_endpoint)
str_tx = "[TX]"
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
print(f"%s" % str_tx + str_now + str_peer)
#发送内容格式化
line = ['%02X' % i for i in tx_msg]
str_tx_msg = " ".join(line)
print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % str_tx_msg)
sock.close()
运行效果
②、 Linux客户端
client.py
# -*- coding: gbk -*-
import socket
import datetime
import time
#本地网络参数设置
local_bind_addr = "192.168.58.145"
local_bind_port = 1281
#对方网络参数设置
remote_ep_addr = "192.168.58.1"
remote_ep_port = 50001
#交互模式
#[0]: 固定次数、固定间隔发送,用户只输入一次
#[1]: 每次要求用户输入,用户输入后才发送(默认方式)
tx_mode = 0
#只有tx_mode为[0]时生效
tx_count = 10
tx_interval_second = 2
#输出字体颜色控制
TEXT_COLOR_WHITE = '\033[97m'
TEXT_COLOR_MAGENTA = '\033[95m'
TEXT_COLOR_BLUE = '\033[94m'
TEXT_COLOR_YELLOW = '\033[93m'
TEXT_COLOR_GREEN = '\033[92m'
TEXT_COLOR_RED = '\033[91m'
TXET_COLOR_DEFAULT = '\033[0m'
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定本地地址
sock.bind((local_bind_addr, local_bind_port))
#设置接收超时时间(单位为秒)
timeout_seconds = 2
sock.settimeout(timeout_seconds)
#每次要求用户输入,用户输入后才发送(默认方式)
if tx_mode == 1 :
while True:
#3. 接收用户输入
input_stream = input("请输入16进制发送报文数据,以空格分隔:")
#4. 发送数据
sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
str_tx = "[TX]"
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
print(f"%s" % str_tx + str_now + str_remote_addr)
#发送内容格式化
print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
#5.接收数据
try:
rx_msg, peer_endpoint = sock.recvfrom(2048)
print('\n')
# 获取当前时间
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
#打印对端地址
str_peer = "[{}]".format(peer_endpoint)
str_rx = "[RX]"
#打印头部信息
print(f"%s" % str_rx + str_now + str_peer)
#接收内容,格式化16进制
line = ['%02X' % i for i in rx_msg]
str_rx_msg = " ".join(line)
print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
except socket.timeout:
#超时,继续
continue
#固定次数、固定间隔发送,用户只输入一次
elif tx_mode == 0 :
#3. 接收用户输入
input_stream = input("请输入16进制发送报文数据,以空格分隔:")
while tx_count > 0:
#4. 发送数据
sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
str_tx = "[TX]"
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
print(f"%s" % str_tx + str_now + str_remote_addr)
#发送内容格式化
print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
#5.接收数据
try:
rx_msg, peer_endpoint = sock.recvfrom(2048)
print('\n')
# 获取当前时间
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
#打印对端地址
str_peer = "[{}]".format(peer_endpoint)
str_rx = "[RX]"
#打印头部信息
print(f"%s" % str_rx + str_now + str_peer)
#接收内容,格式化16进制
line = ['%02X' % i for i in rx_msg]
str_rx_msg = " ".join(line)
print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
except socket.timeout:
#接收超时
continue
except Exception as e:
print(f"[处理异常]: {e}")
print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
sock.close()
sys.exit(0)
tx_count -= 1
time.sleep(tx_interval_second)
sock.close()
运行效果
③、 Windows服务端
server.py
# -*- coding: gbk -*-
import socket
import datetime
import sys
#服务端参数设置
listen_addr = "192.168.58.151"
listen_port = 1281
#输出字体颜色控制
TEXT_COLOR_WHITE = '\033[1;30m'
TEXT_COLOR_GRAY = '\033[1;37m'
TEXT_COLOR_MAGENTA = '\033[1;35m'
TEXT_COLOR_BLUE = '\033[1;34m'
TEXT_COLOR_YELLOW = '\033[1;33m'
TEXT_COLOR_GREEN = '\033[1;32m'
TEXT_COLOR_RED = '\033[1;31m'
TXET_COLOR_DEFAULT = '\033[0m'
#输出字体大小控制
TEXT_FONT_WEIGHT_DEFAULT = '\033[0m'
TEXT_FONT_WEIGHT_BOLD = '\033[1m\033[5m'
TEXT_FONT_WEIGHT_THIN = '\033[2m\033[3m'
# "\033"是转义序列的开始,后面跟着一个或多个字符来指定具体的样式。
# [0m表示默认样式,[1m表示加粗,[2m表示常规,[5m表示放大,[3m表示缩小。
#仅限Windows系统
#print颜色控制失败时调用如下语句
import os
if os.name == "nt":
os.system("")
#捕获键盘Ctrl+C信号
import signal
def signal_handler(signal,code):
print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
sys.exit(0)
signal.signal(signal.SIGINT,signal_handler)
signal.signal(signal.SIGTERM,signal_handler)
#16进制格式化输出
def print_hex(bytes):
line = ['%02X' % i for i in bytes]
print(" ".join(line))
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定地址
sock.bind((listen_addr, listen_port))
#3. 设置接收超时时间(单位为秒)
timeout_seconds = 2
sock.settimeout(timeout_seconds)
#4. 接收数据
while True:
try:
rx_msg, peer_endpoint = sock.recvfrom(2048)
print('\n')
# 获取当前时间
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
#打印客户地址
str_peer = "[{}]".format(peer_endpoint)
str_rx = "[RX]"
#打印头部信息
print(f"{TEXT_COLOR_GRAY}%s{TXET_COLOR_DEFAULT}" % str_rx + str_now + str_peer)
#接收内容,格式化16进制
line = ['%02X' % i for i in rx_msg]
str_rx_msg = " ".join(line)
print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % str_rx_msg)
#发送信息给对方
tx_msg = rx_msg
sock.sendto(tx_msg,peer_endpoint)
str_tx = "[TX]"
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
print(f"{TEXT_COLOR_GRAY}%s{TXET_COLOR_DEFAULT}" % str_tx + str_now + str_peer)
#发送内容格式化
line = ['%02X' % i for i in tx_msg]
str_tx_msg = " ".join(line)
print(f"{TEXT_COLOR_GREEN}%s{TXET_COLOR_DEFAULT}" % str_tx_msg)
except socket.timeout:
#超时,继续
continue
sock.close()
运行效果
④、 Windows客户端
client.py
# -*- coding: gbk -*-
import socket
import datetime
import time
#本地网络参数设置
local_bind_addr = "192.168.58.131"
local_bind_port = 1281
#对方网络参数设置
remote_ep_addr = "192.168.58.1"
remote_ep_port = 50001
#交互模式
#[0]: 固定次数、固定间隔发送,用户只输入一次
#[1]: 每次要求用户输入,用户输入后才发送(默认方式)
tx_mode = 0
#只有tx_mode为[0]时生效
tx_count = 10
tx_interval_second = 2
#仅限Windows系统
#print颜色控制失败时调用如下语句
import os
if os.name == "nt":
os.system("")
#捕获键盘Ctrl+C信号
import signal
def signal_handler(signal,code):
print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
sys.exit(0)
signal.signal(signal.SIGINT,signal_handler)
signal.signal(signal.SIGTERM,signal_handler)
#输出字体颜色控制
TEXT_COLOR_WHITE = '\033[1;30m'
TEXT_COLOR_GRAY = '\033[1;37m'
TEXT_COLOR_MAGENTA = '\033[1;35m'
TEXT_COLOR_BLUE = '\033[1;34m'
TEXT_COLOR_YELLOW = '\033[1;33m'
TEXT_COLOR_GREEN = '\033[1;32m'
TEXT_COLOR_RED = '\033[1;31m'
TXET_COLOR_DEFAULT = '\033[0m'
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定本地地址
sock.bind((local_bind_addr, local_bind_port))
#设置接收超时时间(单位为秒)
timeout_seconds = 2
sock.settimeout(timeout_seconds)
#每次要求用户输入,用户输入后才发送(默认方式)
if tx_mode == 1 :
while True:
#3. 接收用户输入
input_stream = input("请输入16进制发送报文数据,以空格分隔:")
#4. 发送数据
sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
str_tx = "[TX]"
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
print(f"%s" % str_tx + str_now + str_remote_addr)
#发送内容格式化
print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
#5.接收数据
try:
rx_msg, peer_endpoint = sock.recvfrom(2048)
print('\n')
# 获取当前时间
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
#打印对端地址
str_peer = "[{}]".format(peer_endpoint)
str_rx = "[RX]"
#打印头部信息
print(f"%s" % str_rx + str_now + str_peer)
#接收内容,格式化16进制
line = ['%02X' % i for i in rx_msg]
str_rx_msg = " ".join(line)
print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
except socket.timeout:
#超时,继续
continue
#固定次数、固定间隔发送,用户只输入一次
elif tx_mode == 0 :
#3. 接收用户输入
input_stream = input("请输入16进制发送报文数据,以空格分隔:")
while tx_count > 0:
#4. 发送数据
sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
str_tx = "[TX]"
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
print(f"%s" % str_tx + str_now + str_remote_addr)
#发送内容格式化
print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
#5.接收数据
try:
rx_msg, peer_endpoint = sock.recvfrom(2048)
print('\n')
# 获取当前时间
now = datetime.datetime.now()
str_date = now.strftime('%Y-%m-%d %H:%M:%S')
str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
str_now = f"[{str_date}.{str_ms}]"
#打印对端地址
str_peer = "[{}]".format(peer_endpoint)
str_rx = "[RX]"
#打印头部信息
print(f"%s" % str_rx + str_now + str_peer)
#接收内容,格式化16进制
line = ['%02X' % i for i in rx_msg]
str_rx_msg = " ".join(line)
print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
except socket.timeout:
#接收超时
continue
except Exception as e:
print(f"[处理异常]: {e}")
print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
sock.close()
sys.exit(0)
tx_count -= 1
time.sleep(tx_interval_second)
sock.close()
运行效果