6.python网络编程

news2025/1/11 8:43:45

文章目录

    • 1.生产者消费者-生成器版
    • 2.生产者消费者--异步版本
    • 3.客户端/服务端-多线程版
    • 4.IO多路复用TCPServer模型
      • 4.1Select
      • 4.2Epoll
    • 5.异步IO多路复用TCPServer模型

1.生产者消费者-生成器版

import time


# 消费者
def consumer():
    cnt = yield
    while True:
        if cnt <= 0:
            # 暂停、让出CPU
            cnt = yield cnt
        cnt -= 1
        time.sleep(1)
        print('consumer consum 1 cnt. cnt =', cnt)


# 生产者 (调度器)
def producer(cnt):
    gen = consumer()
    # 激活生成器
    next(gen)
    gen.send(cnt)
    while True:
        cnt += 1
        print('producer producer 5 cnt. cnt =', cnt)
        # 调度消费者
        current = int(time.time())
        if current % 5 == 0:
            cnt = gen.send(cnt)
        else:
            time.sleep(1)


if __name__ == '__main__':
    producer(0)

2.生产者消费者–异步版本

import asyncio
import time
from queue import Queue
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_sleep(x, queue):
    await asyncio.sleep(x)
    queue.put('ok')


def consumer(input_queue1, out_queue1):
    while True:
        task = input_queue1.get()
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_sleep(int(task), out_queue1), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()
    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.daemon = True
    loop_thread.start()

    input_queue = Queue()
    input_queue.put(5)
    input_queue.put(3)
    input_queue.put(1)

    out_queue = Queue()

    consumer_thread = Thread(target=consumer, args=(input_queue, out_queue,))
    consumer_thread.daemon = True
    consumer_thread.start()

    while True:
        msg = out_queue.get()
        print("协程运行完...")
        print("当前时间:", time.ctime())

3.客户端/服务端-多线程版

客户端/服务模型

在这里插入图片描述

客户端

# -*- encoding=utf-8 -*-
# 客户端

import socket


client = socket.socket()
print('client.fileno:', client.fileno())

client.connect(('127.0.0.1', 8999))

while True:
    content = str(input('>>>'))
    client.send(content.encode())
    content = client.recv(1024)
    print('client recv content:', content)

服务端

import socket
import threading


def thread_process(s):
    while True:
        content = s.recv(1024)
        if len(content) == 0:
            break
        s.send(content.upper())
        print(str(content, encoding='utf-8'))  # 接受来自客户端的消息,并打印出来
        s.close()


server = socket.socket()  # 1. 新建socket
server.bind(('127.0.0.1', 8999))  # 2. 绑定IP和端口(其中127.0.0.1为本机回环IP)
server.listen(5)  # 3. 监听连接

while True:
    s, addr = server.accept()  # 4. 接受连接
    new_thread = threading.Thread(target=thread_process, args=(s,))
    print('new thread process connect addr:{}'.format(addr))
    new_thread.start()

注意:

  • AddressFamily=AF_INET:(用于 Internet 进程间通信)

  • AddressFamily=AF_UNIX(用于同一台机器进程间通信)

  • 现象:报错[WinError 10038],原因分析:socket 先 close 再调 recv 就会报错,解决办法:if not tcpCliSock._closed:

4.IO多路复用TCPServer模型

4.1Select

服务端

import select
import socket
from queue import Queue, Empty
from time import sleep

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server_address = ("127.0.0.1", 8999)
print('starting up on %s port %s' % server_address)
server.bind(server_address)
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}

