Python使用异步线程池实现异步TCP服务器交互

news2024/12/25 14:02:16

背景:

实现客户端与服务端交互,由于效率原因,要发送与接收异步,提高效率。

需要多线程,本文用线程池管理。

common代码:

import pickle
import struct
import time


def send_msg(conn, data):
    time.sleep(1)
    msg = pickle.dumps(data)
    msg = struct.pack('>I', len(msg)) + msg
    conn.sendall(msg)
    return data, len(msg)


def recv_from(conn, n):
    data = b''
    handle_len = 0
    while handle_len < n:
        packet = conn.recv(n - handle_len)
        if not packet:
            return None
        handle_len += len(packet)
        data += packet
    return data


def recv_msg(conn):
    struct_msg_len = recv_from(conn, 4)
    if not struct_msg_len:
        return None, 0
    msg_len = struct.unpack('>I', struct_msg_len)[0]
    msg = recv_from(conn, msg_len)
    msg = pickle.loads(msg)
    return msg, msg_len

客户端:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import time
import select
import socket
import threading
from threading import Thread
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor

from common import send_msg, recv_msg

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 不经过WAIT_TIME,直接关闭
sock.setblocking(False)  # 设置非阻塞编程

inputs = [sock, ]
executor = ThreadPoolExecutor(max_workers=3)  # 设置线程池最大数量

print('client start!!!')

try:
    sock.connect(("127.0.0.1", 789))
except Exception as e:
    print(e)


def handle_received_data(data):
    print("接收服务端信息:", data)
    time.sleep(1)
    return


def receive_service_data():
    """接收服务端返回的数据并处理"""
    while True:
        try:
            r_list, w_list, e_list = select.select(inputs, [], [], 1)
            for event in r_list:
                data, data_len = recv_msg(event)
                if data:
                    try:
                        executor.submit(handle_received_data, data)
                    except Exception as e:
                        print(threading.current_thread(), threading.active_count())
                        print(e)
                else:
                    print("远程断开连接")
                    inputs.remove(event)
                    exit()
        except OSError as e:
            import traceback
            print(traceback.format_exc())
            print(e)
            exit()


def send_client_data(size=100):
    """发送客户端数据"""
    executors = []
    for i in range(size):
        exe = executor.submit(send_msg, sock, {'data': i})
        executors.append(exe)
    for feature in as_completed(executors):
        try:
            data, data_len = feature.result()
        except Exception as e:
            print(e)
        else:
            print(f"客户端发送数据:{data}, len:{data_len}")


if __name__ == '__main__':
    T1 = time.time()

    # 启动接受服务端数据的线程
    Thread(target=receive_service_data).start()

    # 发送客户端数据
    send_client_data(size=10)
    print('all_time:', time.time() - T1)

服务端:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import socket
import select
import threading
from concurrent.futures import ThreadPoolExecutor

from common import send_msg, recv_msg

sock = socket.socket()
sock.bind(('127.0.0.1', 789))
sock.setblocking(False)
sock.listen()

inputs = [sock, ]
lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=3)  # 设置线程池最大数量

print('service start!!!')


def handle_received_data(event, data):
    time.sleep(1)
    send_msg(event, data)
    print(f"服务端发送数据:{data}")


while True:
    r_list, w_list, e_list = select.select(inputs, [], [], 1)
    for event in r_list:
        if event == sock:
            print("新的客户端连接")
            new_sock, addresses = event.accept()
            inputs.append(new_sock)
        else:
            data, msg_len = recv_msg(event)
            if data:
                print("接收到客户端信息", data)
                executor.submit(handle_received_data, event, data)
            else:
                print("客户端断开连接")
                inputs.remove(event)

运行结果:


参考:

Python select.select 模块通信全过程详解_南淮北安的博客-CSDN博客 

Python标准库socketserver使用线程混入实现异步TCP服务器 

Python中的多路复用 (select、poll 和 epoll) 

 Python实现socket的非阻塞式编程 - 简书

https://www.cnblogs.com/i-honey/p/8078518.html 

 Python多线程RuntimeError: can’t start new thread-Grugsum's blog

python自学成才之路 线程间协作之Semaphore,threading.local() - 腾讯云开发者社区-腾讯云 

Python can‘t start new thread_零之领域的博客-CSDN博客 

