✅ 功能特性:
1. 高并发支持:采用 threading.Thread + socket,可承载多并发连接
2. 异常处理完善:确保线程内异常不会崩溃整个程序
3. 可持续运行:守护线程 + 主线程监控机制
4. 运行状态监控:
• 当前活跃连接数
• 累计已完成请求数
5. 实时日志记录:每次连接及关闭、异常都记录到控制台,可接入日志文件
6. 详细代码注释:便于后续维护或交接
📦 优化后的完整代码(Python 3.x)
import socket
import threading
import time
import logging
# ---------------- 配置参数 ----------------
LOCAL_HOST = '0.0.0.0' # 中间服务器监听地址
LOCAL_PORT = 8000 # 中间服务器监听端口
REMOTE_HOST = '192.168.1.100' # 目标服务器地址
REMOTE_PORT = 9000 # 目标服务器端口
# ---------------- 日志配置 ----------------
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler("tcp_proxy.log"),
logging.StreamHandler()
]
)
# ---------------- 运行状态监控变量 ----------------
active_connections = 0 # 当前活跃连接数
total_completed_connections = 0 # 总共完成连接数
lock = threading.Lock() # 用于线程安全的状态变量访问
def forward(src_socket, dst_socket):
"""
数据转发线程:不断从 src_socket 读取数据,并写入 dst_socket
"""
try:
while True:
data = src_socket.recv(4096)
if not data:
break
dst_socket.sendall(data)
except Exception as e:
logging.warning(f"数据转发异常:{e}")
finally:
# 结束时关闭 socket
src_socket.close()
dst_socket.close()
def handle_client(client_socket, client_addr):
"""
客户端连接处理线程:连接目标服务器,并启动双向转发
"""
global active_connections, total_completed_connections
logging.info(f"[+] 接收到来自 {client_addr} 的连接")
# 更新活跃连接数
with lock:
active_connections += 1
try:
# 连接目标服务器
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_socket.connect((REMOTE_HOST, REMOTE_PORT))
logging.info(f"[→] 成功连接目标服务器 {REMOTE_HOST}:{REMOTE_PORT}")
# 启动两个线程:客户端 -> 目标服务器,目标服务器 -> 客户端
t1 = threading.Thread(target=forward, args=(client_socket, remote_socket), daemon=True)
t2 = threading.Thread(target=forward, args=(remote_socket, client_socket), daemon=True)
t1.start()
t2.start()
# 等待两个线程结束
t1.join()
t2.join()
# 统计已完成请求
with lock:
total_completed_connections += 1
logging.info(f"[✓] 连接 {client_addr} 完成转发并已关闭")
except Exception as e:
logging.error(f"[×] 处理连接 {client_addr} 出错:{e}")
finally:
# 保证连接数一致
with lock:
active_connections -= 1
client_socket.close()
def monitor_status():
"""
监控线程:每10秒打印一次运行状态
"""
while True:
with lock:
logging.info(f"[监控] 当前活跃连接:{active_connections} | 累计完成连接:{total_completed_connections}")
time.sleep(10)
def start_proxy():
"""
主启动函数:监听端口,接收连接,分配处理线程
"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((LOCAL_HOST, LOCAL_PORT))
server.listen(100) # 最大监听队列数
logging.info(f"[启动] TCP 中转服务启动,监听 {LOCAL_HOST}:{LOCAL_PORT}")
# 启动状态监控线程
threading.Thread(target=monitor_status, daemon=True).start()
while True:
try:
client_socket, client_addr = server.accept()
handler_thread = threading.Thread(target=handle_client, args=(client_socket, client_addr), daemon=True)
handler_thread.start()
except Exception as e:
logging.error(f"[×] 接收连接出错:{e}")
if __name__ == '__main__':
start_proxy()
🧩 说明
部分 描述
forward() 双向数据转发函数,保持 TCP 长连接
handle_client() 为每个客户端建立目标连接并开启转发线程
monitor_status() 每10秒打印一次当前连接状态和统计信息
logging 日志记录模块,可写入文件并打印到控制台
lock 线程安全更新全局变量
📈 监控扩展建议(可选)
如果后续需要对接 Prometheus 或可视化平台,可以:
• 使用 prometheus_client 模块暴露指标(如 /metrics HTTP 接口)
• 将日志接入 ELK、Loki、OpenObserve 等日志系统
🧪 使用 Python 通过 TCP 中转器请求百度搜索“老师”(测试用)
🧩 场景说明
你已经实现了一个 TCP 中转代理程序(监听 localhost:8000),现在你需要一个 Python 客户端程序,通过这个代理向百度发送 HTTP 请求,搜索关键词“老师”,用来测试中转器的转发是否正常工作。
📌 流程示意图
Python 测试程序
↓(原始 TCP 请求)
中转器 (localhost:8000)
↓(TCP 转发)
百度服务器 (www.baidu.com:80)
✅ 测试代码(带详细注释)
import socket
# ------------------------- 配置中转器地址 -------------------------
PROXY_HOST = '127.0.0.1' # 中间服务器地址(假设你的转发器就在本机)
PROXY_PORT = 8000 # 中间服务器监听的端口
# ------------------------- 构造 HTTP 请求 -------------------------
# 目标请求是百度搜索“老师”
keyword = "老师"
path = f"/s?wd={keyword}" # 百度搜索路径,GET 请求参数放在 URL 中
# 注意:这里请求的是百度服务器,但是通过中转器转发出去
http_request = (
f"GET {path} HTTP/1.1\r\n"
f"Host: www.baidu.com\r\n"
f"User-Agent: Python-TCP-Test\r\n"
f"Connection: close\r\n"
f"\r\n"
)
# ------------------------- 通过 TCP socket 请求 -------------------------
try:
# 1. 建立与中间服务器的 TCP 连接
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((PROXY_HOST, PROXY_PORT))
print(f"✅ 成功连接中转服务器 {PROXY_HOST}:{PROXY_PORT}")
# 2. 发送 HTTP 请求(给百度服务器的请求,但通过中转器发)
client_socket.sendall(http_request.encode('utf-8'))
print(f"📤 已发送百度搜索请求:{path}")
# 3. 接收响应内容
response_data = b""
while True:
chunk = client_socket.recv(4096)
if not chunk:
break
response_data += chunk
# 4. 打印前部分响应内容(HTML 页面)
print("📥 收到响应内容:")
print(response_data.decode('utf-8', errors='ignore')[:500])
print("...(以下省略)")
except Exception as e:
print(f"❌ 发生异常:{e}")
finally:
# 5. 关闭连接
client_socket.close()
print("🔚 已关闭连接")
🔧 配置中转器(回顾)
你的 TCP 中转程序应设置如下目标地址(转发到百度):
REMOTE_HOST = "www.baidu.com"
REMOTE_PORT = 80
📝 注意事项
• 请确保你的中转器程序已经启动并监听正确的端口
• 中转器的机器能访问外网(百度)
• 百度响应的是 HTML 页面,你可以看到类似 … 的网页内容
🚀 下一步建议(可选扩展)
• 使用 BeautifulSoup 提取网页中搜索结果标题
• 用 threading 发送多个并发请求测试中转器性能
• 将响应写入文件用于调试排查
如需我写并发测试版本或 HTML 内容解析器,也可以继续提!😎