while inputs:
    print('waiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    for s in readable:
        if s is server:
            connection, client_address = s.accept()
            print(f"connection from {client_address}")
            connection.setblocking(False)
            inputs.append(connection)
            message_queues[connection] = Queue()
            continue

        data = s.recv(1024).decode()
        if data == "":
            print(f'closing:{s.getpeername()}')
            if s in outputs:
                outputs.remove(s)
            inputs.remove(s)
            s.close()
            message_queues.pop(s)
            continue

        print(f'received {data} from {s.getpeername()} ')
        message_queues[s].put(data)
        if s not in outputs:
            outputs.append(s)

    for s in writable:
        try:
            queue_item = message_queues.get(s)
            send_data = ''
            if queue_item:
                send_data = queue_item.get_nowait()
        except Empty:
            print(outputs.remove(s))
            print(f"{s.getpeername()} has closed")
        else:
            if queue_item:
                s.send(send_data.encode())

    for s in exceptional:
        print(f"Exception condition on {s.getpeername}")
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        message_queues.pop(s)

    sleep(1)

客户端

import socket

messages = ['This is the message ', 'It will be sent ', 'in parts ', ]
server_address = ("127.0.0.1", 8999)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for index, message in enumerate(messages):
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message + str(index)))
        s.send((message + str(index)).encode('utf-8'))

for s in socks:
    data = s.recv(1024)
    print('%s: received "%s"' % (s.getsockname(), data))
    if data != "":
        print('closing socket', s.getsockname())
        s.close()

  • 为什么要将server放入到inputs中

在select模型中,将server放入到inputs中,当执行select时就会去检查server是否可读,就说明在缓冲区里有数据,对于server来说,有连接进入。使用accept获得客户端socket文件后,首先要放入到inputs当中,等待其发送消息。

  • readable

select会将所有可读的socket返回,包括server在内,假设一个客户端socket的缓冲区里有2000字节的内容,而这一次你只是读取了1024个字节,没有关系,下一次执行select模型时,由于缓冲区里还有数据,这个客户端socket还会被放入到readable列表中。因此,在读取数据时,不必再像之前那样使用一个while循环一直读取。

  • writable

在每一次写操作执行后,都从socket从writable中删除,这样做的原因很简单,该写的数据已经写完了,如果不删除,下一次select操作时,又会把他放入到writable中,可是现在已经没有数据需要写了啊,这样做没有意义,只会浪费select操作的时间,因为它要遍历outputs中的每一个socket,判断他们是否可写以决定是否将其放入到writtable中

  • 异常

在exceptional中,是发生错误和异常的socket,有了这个数组,就在也不用操心错误和异常了,不然程序写起来非常的复杂,有了统一的管理,发生错误后的清理工作将变得非常简单

4.2Epoll

服务端

# -*- encoding=utf-8 -*-
# IO多路复用TCPServer模型

import select
import socket


def serve():
    server = socket.socket()
    server.bind(('127.0.0.1', 8999))
    server.listen(1)

    epoll = select.epoll()
    epoll.register(server.fileno(), select.EPOLLIN)

    connections = {}
    contents = {}

    while True:
        events = epoll.poll(10)
        for fileno, event in events:
            if fileno == server.fileno():
                # 当fd为当前服务器的描述符时,获取新连接
                s, addr = server.accept()  # 获取套接字和地址
                print(f"new connection from addr:{addr},fileno:{s.fileno()},socket:{s}")
                epoll.register(s.fileno(), select.EPOLLIN)
                connections[s.fileno()] = s
            elif event == select.EPOLLIN:
                # 当fd不为服务器描述符为客户端描述符时,读事件就绪,有新数据可读
                s = connections[fileno]
                content = s.recv(1024)
                if content:
                    # 当客户端发送数据时
                    print(f"recv content is {content}")
                    print(f"fileno:{fileno} event:{event}")
                    epoll.modify(fileno, select.EPOLLOUT)
                    contents[fileno] = content
                else:
                    # 当客户端退出连接时
                    print(f"recv content is null")
                    print(f"fileno;{fileno} event:{event} ")
                    epoll.unregister(fileno)
                    s.close()
                    connections.pop(fileno)
            elif event == select.EPOLLOUT:
                # 当fd不为服务器描述符为客户端描述符时,写事件就绪
                try:
                    content = contents[fileno]
                    s = connections[fileno]
                    s.send(content)
                    epoll.modify(s.fileno(), select.EPOLLIN)
                    print(f"modify content is {content}")
                    print(f"fileno;{fileno} event:{event} ")
                except Exception as error:
                    epoll.unregister(fileno)
                    s.close()
                    connections.pop(fileno)
                    contents.pop(fileno)
                    print(f"modify content is failed")
                    print(f"fileno;{fileno} event:{event} ")