python socket sendto函数返回值_他拍了拍你,来这里看这个Socket。_weixin_39982580的博客-CSDN博客 python线程数设置多少_为什么线程数增加超过threading.BoundedSemaphore在python的设置?..._weixin_39808803的博客-CSDN博客

https://www.cnblogs.com/shuopython/p/14943175.html 

Python线程池及其原理和使用(超级详细) 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

centos7配置静态网络常见问题归纳

系列相似配置与安装软件问题整理与归纳文章目录 安装pymysql库_pymysql库安装_张小鱼༒的博客-CSDN博客 解决pip更新的代码_pip更新代码_张小鱼༒的博客-CSDN博客 python当中的第三方wxPython库的安装解答_pip install wx_张小鱼༒的博客-CSDN博客 spark里面配置jdk后的编程…

MATLAB算法实战应用案例精讲-【优化算法】增强型鲸鱼优化算法(EWOA)(附matlab代码实现)

前言 增强型鲸鱼优化算法(Enhanced Whale Optimization Algorithm,EWOA)是Mohammad H. Nadimi-Shahraki等人于2022年提出的一种改进算法。由于标准的鲸鱼优化算法及其它的改进算法都存在种群多样性低和搜索策略差的问题,因此引入有效的策略来缓解鲸鱼优化算法的这些核心缺点…

Licode—基于webrtc的SFU/MCU实现

1. webrtc浅析webrtc的前世今生、编译方法、行业应用、最佳实践等技术与产业类的文章在网上卷帙浩繁&#xff0c;重复的内容我不再赘述。对我来讲&#xff0c;webrtc的概念可以有三个角度去解释&#xff1a;&#xff08;1&#xff09;.一个W3C和IETF制定的标准&#xff0c;约定…

项目成本管理中的常见误区及解决方案

做过项目的人都明白&#xff0c;项目实施时间一般很长&#xff0c;在实施期间总有很多项目结果不尽人意的问题。要使一个项目取得成功&#xff0c;就要结合很多因素一起才能作用&#xff0c;其中做好项目成本的管理就是最重要的步骤之一&#xff0c;下面列出了常见的项目成本管…

Python进阶-----高阶函数map() 简介和使用

目录 简介&#xff1a; ​编辑 示例&#xff1a; 示例&#xff08;1&#xff09;&#xff1a;输出map()函数返回值&#xff08;迭代器&#xff09;结果 示例&#xff08;2&#xff09;&#xff1a;与循环对比 示例&#xff08;3&#xff09;&#xff1a;字符串转列表 示…

第九届蓝桥杯省赛——7缩位求和

题目&#xff1a;在电子计算机普及以前&#xff0c;人们经常用一个粗略的方法来验算四则运算是否正确。比如&#xff1a;248 * 15 3720把乘数和被乘数分别逐位求和&#xff0c;如果是多位数再逐位求和&#xff0c;直到是1位数&#xff0c;得2 4 8 14 > 1 4 5;1 5 65…

【C++】类与对象(一)

文章目录1、面向过程和面向对象初步认识2、类的引入3、类的定义4、类的访问限定符5、类的作用域6、类的实例化7、计算类对象的大小8、this指针9、 C语言和C实现Stack的对比1、面向过程和面向对象初步认识 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题…

算法练习-排序(一)

算法练习-排序(一) 文章目录算法练习-排序(一)1 排序算法1.1 冒泡排序1.1.1代码1.2插入排序1.2.1代码1.3 选择排序1.3.1代码1.4归并排序1.4.1代码1.5 快速排序1.5.1 思路1.5.2 代码2 题目2.1 特殊排序2.1.1 题目2.1.2 题解2.2 数组中的第k个最大元素2.2.1 题目2.2.2 题解2.3 对…

【Linux学习】基础IO——系统调用 | 文件描述符fd | 重定向

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《Linux学习》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 基础IO&#x1f34e;文件操作&#x1f349;使用C接口进行文件操作&#x1f349;文件操作的系统调…

Ingress

Ingres 目录 文章目录Ingres目录本节实战1、Ingress是什么2、定义1.rules2.Resource3.pathType4.IngressClass5.TLS3、Ingress-controller1.什么是Ingress-controller2.其他ingress-controller控制器FAQ零碎ingress就是借用service实现服务发现机制关于我最后本节实战 无 1、…

