3. 网络并发模型
3.1 网络并发模型概述
-
什么是网络并发
在实际工作中,一个服务端程序往往要应对多个客户端同时发起访问的情况。如果让服务端程序能够更好的同时满足更多客户端网络请求的情形,这就是并发网络模型。
-
循环网络模型问题
循环网络模型只能循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。这样的网络模型虽然简单,资源占用不多,但是无法同时处理多个客户端请求就是其最大的弊端,往往只有在一些低频的小请求任务中才会使用。
3.2 多进程/线程并发模型
多进程/线程并发模中每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程,多任务并发模型也是实际工作中最为常用的服务端处理模型。
-
模型特点
- 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
- 缺点: 资源消耗较大
- 适用情况:客户端请求较复杂,需要长时间占有服务器。
-
创建流程
- 创建网络套接字
- 等待客户端连接
- 有客户端连接,则创建新的进程/线程具体处理客户端请求
- 主进程/线程继续等待处理其他客户端连接
- 如果客户端退出,则销毁对应的进程/线程
多进程并发模型示例:
"""
基于多进程的网络并发模型
重点代码 !!
创建tcp套接字
等待客户端连接
有客户端连接,则创建新的进程具体处理客户端请求
父进程继续等待处理其他客户端连接
如果客户端退出,则销毁对应的进程
"""
from socket import *
from multiprocessing import Process
import sys
# 地址变量
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST, PORT)
# 处理客户端具体请求
def handle(connfd):
while True:
data = connfd.recv(1024)
if not data:
break
print(data.decode())
connfd.close()
# 服务入口函数
def main():
# 创建tcp套接字
tcp_socket = socket()
tcp_socket.bind(ADDR)
tcp_socket.listen(5)
print("Listen the port %d"%PORT)
# 循环连接客户端
while True:
try:
connfd, addr = tcp_socket.accept()
print("Connect from", addr)
except KeyboardInterrupt:
tcp_socket.close()
sys.exit("服务结束")
# 创建进程 处理客户端请求
p = Process(target=handle, args=(connfd,),daemon=True)
p.start()
if __name__ == '__main__':
main()
多线程并发模型示例:
"""
基于多线程的网络并发模型
重点代码 !!
思路: 网络构建 线程搭建 / 具体处理请求
"""
from socket import *
from threading import Thread
# 处理客户端具体请求
class Handle:
# 具体处理请求函数 (逻辑处理,数据处理)
def request(self, data):
print(data)
# 创建线程得到请求
class ThreadServer(Thread):
def __init__(self, connfd):
self.connfd = connfd
self.handle = Handle()
super().__init__(daemon=True)
# 接收客户端的请求
def run(self):
while True:
data = self.connfd.recv(1024).decode()
if not data:
break
self.handle.request(data)
self.connfd.close()
# 网络搭建
class ConcurrentServer:
"""
提供网络功能
"""
def __init__(self, *, host="", port=0):
self.host = host
self.port = port
self.address = (host, port)
self.sock = self.__create_socket()
def __create_socket(self):
tcp_socket = socket()
tcp_socket.bind(self.address)
return tcp_socket
# 启动服务 --> 准备连接客户端
def serve_forever(self):
self.sock.listen(5)
print("Listen the port %d" % self.port)
while True:
connfd, addr = self.sock.accept()
print("Connect from", addr)
# 创建线程
t = ThreadServer(connfd)
t.start()
if __name__ == '__main__':
server = ConcurrentServer(host="0.0.0.0", port=8888)
server.serve_forever() # 启动服务
ftp 文件服务器
【1】 分为服务端和客户端,要求可以有多个客户端同时操作。
【2】 客户端可以查看服务器文件库中有什么文件。
【3】 客户端可以从文件库中下载文件到本地。
【4】 客户端可以上传一个本地文件到文件库。
【5】 使用print在客户端打印命令输入提示,引导操作
参考代码:
######################### 服务端 ############################
from socket import *
from threading import Thread
import os
from time import sleep
# 文件库
FTP = "/home/tarena/FTP/"
# 处理客户端具体请求
class Handle:
def __init__(self, connfd):
self.connfd = connfd
def do_list(self):
filelist = os.listdir(FTP)
if filelist:
self.connfd.send(b"OK")
sleep(0.1)
# 发送文件列表
files = "\n".join(filelist)
self.connfd.send(files.encode())
else:
self.connfd.send(b"FAIL")
def do_get(self, filename):
try:
file = open(FTP + filename, 'rb')
except:
self.connfd.send(b"FAIL")
else:
self.connfd.send(b"OK")
sleep(0.1)
# 发送文件
while True:
data = file.read(1024)
if not data:
break
self.connfd.send(data)
file.close()
sleep(0.1)
self.connfd.send(b"##")
def do_put(self, filename):
# 判断文件是否存在
if os.path.exists(FTP + filename):
self.connfd.send(b"FAIL")
else:
self.connfd.send(b"OK")
# 接收文件
file = open(FTP + filename, 'wb')
while True:
data = self.connfd.recv(1024)
if data == b"##":
break
file.write(data)
file.close()
def request(self):
while True:
data = self.connfd.recv(1024).decode()
# 分情况具体处理请求函数
tmp = data.split(' ')
if not data or tmp[0] == "EXIT":
break
elif tmp[0] == "LIST":
self.do_list()
elif tmp[0] == "GET":
# tmp-> [GET,filename]
self.do_get(tmp[1])
elif tmp[0] == "PUT":
self.do_put(tmp[1])
# 创建线程得到请求
class FTPThread(Thread):
def __init__(self, connfd):
self.connfd = connfd
self.handle = Handle(connfd)
super().__init__(daemon=True)
# 接收客户端的请求
def run(self):
self.handle.request()
self.connfd.close()
# 网络搭建
class ConcurrentServer:
"""
提供网络功能
"""
def __init__(self, *, host="", port=0):
self.host = host
self.port = port
self.address = (host, port)
self.sock = self.__create_socket()
def __create_socket(self):
tcp_socket = socket()
tcp_socket.bind(self.address)
return tcp_socket
# 启动服务 --> 准备连接客户端
def serve_forever(self):
self.sock.listen(5)
print("Listen the port %d" % self.port)
while True:
connfd, addr = self.sock.accept()
print("Connect from", addr)
# 创建线程
t = FTPThread(connfd)
t.start()
if __name__ == '__main__':
server = ConcurrentServer(host="0.0.0.0", port=8880)
server.serve_forever() # 启动服务
########################### 客户端 ###############################
"""
文件服务器客户端
"""
from socket import *
import sys
from time import sleep
# 具体发起请求,逻辑处理
class Handle:
def __init__(self):
self.server_address = ("127.0.0.1", 8880)
self.sock = self.__connect_server()
def __connect_server(self):
tcp_socket = socket()
tcp_socket.connect(self.server_address)
return tcp_socket
def do_list(self):
self.sock.send(b"LIST") # 发送请求
response = self.sock.recv(1024) # 接收响应
if response == b"OK":
# 接收文件列表 file1\nfile2\n..
files = self.sock.recv(1024 * 1024)
print(files.decode())
else:
print("获取文件列表失败")
def do_exit(self):
self.sock.send(b"EXIT")
self.sock.close()
sys.exit("谢谢使用")
def do_get(self, filename):
request = "GET " + filename
self.sock.send(request.encode()) # 发送请求
response = self.sock.recv(128) # 接收响应
if response == b"OK":
file = open(filename, 'wb')
# 接收文件内容,写入文件
while True:
data = self.sock.recv(1024)
if data == b"##":
break
file.write(data)
file.close()
else:
print("该文件不存在")
def do_put(self, filename):
try:
file = open(filename, 'rb')
except:
print("该文件不存在")
else:
filename = filename.split("/")[-1] # 获取文件名
request = "PUT " + filename
self.sock.send(request.encode())
response = self.sock.recv(128)
if response == b"OK":
# 发送文件
while True:
data = file.read(1024)
if not data:
break
self.sock.send(data)
file.close()
sleep(0.1)
self.sock.send(b"##")
else:
print("上传失败")
# 图形交互类
class FTPView:
def __init__(self):
self.__handle = Handle()
def __display_menu(self):
print()
print("1. 查看文件")
print("2. 下载文件")
print("3. 上传文件")
print("4. 退 出")
print()
def __select_menu(self):
item = input("请输入选项:")
if item == "1":
self.__handle.do_list()
elif item == "2":
filename = input("要下载的文件:")
self.__handle.do_get(filename)
elif item == "3":
filename = input("要上传的文件:")
self.__handle.do_put(filename)
elif item == "4":
self.__handle.do_exit()
else:
print("请输入正确选项!")
def main(self):
while True:
self.__display_menu()
self.__select_menu()
if __name__ == '__main__':
ftp = FTPView()
ftp.main() # 启动
3.3 IO并发模型
3.3.1 IO概述
-
什么是IO
在程序中存在读写数据操作行为的事件均是IO行为,比如终端输入输出 ,文件读写,数据库修改和网络消息收发等。
-
程序分类
- IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长。
- 计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞。
-
IO分类:阻塞IO ,非阻塞IO,IO多路复用等。
3.3.2 阻塞IO
- 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
- 效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为。
- 阻塞情况
- 因为某种执行条件没有满足造成的函数阻塞
e.g. accept input recv - 处理IO的时间较长产生的阻塞状态
e.g. 网络传输,大文件读写
- 因为某种执行条件没有满足造成的函数阻塞
3.3.3 非阻塞IO
- 定义 :通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态。
-
设置套接字为非阻塞IO
sockfd.setblocking(bool) 功能:设置套接字为非阻塞IO 参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
-
超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
sockfd.settimeout(sec) 功能:设置套接字的超时时间 参数:设置的时间
非阻塞IO示例: """ 设置非阻塞的套接字 """ from socket import * from time import sleep, ctime # 日志文件模拟与网络无关IO file = open("my.log", "a") # 创建tcp套接字 sock = socket() sock.bind(("127.0.0.1", 8888)) sock.listen(5) # 设置为非阻塞 # sock.setblocking(False) # 设置超时事件 sock.settimeout(3) # 循环处理客户端连接 while True: try: connfd, addr = sock.accept() print("Connect from", addr) except timeout as e: # 模拟一个与accept 无关的事件 msg = "%s : %s\n" % (ctime(), e) file.write(msg) except BlockingIOError as e: # 模拟一个与accept 无关的事件 msg = "%s : %s\n" % (ctime(), e) file.write(msg) sleep(2) else: # accept 正常执行 data = connfd.recv(1024) print(data.decode())
3.3.4 IO多路复用
-
定义
同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。
-
具体方案
- select方法 : Windows Linux Unix
- epoll方法: Linux
-
select 方法
rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数: rlist 列表 读IO列表,添加等待发生的或者可读的IO事件
wlist 列表 写IO列表,存放要可以主动处理的或者可写的IO事件
xlist 列表 异常IO列表,存放出现异常要处理的IO事件
timeout 超时时间
返回值: rs 列表 rlist中准备就绪的IO
ws 列表 wlist中准备就绪的IO
xs 列表 xlist中准备就绪的IO
select 方法示例:
"""
IO多路复用 基础演示 select
"""
from select import select
from socket import *
# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)
file = open("my.log",'r')
udp_sock = socket(AF_INET,SOCK_DGRAM)
print("开始监控IO")
rs,ws,xs = select([file,udp_sock],[file,udp_sock],[])
print("rlist:",rs)
print("wlist:",ws)
print("xlist:",xs)
- epoll方法
ep = select.epoll()
功能 : 创建epoll对象
返回值: epoll对象
ep.register(fd,event)
功能: 注册关注的IO事件
参数:fd 要关注的IO
event 要关注的IO事件类型
常用类型EPOLLIN 读IO事件(rlist)
EPOLLOUT 写IO事件 (wlist)
EPOLLERR 异常IO (xlist)
e.g. ep.register(sockfd,EPOLLIN|EPOLLERR)
ep.unregister(fd)
功能:取消对IO的关注
参数:IO对象或者IO对象的fileno
events = ep.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IO
events格式 [(fileno,event),()....]
每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
epoll方法示例:
"""
IO多路复用 基础演示 epoll
"""
from select import *
from socket import *
# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)
file = open("my.log",'r+')
udp_sock = socket(AF_INET,SOCK_DGRAM)
# 创建epoll对象
ep = epoll()
# 关注IO对象
ep.register(tcp_sock,EPOLLIN)
ep.register(udp_sock,EPOLLOUT|EPOLLERR)
# 建立查找字典
map = {
tcp_sock.fileno():tcp_sock,
udp_sock.fileno():udp_sock,
}
print("开始监控IO")
events = ep.poll()
print(events) # 就绪的IO
# 不再关注
ep.unregister(udp_sock)
del map[udp_sock.fileno()]
- select 方法与epoll方法对比
- epoll 效率比select要高
- epoll 同时监控IO数量比select要多
- epoll 支持EPOLLET触发方式
3.3.5 IO并发模型
利用IO多路复用等技术,同时处理多个客户端IO请求。
-
优点 : 资源消耗少,能同时高效处理多个IO行为
-
缺点 : 只针对处理并发产生的IO事件
-
适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求。
-
网络并发服务实现过程
【1】将套接字对象设置为关注的IO,通常设置为非阻塞状态。
【2】通过IO多路复用方法提交,进行IO监控。
【3】阻塞等待,当监控的IO有事件发生时结束阻塞。
【4】遍历返回值列表,确定就绪的IO事件类型。
【5】处理发生的IO事件。
【6】继续循环监控IO发生。
IO多路复用并发模型
################################# select 方法 ####################################
"""
基于select的并发服务模型
使用函数完成
"""
from select import select
from socket import *
# 服务器地址
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST,PORT)
# 监控列表
rlist = []
wlist = []
xlist = []
# 处理客户端连接
def connect_client(sock):
connfd, addr = sock.accept()
print("Connect from", addr)
connfd.setblocking(False)
rlist.append(connfd) # 增加关注对象
# 处理客户端消息
def handle_client(connfd):
data = connfd.recv(1024)
# 处理客户端退出
if not data:
rlist.remove(connfd) # 不再关注
connfd.close()
return
print(data.decode())
connfd.send(b"Thanks")
def main():
# 创建监听套接字
sock = socket()
sock.bind(ADDR)
sock.listen(3)
# 配合非阻塞IO防止网络中断带来的内部阻塞
sock.setblocking(False)
rlist.append(sock) # 初始监控的IO对象
# 循环监控关注的IO发生
while True:
rs,ws,xs = select(rlist,wlist,xlist)
for r in rs:
if r is sock:
connect_client(r) # 连接客户端
else:
handle_client(r) # 处理客户端消息
if __name__ == '__main__':
main()
################################ epoll 方法 ################################
"""
基于epoll的并发服务模型
使用类实现
"""
from select import *
from socket import *
class EpollServer:
def __init__(self, host="", port=0):
self.host = host
self.port = port
self.address = (host, port)
self.sock = self._create_socket()
self.ep = epoll()
self.map = {} # 查找字典
def _create_socket(self):
sock = socket()
sock.bind(self.address)
sock.setblocking(False)
return sock
# 处理客户端连接
def _connect_client(self, fd):
connfd, addr = self.map[fd].accept()
print("Connect from", addr)
connfd.setblocking(False)
# 增加关注对象,设置边缘触发
self.ep.register(connfd, EPOLLIN | EPOLLET)
self.map[connfd.fileno()] = connfd # 维护字典
# 处理客户端消息
def _handle_client(self, fd):
data = self.map[fd].recv(1024)
# 处理客户端退出
if not data:
self.ep.unregister(fd) # 不再关注
self.map[fd].close()
del self.map[fd] # 从字典删除
return
print(data.decode())
self.map[fd].send(b"Thanks")
# 启动服务
def serve_forever(self):
self.sock.listen(3)
print("Listen the port %d" % self.port)
self.ep.register(self.sock, EPOLLIN) # 设置关注
self.map[self.sock.fileno()] = self.sock
while True:
events = self.ep.poll()
# 循环查看哪个IO发生就处理哪个
for fd, event in events:
if fd == self.sock.fileno():
self._connect_client(fd)
elif event == EPOLLIN:
self._handle_client(fd)
if __name__ == '__main__':
ep = EpollServer(host="0.0.0.0", port=8888)
ep.serve_forever() # 启动服务