if __name__ == '__main__':
    serve()

客户端

# -*- encoding=utf-8 -*-
# 客户端

import socket


client = socket.socket()
print('client.fileno:', client.fileno())

client.connect(('127.0.0.1', 8999))

while True:
    content = str(input('>>>'))
    client.send(content.encode())
    content = client.recv(1024)
    print('client recv content:', content.decode())

5.异步IO多路复用TCPServer模型

import socket
import select
from collections import deque


class Future:
    """可等待对象 Future"""

    def __init__(self, loop):
        self.loop = loop
        self.done = False
        self.co = None

    def set_done(self):
        self.done = True

    def set_coroutine(self, co):
        self.co = co

    def __await__(self):
        if not self.done:
            yield self
        return


class SocketWrapper:
    """套接字协程适配器"""

    def __init__(self, sock: socket.socket, loop):
        self.loop = loop
        self.sock = sock
        self.sock.setblocking(False)
        self.fileno = self.sock.fileno()

    def create_future_for_events(self, events):
        future: Future = Future(loop=self.loop)

        def handler():
            future.set_done()
            self.loop.unregister_handler(self.fileno)
            if future.co:
                self.loop.add_coroutine(future.co)

        self.loop.register_handler(self.fileno, events, handler)

        return future

    async def accept(self):
        while True:
            try:
                sock, addr = self.sock.accept()
                return SocketWrapper(sock, self.loop), addr
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def recv(self, backlog):
        while True:
            try:
                return self.sock.recv(backlog)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def send(self, data):
        while True:
            try:
                return self.sock.send(data)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLOUT)
                await future


class EventLoop:
    """调度器:epoll事件驱动"""
    current = None
    runnable = deque()
    epoll = select.epoll()
    handler = {}

    @classmethod
    def instance(cls):
        if not EventLoop.current:
            EventLoop.current = EventLoop()
        return EventLoop.current

    def register_handler(self, fileno, events, handler):
        self.handler[fileno] = handler
        self.epoll.register(fileno, events)

    def unregister_handler(self, fileno):
        self.epoll.unregister(fileno)
        self.handler.pop(fileno)

    def add_coroutine(self, co):
        self.runnable.append(co)

    def run_coroutine(self, co):
        try:
            future: Future = co.send(None)
            future.set_coroutine(co)
        except Exception as e:
            print(e)
            print('coroutine {} stopped'.format(co.__name__))

    def run_forever(self):
        while True:
            while self.runnable:
                self.run_coroutine(co=self.runnable.popleft())
            events = self.epoll.poll(1)
            for fileno, event in events:
                handler = self.handler.get(fileno)
                handler()


class TCPServer:

    def __init__(self, loop: EventLoop):
        self.loop = loop
        self.listen_sock: SocketWrapper = self.create_listen_socket()
        self.loop.add_coroutine(self.serve_forever())

    def create_listen_socket(self, ip='localhost', port=8999):
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((ip, port))
        sock.listen()
        return SocketWrapper(sock, self.loop)

    async def handler_client(self, sock: SocketWrapper):
        while True:
            data = await sock.recv(1024)
            if not data:
                print('client disconnected')
                break
            await sock.send(data.upper())

    async def serve_forever(self):
        while True:
            sock, addr = await self.listen_sock.accept()
            print(f'client connect addr = {addr}')
            self.loop.add_coroutine(self.handler_client(sock))


if __name__ == '__main__':
    loop = EventLoop.instance()
    server = TCPServer(loop)
    loop.run_forever()



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1638054.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年五一数学建模竞赛C题论文首发

基于随机森林的煤矿深部开采冲击地压危险预测 摘要 煤炭作为中国重要的能源和工业原料&#xff0c;其开采活动对国家经济的稳定与发展起着至关重要的作用。本文将使用题目给出的数据探索更为高效的数据分析方法和更先进的监测设备&#xff0c;以提高预警系统的准确性和可靠性…