MySQL基础(一)SQL分类、导入、SELECT语句,运算符

目录 MySQL安装以及相关工具 SQL分类 导入数据 最基本的SELECT语句 SELECT FROM 列的别名 去除重复行 着重号 查询常数 描述表结构 过滤数据&#xff08;重要&#xff09; 运算符 算数运算符 比较运算符 符号运算符 非符号运算符 逻辑运算符 位运算符 MySQL安…

【C++】继承与多态

目录前言1. 继承1.1 继承的概念1.2 继承的定义1.3 切片赋值1.4 继承中的作用域1.5 派生类的默认成员函数1.6 继承与友元、静态成员1.7 多继承、菱形继承、菱形虚拟继承1.7.1 区分单继承与多继承1.7.2 菱形继承1.7.3 菱形虚拟继承1.7.4 菱形虚拟继承的原理2. 多态2.1 概念2.2 多…

Elasticsearch实战之(商品搜索API实现)

Elasticsearch实战之&#xff08;商品搜索API实现&#xff09; 1、案例介绍 某医药电商H5商城基于Elasticsearch实现商品搜索 2、案例分析 2.1、数据来源 商品库 - 平台运营维护商品库 - 供应商维护 2.2、数据同步 2.2.1、同步双写 写入 MySQL&#xff0c;直接也同步往…

如何使用C2concealer生成随机化的C2 Malleable配置文件

关于C2concealer C2concealer是一款功能强大的命令行工具&#xff0c;在该工具的帮助下&#xff0c;广大研究人员可以轻松生成随机化的C2 Malleable配置文件&#xff0c;以便在Cobalt Strike中使用。 工具运行机制 开发人员对Cobalt Strike文档进行了详细的研究&#xff0c;…

【转载】2020融云:基于WebRTC的低延迟视频直播

原文直接访问本文是读书笔记。基于WebRTC的低延迟视频直播 需要学习rtp包的缓存设计,于是找到了这一篇文章rtp包缓存 如何适应直播需求?直播与实时通信的区别 流量更少: RTMP或者HLS主要基于TCP传输,WebRTC是基于UDP的传输, **UDP协议的头小。**TCP为了保证传输质量,因…

Zotero设置毕业论文/中文期刊参考文献格式

大家在使用zotero时很容易遇到的问题&#xff1a; 英文参考文献中有多个作者时出现“等”&#xff0c;而不是用"et al"引文最后面有不需要的DOI号&#xff0c;或者论文链接对于一些期刊分类上会出现OL字样&#xff0c;即[J/OL]作者名为全大写 本文主要解决以上几个…

string函数以及string常用接口

本文介绍的是C关键字string中一些重要用法&#xff0c;以及各种字符串序列的处理操作 ——飘飘何所似&#xff0c;天地一沙鸥 文章目录前言一、string&#xff08;字符串类&#xff09;二、string类对象的容量操作2.1 size/length2.2 capacity2.3 empty/clear2.4 resize/reser…

教你如何搭建设备-保养管理系统,demo可分享

1、简介1.1、案例简介本文将介绍&#xff0c;如何搭建设备-保养管理。1.2、应用场景设备管理员进行制定设备保养计划、记录设备保养信息、可以查看设备保养日历。2、设置方法2.1、表单搭建1&#xff09;新建表单【设备档案-履历表】&#xff0c;字段设置如下&#xff1a;名称类…

SSM SpringBoot vue 在线教学质量评价系统

SSM SpringBoot vue 在线教学质量评价系统 SSM 在线教学质量评价系统 功能介绍 首页 图片轮播展示 登录 学生注册 教师注册 督导注册 教师展示 教师详情 学生评价 课程信息 课程详情 提交选修该课 学生选课 学生留言 个人中心 后台管理 管理员或学生或教师或督导登录 个人中…

项目经理处理团队冲突 5大注意事项

1、在时间、场景、体验矩阵中的5种处理方式 第一种方式&#xff1a;强迫命令&#xff0c;即职位高的一方在不考虑对方感受的情况下&#xff0c;强迫职位低的一方接受自己的意见。这种处理方式的适用场景为重要且紧急&#xff0c;这种方式团队成员的体验感低。 第二种方式&#…