文章目录
- 1.生产者消费者-生成器版
- 2.生产者消费者--异步版本
- 3.客户端/服务端-多线程版
- 4.IO多路复用TCPServer模型
- 4.1Select
- 4.2Epoll
- 5.异步IO多路复用TCPServer模型
1.生产者消费者-生成器版
import time
# 消费者
def consumer():
cnt = yield
while True:
if cnt <= 0:
# 暂停、让出CPU
cnt = yield cnt
cnt -= 1
time.sleep(1)
print('consumer consum 1 cnt. cnt =', cnt)
# 生产者 (调度器)
def producer(cnt):
gen = consumer()
# 激活生成器
next(gen)
gen.send(cnt)
while True:
cnt += 1
print('producer producer 5 cnt. cnt =', cnt)
# 调度消费者
current = int(time.time())
if current % 5 == 0:
cnt = gen.send(cnt)
else:
time.sleep(1)
if __name__ == '__main__':
producer(0)
2.生产者消费者–异步版本
import asyncio
import time
from queue import Queue
from threading import Thread
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put('ok')
def consumer(input_queue1, out_queue1):
while True:
task = input_queue1.get()
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), out_queue1), new_loop)
if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.daemon = True
loop_thread.start()
input_queue = Queue()
input_queue.put(5)
input_queue.put(3)
input_queue.put(1)
out_queue = Queue()
consumer_thread = Thread(target=consumer, args=(input_queue, out_queue,))
consumer_thread.daemon = True
consumer_thread.start()
while True:
msg = out_queue.get()
print("协程运行完...")
print("当前时间:", time.ctime())
3.客户端/服务端-多线程版
客户端/服务模型
客户端
# -*- encoding=utf-8 -*-
# 客户端
import socket
client = socket.socket()
print('client.fileno:', client.fileno())
client.connect(('127.0.0.1', 8999))
while True:
content = str(input('>>>'))
client.send(content.encode())
content = client.recv(1024)
print('client recv content:', content)
服务端
import socket
import threading
def thread_process(s):
while True:
content = s.recv(1024)
if len(content) == 0:
break
s.send(content.upper())
print(str(content, encoding='utf-8')) # 接受来自客户端的消息,并打印出来
s.close()
server = socket.socket() # 1. 新建socket
server.bind(('127.0.0.1', 8999)) # 2. 绑定IP和端口(其中127.0.0.1为本机回环IP)
server.listen(5) # 3. 监听连接
while True:
s, addr = server.accept() # 4. 接受连接
new_thread = threading.Thread(target=thread_process, args=(s,))
print('new thread process connect addr:{}'.format(addr))
new_thread.start()
注意:
-
AddressFamily=AF_INET:(用于 Internet 进程间通信)
-
AddressFamily=AF_UNIX(用于同一台机器进程间通信)
-
现象:报错[WinError 10038],原因分析:socket 先 close 再调 recv 就会报错,解决办法:
if not tcpCliSock._closed:
4.IO多路复用TCPServer模型
4.1Select
服务端
import select
import socket
from queue import Queue, Empty
from time import sleep
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server_address = ("127.0.0.1", 8999)
print('starting up on %s port %s' % server_address)
server.bind(server_address)
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}
while inputs:
print('waiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for s in readable:
if s is server:
connection, client_address = s.accept()
print(f"connection from {client_address}")
connection.setblocking(False)
inputs.append(connection)
message_queues[connection] = Queue()
continue
data = s.recv(1024).decode()
if data == "":
print(f'closing:{s.getpeername()}')
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
message_queues.pop(s)
continue
print(f'received {data} from {s.getpeername()} ')
message_queues[s].put(data)
if s not in outputs:
outputs.append(s)
for s in writable:
try:
queue_item = message_queues.get(s)
send_data = ''
if queue_item:
send_data = queue_item.get_nowait()
except Empty:
print(outputs.remove(s))
print(f"{s.getpeername()} has closed")
else:
if queue_item:
s.send(send_data.encode())
for s in exceptional:
print(f"Exception condition on {s.getpeername}")
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
message_queues.pop(s)
sleep(1)
客户端
import socket
messages = ['This is the message ', 'It will be sent ', 'in parts ', ]
server_address = ("127.0.0.1", 8999)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]
print('connecting to %s port %s' % server_address)
for s in socks:
s.connect(server_address)
for index, message in enumerate(messages):
for s in socks:
print('%s: sending "%s"' % (s.getsockname(), message + str(index)))
s.send((message + str(index)).encode('utf-8'))
for s in socks:
data = s.recv(1024)
print('%s: received "%s"' % (s.getsockname(), data))
if data != "":
print('closing socket', s.getsockname())
s.close()
- 为什么要将server放入到inputs中
在select模型中,将server放入到inputs中,当执行select时就会去检查server是否可读,就说明在缓冲区里有数据,对于server来说,有连接进入。使用accept获得客户端socket文件后,首先要放入到inputs当中,等待其发送消息。
- readable
select会将所有可读的socket返回,包括server在内,假设一个客户端socket的缓冲区里有2000字节的内容,而这一次你只是读取了1024个字节,没有关系,下一次执行select模型时,由于缓冲区里还有数据,这个客户端socket还会被放入到readable列表中。因此,在读取数据时,不必再像之前那样使用一个while循环一直读取。
- writable
在每一次写操作执行后,都从socket从writable中删除,这样做的原因很简单,该写的数据已经写完了,如果不删除,下一次select操作时,又会把他放入到writable中,可是现在已经没有数据需要写了啊,这样做没有意义,只会浪费select操作的时间,因为它要遍历outputs中的每一个socket,判断他们是否可写以决定是否将其放入到writtable中
- 异常
在exceptional中,是发生错误和异常的socket,有了这个数组,就在也不用操心错误和异常了,不然程序写起来非常的复杂,有了统一的管理,发生错误后的清理工作将变得非常简单
4.2Epoll
服务端
# -*- encoding=utf-8 -*-
# IO多路复用TCPServer模型
import select
import socket
def serve():
server = socket.socket()
server.bind(('127.0.0.1', 8999))
server.listen(1)
epoll = select.epoll()
epoll.register(server.fileno(), select.EPOLLIN)
connections = {}
contents = {}
while True:
events = epoll.poll(10)
for fileno, event in events:
if fileno == server.fileno():
# 当fd为当前服务器的描述符时,获取新连接
s, addr = server.accept() # 获取套接字和地址
print(f"new connection from addr:{addr},fileno:{s.fileno()},socket:{s}")
epoll.register(s.fileno(), select.EPOLLIN)
connections[s.fileno()] = s
elif event == select.EPOLLIN:
# 当fd不为服务器描述符为客户端描述符时,读事件就绪,有新数据可读
s = connections[fileno]
content = s.recv(1024)
if content:
# 当客户端发送数据时
print(f"recv content is {content}")
print(f"fileno:{fileno} event:{event}")
epoll.modify(fileno, select.EPOLLOUT)
contents[fileno] = content
else:
# 当客户端退出连接时
print(f"recv content is null")
print(f"fileno;{fileno} event:{event} ")
epoll.unregister(fileno)
s.close()
connections.pop(fileno)
elif event == select.EPOLLOUT:
# 当fd不为服务器描述符为客户端描述符时,写事件就绪
try:
content = contents[fileno]
s = connections[fileno]
s.send(content)
epoll.modify(s.fileno(), select.EPOLLIN)
print(f"modify content is {content}")
print(f"fileno;{fileno} event:{event} ")
except Exception as error:
epoll.unregister(fileno)
s.close()
connections.pop(fileno)
contents.pop(fileno)
print(f"modify content is failed")
print(f"fileno;{fileno} event:{event} ")
if __name__ == '__main__':
serve()
客户端
# -*- encoding=utf-8 -*-
# 客户端
import socket
client = socket.socket()
print('client.fileno:', client.fileno())
client.connect(('127.0.0.1', 8999))
while True:
content = str(input('>>>'))
client.send(content.encode())
content = client.recv(1024)
print('client recv content:', content.decode())
5.异步IO多路复用TCPServer模型
import socket
import select
from collections import deque
class Future:
"""可等待对象 Future"""
def __init__(self, loop):
self.loop = loop
self.done = False
self.co = None
def set_done(self):
self.done = True
def set_coroutine(self, co):
self.co = co
def __await__(self):
if not self.done:
yield self
return
class SocketWrapper:
"""套接字协程适配器"""
def __init__(self, sock: socket.socket, loop):
self.loop = loop
self.sock = sock
self.sock.setblocking(False)
self.fileno = self.sock.fileno()
def create_future_for_events(self, events):
future: Future = Future(loop=self.loop)
def handler():
future.set_done()
self.loop.unregister_handler(self.fileno)
if future.co:
self.loop.add_coroutine(future.co)
self.loop.register_handler(self.fileno, events, handler)
return future
async def accept(self):
while True:
try:
sock, addr = self.sock.accept()
return SocketWrapper(sock, self.loop), addr
except BlockingIOError:
future = self.create_future_for_events(select.EPOLLIN)
await future
async def recv(self, backlog):
while True:
try:
return self.sock.recv(backlog)
except BlockingIOError:
future = self.create_future_for_events(select.EPOLLIN)
await future
async def send(self, data):
while True:
try:
return self.sock.send(data)
except BlockingIOError:
future = self.create_future_for_events(select.EPOLLOUT)
await future
class EventLoop:
"""调度器:epoll事件驱动"""
current = None
runnable = deque()
epoll = select.epoll()
handler = {}
@classmethod
def instance(cls):
if not EventLoop.current:
EventLoop.current = EventLoop()
return EventLoop.current
def register_handler(self, fileno, events, handler):
self.handler[fileno] = handler
self.epoll.register(fileno, events)
def unregister_handler(self, fileno):
self.epoll.unregister(fileno)
self.handler.pop(fileno)
def add_coroutine(self, co):
self.runnable.append(co)
def run_coroutine(self, co):
try:
future: Future = co.send(None)
future.set_coroutine(co)
except Exception as e:
print(e)
print('coroutine {} stopped'.format(co.__name__))
def run_forever(self):
while True:
while self.runnable:
self.run_coroutine(co=self.runnable.popleft())
events = self.epoll.poll(1)
for fileno, event in events:
handler = self.handler.get(fileno)
handler()
class TCPServer:
def __init__(self, loop: EventLoop):
self.loop = loop
self.listen_sock: SocketWrapper = self.create_listen_socket()
self.loop.add_coroutine(self.serve_forever())
def create_listen_socket(self, ip='localhost', port=8999):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((ip, port))
sock.listen()
return SocketWrapper(sock, self.loop)
async def handler_client(self, sock: SocketWrapper):
while True:
data = await sock.recv(1024)
if not data:
print('client disconnected')
break
await sock.send(data.upper())
async def serve_forever(self):
while True:
sock, addr = await self.listen_sock.accept()
print(f'client connect addr = {addr}')
self.loop.add_coroutine(self.handler_client(sock))
if __name__ == '__main__':
loop = EventLoop.instance()
server = TCPServer(loop)
loop.run_forever()