键盘更新计划

作为 IT 搬砖人&#xff0c;一直都认为键盘没有什么太大关系。 每次都是公司发什么用什么。 但随着用几年后&#xff0c;发现现在的键盘经常出问题&#xff0c;比如说调节音量的时候通常莫名其妙的卡死&#xff0c;要不就是最大音量要不就是最小音量。 按键 M 不知道什么原因…

服务运营 | 精选:花钱买开心!体验型服务设计中的调度优化

编者按 在体验经济时代&#xff0c;企业逐渐从提供产品转变为提供体验&#xff0c;只有了解顾客的行为&#xff0c;才能对服务进行更好的设计&#xff0c;从而提高顾客的体验和忠诚度&#xff0c;实现企业与顾客的双赢。如何优化顾客体验便是体验型服务设计&#xff08;Experie…

通过自然语言处理执行特定任务的AI Agents;大模型控制NPC执行一系列的动作;个人化的电子邮件助手Panza

✨ 1: OpenAgents 通过自然语言处理执行特定任务的AI代理 OpenAgents是一个开放平台&#xff0c;旨在使语言代理&#xff08;即通过自然语言处理执行特定任务的AI代理&#xff09;的使用和托管变得更加便捷和实用。它特别适合于日常生活中对数据分析、工具插件获取和网络浏览…

鹏哥C语言复习——内存函数

目录 一.memcpy函数 二.memmove函数 三.memset函数 四.memcmp函数 一.memcpy函数 该函数是针对内存块进行拷贝操作&#xff0c;mem即为memory&#xff0c;是内存的意思&#xff1b;cpy就是copy&#xff0c;是拷贝的意思 int arr[] { 1,2,3,4,5,6,7,8,9,10 }; int arr2[2…

C++中的右值引用和移动语义

1 左值引用和右值引用 传统的C语法中就有引用的语法&#xff0c;而C11中新增了的右值引用语法特性&#xff0c;所以从现在开始我们之前学习的引用就叫做左值引用。无论左值引用还是右值引用&#xff0c;都是给对象取别名。 什么是左值&#xff1f;什么是左值引用&#xff1f; 左…

初探 Google 云原生的CICD - CloudBuild

大纲 Google Cloud Build 简介 Google Cloud Build&#xff08;谷歌云构建&#xff09;是谷歌云平台&#xff08;Google Cloud Platform&#xff0c;GCP&#xff09;提供的一项服务&#xff0c;可帮助开发人员以一致和自动化的方式构建、测试和部署应用程序或构件。它为构建和…

【Java探索之旅】包管理精粹 Java中包的概念与实践

文章目录 &#x1f4d1;前言一、封装1.1 封装的概念1.2 访问限定修饰符 二、封装扩展&#xff08;包&#xff09;2.1 包的概念2.2 带入包中的类2.3 自定义包2.4 常见的包 &#x1f324;️全篇总结 &#x1f4d1;前言 在Java编程中&#xff0c;封装是面向对象编程的核心概念之一…

PyCharm 2024新版图文安装教程(python环境搭建+PyCharm安装+运行测试+汉化+背景图设置)

名人说&#xff1a;一点浩然气&#xff0c;千里快哉风。—— 苏轼《水调歌头》 创作者&#xff1a;Code_流苏(CSDN) 目录 一、Python环境搭建二、PyCharm下载及安装三、解释器配置及项目测试四、PyCharm汉化五、背景图设置 很高兴你打开了这篇博客&#xff0c;如有疑问&#x…

Swift - 可选项(Optional)

文章目录 Swift - 可选项&#xff08;Optional&#xff09;1. 可选项&#xff08;Optional&#xff09;2. 强制解包&#xff08;Forced Unwrapping&#xff09;3. 判断可选项是否包含值4. 可选项绑定&#xff08;Optional Binding&#xff09;5. 等价写法6. while循环中使用可选…

【linuxC语言】stat函数

文章目录 前言一、stat函数二、示例代码总结 前言 在Linux系统编程中&#xff0c;stat() 函数是一个非常重要的工具&#xff0c;用于获取文件的元数据信息。无论是在系统管理、文件处理还是应用开发中&#xff0c;都可能会用到 stat() 函数。通过调用 stat() 函数&#xff0c;…

简约大气的全屏背景壁纸导航网源码(免费)

简约大气的全屏背景壁纸导航网模板 效果图部分代码领取源码下期更新预报 效果图 部分代码 <!DOCTYPE html> <html lang"zh-CN"> <!--版权归孤独 --> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible…

用LM Studio搭建微软的PHI3小型语言模型

什么是 Microsoft Phi-3 小语言模型&#xff1f; 微软Phi-3 模型是目前功能最强大、最具成本效益的小型语言模型 &#xff08;SLM&#xff09;&#xff0c;在各种语言、推理、编码和数学基准测试中优于相同大小和更高大小的模型。此版本扩展了客户高质量模型的选择范围&#x…

C# Winform父窗体打开新的子窗体前,关闭其他子窗体

随着Winform项目越来越多&#xff0c;界面上显示的窗体越来越多&#xff0c;窗体管理变得更加繁琐。有时候我们要打开新窗体&#xff0c;然后关闭多余的其他窗体&#xff0c;这个时候如果一个一个去关闭就会变得很麻烦&#xff0c;而且可能还会出现遗漏的情况。这篇文章介绍了三…

Stylus深度解析:开发效率提升秘籍(AI写作)

首先&#xff0c;这篇文章是基于笔尖AI写作进行文章创作的&#xff0c;喜欢的宝子&#xff0c;也可以去体验下&#xff0c;解放双手&#xff0c;上班直接摸鱼~ 按照惯例&#xff0c;先介绍下这款笔尖AI写作&#xff0c;宝子也可以直接下滑跳过看正文~ 笔尖Ai写作&#xff1a;…

基于北京迅为iTOP-RK3588大语言模型部署测试

人工智能&#xff08;AI&#xff09;领域中的大模型&#xff08;Large Model&#xff09;逐渐成为研究的热点。大模型&#xff0c;顾名思义&#xff0c;是指拥有海量参数和高度复杂结构的深度学习模型。它的出现&#xff0c;不仅推动了AI技术的突破&#xff0c;更为各行各业带来…

社交媒体数据恢复:Sugram

Sugram数据恢复的方法 在本文中&#xff0c;我们将探讨Sugram数据恢复的基本方法。通过专业软件按照数据恢复步骤来了解如何进行数据恢复。 1. 立即停止使用设备 一旦发现数据丢失&#xff0c;第一步应该是立即停止使用该设备。这是因为每次设备被使用&#xff0c;都有可能导…

SpringBoot~ dubbo + zookeeper实现分布式开发的应用

配置服务名字, 注册中心地址, 扫描被注册的包 server.port8081 #当前应用名字 dubbo.application.nameprovider-server #注册中心地址 dubbo.registry.addresszookeeper://127.0.0.1:2181 #扫描指定包下服务 dubbo.scan.base-packagescom.demo.service 实现一个接口,在…

IoTDB 入门教程③——基于Linux系统快速安装启动和上手

文章目录 一、前文二、下载三、解压四、上传五、启动六、执行七、停止八、参考 一、前文 IoTDB入门教程——导读 二、下载 下载二进制可运行程序&#xff1a;https://dlcdn.apache.org/iotdb/1.3.1/apache-iotdb-1.3.1-all-bin.zip 历史版本下载&#xff1a;https://archive.…

ROS2专栏(三) | 理解ROS2的动作

​ 1. 创建一个动作 目标&#xff1a; 在ROS 2软件包中定义一个动作。 1.1 新建包 设置一个 workspace 并创建一个名为 action_tutorials_interfaces 的包&#xff1a; mkdir -p ros2_ws/src #you can reuse existing workspace with this naming convention cd ros2_ws/s…