基于python和aiohttp实现的web请求管理分发服务

news2025/1/11 23:47:30

想实现一个web请求管理分发服务,需要有如下功能:

1、第三方服务上报心跳,管理服务能监控第三方服务是否存活

2、管理服务支持http和ws服务的转发

3、管理服务支持最基础的转发策略,比方说轮询

直接上代码

一、网络和路由接口

async def main():
    app = web.Application()
    manger = Manger()
    app.add_routes([
        web.post('/nlp', manger.post),
        web.get('/ws', manger.stream_nlp),
        web.post('/heartbeat', manger.heartbeat)
    ])
    asyncio.create_task(manger.watch_service_nodes())
    return app


if __name__ == '__main__':
    if not os.path.exists("./logs"):
        os.makedirs("./logs")
    local_ip = "0.0.0.0"
    with open('./config.json','r',encoding='utf-8') as f:
        port = json.load(fp=f)['listen_port']
    web.run_app(main(), port=port)

manger类中完成心跳监测、http_post转发、ws请求转发以及服务节点状态监控,其中服务节点状态监控需要另起一个协程来实现挂起。

二、manager类部分

属性和方法如上所示

1、服务注册——根据第三方服务上报的心跳,对service_nodes进行初始化,注册服务名标签、请求次数、存活时间等。

    def register_node(self, node_dict):
        self.rlock.acquire()
        service_name = node_dict['servicename']
        service_name = service_name.split('_')
        tag = ""
        if len(service_name) == 2:
            tag = service_name[-1]
        service_name = service_name[0]
        addr_ip = node_dict['addrip']
        if service_name not in self.service_nodes:
            node = {
                "servicename":service_name,
                "access_count":0,
                "ip_details":{
                    addr_ip: {
                        "addr_ip": addr_ip,
                        "access_count": 0,
                        "alive_time": 5,
                        "tag": tag
                        }
                    }
                }
            self.service_nodes[service_name] = node
        else:
            node = self.service_nodes[service_name]
            if addr_ip not in node['ip_details']:
                node['ip_details'][addr_ip] = {
                        "addr_ip": addr_ip,
                        "access_count": 0,
                        "alive_time": 5,
                        "tag": tag
                    }
            else:
                node['ip_details'][addr_ip]["alive_time"] = 5
        self.rlock.release()
        return True

2、心跳服务接口

    async def heartbeat(self, request:web.Request):
        req = await request.json()
        self.logger.debug(f"{req}")
        for ele in req['params']['data']:
            # asyncio.to_thread(self.register_node(ele))
            self.register_node(ele)
        return web.json_response('{"status":1}')

接收第三方服务的请求,并调用register_node注册函数对service_nodes进行初始化或者更新

3、服务节点监测功能

    async def watch_service_nodes(self):
        logger = Logger(log_name="node_log",log_level=10, log_file='./logs/node.log').logger
        while True:
            logger.info(f"{self.service_nodes}")
            self.rlock.acquire()
            # 记录需要删除的ip节点和servicename
            node2delete_map = {}
            for k, node in self.service_nodes.items():
                ip_details = node['ip_details']
                for addr,_ in ip_details.items():
                    node['ip_details'][addr]['alive_time'] -= 1
                    if node['ip_details'][addr]['alive_time'] <=0:
                        if k not in node2delete_map:
                            node2delete_map[k] = {
                                "addr2delete":[addr],
                                "alive_addr_count":len(ip_details) -1
                            }
                        else:
                            if addr not in node2delete_map[k]['addr2delete']:
                                node2delete_map[k]['addr2delete'].append(addr)
                                node2delete_map[k]['alive_addr_count'] -= 1

            for k ,v in node2delete_map.items():
                if v['alive_addr_count'] == 0:
                    del self.service_nodes[k]
                else:
                    for addr in v['addr2delete']:
                        del self.service_nodes[k]['ip_details'][addr]
            self.rlock.release()
            await asyncio.sleep(0.2)

启动的协程挂起这个函数,执行service_nodes的更新功能——每隔0.2秒就更新一次,service_nodes中的每个node存活时间-1,如果存活时间为0就删除这个node

4、服务节点选择

当用户请求的时候,manger可以根据请求体中service_name以及service_nodes维护的服务状态情况,选择合适的服务节点进行请求转发。

    async def select_node(self,service_name):
        self.rlock.acquire()
        addr = None
        if service_name in self.service_nodes:
            node = self.service_nodes[service_name]
            ips = []
            access_count = node['access_count']
            for k,_ in node['ip_details'].items():
                ips.append(k)
            ips.sort()
            index = access_count%len(ips)
            addr = ips[index]
            node['access_count'] = access_count + 1
            node['ip_details'][addr]['access_count'] += 1
            self.rlock.release()
            return addr
        else:
            self.rlock.release()
            return addr

注意上述3个方法涉及到对同一个字典的键值对更新和删除操作,有资源的竞争,需要加锁。使用重入锁或者普通的Lock这里应该都可以的,都在同一个进程中也没有涉及到递归嵌套的情况。/5、

5、http转发

    async def post(self, request:web.Request):
        req = await request.json()
        self.logger.info(f"receive request: {json.dumps(req, ensure_ascii=False)}")
        service_name = req['service_name']
        addr = await self.select_node(service_name=service_name)
        if not addr:
            return self.build_fail_resp("model not support")
        url = f"http://{addr}/nlp"
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, data=json.dumps(req), verify_ssl=False, timeout=60) as response:
                    res = await response.json()
            except TimeoutError as e:
                self.logger.info(f"{e}")
                return self.build_fail_resp("model time out")
            except Exception as e:
                self.logger.info(f"{e}")
                return self.build_fail_resp("model interal error")

            return web.json_response(res)


    def build_fail_resp(self, msg):
        data = {
            'id': 0,
            'jsonrpc': '2.0',
            'ret': -1,
            'error_text': f"{msg}"
        }
        return web.json_response(data)

用户请求的时候post函数监听到请求时,调用select_node选择对应的服务地址,然后使用sync with aiohttp.ClientSession() as session上下文session自动管理会话,异步的调用第三方服务,得到响应后再响应给用户。

6、ws请求转发

    async def stream_nlp(self, request: web.Request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        # 接收客户端的消息
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                if msg.data == 'close':
                    self.logger.info("ws close")
                    await ws.close()
                else:
                    try:
                        req = msg.json()
                    except Exception as e:
                        self.logger.info("Invalid JSON format")
                        res = {
                            "ret":1,
                            "flag":1,
                            "result":"Invalid JSON format"
                        }
                        await ws.send_str(json.dumps(res, ensure_ascii=False))
                        await ws.close()
                        return ws

                    self.logger.info(f"ws request: {req}")
                    service_name = req['service_name']
                    self.logger.info(f"service_name: {service_name}")
                    addr = await self.select_node(service_name=service_name)
                    self.logger.info(f"add: {addr}")
                    if addr:
                        # 连接到上游 WebSocket 服务
                        upstream_ws_url = f"ws://{addr}/ws"
                        self.logger.info(f"upstream_ws_url: {upstream_ws_url}")
                        async with aiohttp.ClientSession() as session:
                            # 连接到 upstream WebSocket 服务器
                            async with session.ws_connect(upstream_ws_url) as upstream_ws:
                                try:
                                    if not upstream_ws.closed:
                                        await upstream_ws.send_str(json.dumps(req,ensure_ascii=False))
                                        # 接收来自 upstream ws 服务器的消息
                                        async for up_msg in upstream_ws:
                                            if up_msg.type == aiohttp.WSMsgType.TEXT:
                                                data = json.loads(up_msg.data)
                                                ret = data['ret']
                                                flag = data['flag']
                                                result = data['result']
                                                if ret != 0:
                                                    self.logger.info(f"{result}")
                                                    res = {
                                                        "ret": 1,
                                                        "flag": 1,
                                                        "result": result
                                                    }
                                                    await ws.send_str(json.dumps(res, ensure_ascii=False))
                                                    await ws.close()
                                                    return ws
                                                else:
                                                    if flag == 0:
                                                        res = {
                                                            "ret": 0,
                                                            "flag": 0,
                                                            "result": result
                                                        }
                                                    else:
                                                        res = {
                                                            "ret": 0,
                                                            "flag": 1,
                                                            "result": result
                                                        }
                                                        self.logger.info(f"ws last send_str: {json.dumps(res, ensure_ascii=False)}")
                                                    # self.logger.info(f"ws closed: {ws.closed}")
                                                    await ws.send_str(json.dumps(res, ensure_ascii=False))
                                            elif  up_msg.type == web.WSMsgType.CLOSED:
                                                break
                                    else:
                                        self.logger.info(f"upstream_ws closed")
                                        res = {
                                            "ret": 1,
                                            "flag": 1,
                                            "result": f"upstream_ws {upstream_ws_url} closed"
                                        }
                                        await ws.send_str(json.dumps(res, ensure_ascii=False))
                                        await ws.close()
                                except Exception as e:
                                    self.logger.info(f"ws state ws.closed: {ws.closed}")
                                    self.logger.info(f"upstream_ws state upstream_ws.closed: {upstream_ws.closed}")
                                    self.logger.info(f"error  occur ws close : {e}")
                                    await ws.close()
                                finally:
                                    # 确保关闭上游 WebSocket 连接
                                    if not upstream_ws.closed:
                                        await upstream_ws.close()
                                    self.logger.info(f"finally ws close")
                                    await ws.close()
                    else:
                        res = {
                            "ret": 1,
                            "flag": 1,
                            "result": "model not support"
                        }
                        await ws.send_str(json.dumps(res, ensure_ascii=False))
                        await ws.close()

            elif msg.type == web.WSMsgType.BINARY:
                res = {
                    "ret": 1,
                    "flag": 1,
                    "result": "msg type not support"
                }
                await ws.send_str(json.dumps(res, ensure_ascii=False))
                await ws.close()
            else:
                await ws.close()
                break

        return ws

ws请求的服务端监听用户请求

ws = web.WebSocketResponse()

await ws.prepare(request)

解析用户的请求拿到msg,调用select_node选择对应的服务地址,然后使用async with session.ws_connect(upstream_ws_url) as upstream_ws异步请求上游ws服务,发送请求,收到upstream_ws响应数据然后再流式的返回给用户。

7、日志记录

    def create_logger(self):
        log_level = 20 # info
        log_path = "./logs/server.log"
        logger = logging.getLogger(__name__)
        logger.setLevel(level=log_level)

        formatter = logging.Formatter("%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s")

        # 创建一个handler,用于写入日志文件,按大小覆盖
        # file_handler = logging.handlers.RotatingFileHandler(filename=log_path, maxBytes=838860800, backupCount=20, encoding='utf-8')
        # 按日期覆盖
        file_handler = logging.handlers.TimedRotatingFileHandler(filename=log_path, when='D', interval=1,
                                                                 encoding='utf-8')
        file_handler.setFormatter(formatter)
        file_handler.setLevel(level=log_level)
        logger.addHandler(file_handler)

        # 创建一个handler,用于将日志输出到控制台
        console = logging.StreamHandler()
        console.setLevel(level=log_level)
        console.setFormatter(formatter)
        logger.addHandler(console)
        return logger

以上就是全部的小功能代码,比较简单,还可以设计缓存功能,组成batch然后请求诸如模型推理这类第三方服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1986306.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用AI助手写程序

用AI帮助写程序究竟靠不靠谱&#xff0c;下面来测试一下&#xff1a; 在文心一言中输入&#xff1a;写一个C Windows API串口通信程序。结果如下&#xff1a; #include <windows.h> #include <iostream> // 串口配置 void ConfigureCommPort(HANDLE hComm) {…

Linux系统的ARM边缘计算网关在纸张处理机械中的应用

数字化时代纸张处理机械行业也在不断追求智能化和高效化。ARM 边缘计算网关作为一种关键技术&#xff0c;为纸张处理机械的智能化提供了强大的支持。结合 Linux 系统的二次开发&#xff0c;它能够加速生产流程&#xff0c;提高生产效率和质量。 ARM 边缘计算网关具有强大的计算…

Python学习笔记50:游戏篇之外星人入侵(十一)

前言 本篇文章接着之前的内容&#xff0c;继续对游戏功能进行优化&#xff0c;主要是优化游戏状态以及对应的处理。 状态 一个游戏包含多种状态&#xff0c;这个状态是一个可以很复杂也可以很简单的内容。条件所限&#xff0c;我们这个游戏的状态就比较简单&#xff1a; 未…

log4j反序列化-流程分析

分析版本 JDK8u141 依赖 <dependencies><!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><ve…

三、1 一维数组的创建和初始化

数组是一组相同类型元素的集合 1、数组的创建 2、数组的初始化 在创建数组的同时&#xff0c;给数组内容一些合理的值 3、一维数组的使用 4、一维数组在内存中的存储 数组的内容在内存中连续存放

sqli靶场复现(1-8关)

目录 1.sqli-labs第二关 1.判断是否存在sql注入 1.1你输入数字值的ID作为参数&#xff0c;我们输入?id1 1.2在数据库可以查看到users下的对应内容 2.联合注入 2.1首先知道表格有几列&#xff0c;如果报错就是超过列数&#xff0c;如果显示正常就是没有超出列数。 2.2得…

模拟一次XFS故障,分析原因并进行修复

模拟一次XFS故障 在平常处理问题时经常会遇到文件系统损坏的问题&#xff0c;有时候是日志里面出现了报错但文件系统还是可以读写&#xff0c;有时候是文件系统已经无法读写了 分析下不同现象的原因和一些可能出现的情况。 通过直接修改块存储损坏文件系统 1、制作一个xfs文…

Pytorch基础模型,数据加载,优化算法

目录 一.nn.Module 二.优化器类 三.损失函数 四.在GPU上运行代码 五.常见的优化算法 1.梯度下降算法 2.动量法&#xff1a; 3.AdaGrad 4.RMSProp 六.Pytorch中的数据加载 1.数据集类 2.迭代数据集 2.Pytorch自带的数据集 一.nn.Module nn.Modul是torch.nn提供的一个…

嵌入式初学-C语言-十六

形式参数和实际参数 形式参数&#xff08;形参&#xff09; 函数定义时&#xff0c;指定的参数&#xff0c;形参是用来接收数据的&#xff0c;函数定义时&#xff0c;系统不会为形参申请内存&#xff0c;只有当函数调用时&#xff0c;系统才会为形参申请内存&#xff0c;用于存…

信息学奥赛初赛天天练-57-NOIP2018普及组-基础题1-输入输出设备、进制转换、计算机存储单位、 网络地理范围分类、等比数列求和

PDF文档公众号回复关键字:20240806 2019 CSP-J 基础题1 单项选择题&#xff08;共15题&#xff0c;每题2分&#xff0c;共计30分&#xff1a;每题有且仅有一个正确选项&#xff09; 1 以下哪一种设备属于输出设备( ) A 扫描仪 B 键盘 C 鼠标 D 打印机 2 下列四个不同进制的…

PSTNET阅读

ICLR2021 点云序列在空间维度上具有不规则性和无序性&#xff0c;但在时间维度上具有规律性和有序性。 现有的基于网格的卷积不能直接应用于原始点云序列的时空建模。 在时空序列下&#xff0c;基于网格和基于点的卷积对比。 创新点 1.首次尝试在原始点云序列建模中分解空间…

serial靶机教程

靶机下载地址 https://download.vulnhub.com/serial/serial.zip 主机发现 arp-scan -l 端口扫描 nmap 192.168.229.131 -A 根据对⽐可知serial的⼀个ip地址为192.168.47.143 该靶机开启了22端⼝和80端⼝ 对⽹站进⾏⼀个访问&#xff0c;⼤概意思为这是对新的cookie处理程序…

【优秀python案例】基于python爬虫的深圳房价数据分析与可视化实现

现如今&#xff0c;房价问题一直处于风口浪尖&#xff0c;房价的上涨抑或下跌都牵动着整个社会的利益&#xff0c;即便是政府出台各种政策方针也只能是暂时抑制楼市的涨势&#xff0c;对于需要买房的人来说&#xff0c;除了关注这些变化和政策外&#xff0c;还有一个非常头疼的…

工具|阅读PDF时鼠标显示为小手中有向下箭头解决方法

由于工作中&#xff0c;会大量阅读PDF文档&#xff0c;如手册&#xff0c;规格书&#xff0c;各种图纸等&#xff0c;因此好用的PDF工具必不可少。我主要习惯用福昕阅读器&#xff0c;标注比较方便。 所以&#xff0c;本文主要以福昕阅读器为主&#xff0c;当然也适用于其他的阅…

linux文本处理命令:文本搜索工具grep详解

目录 一、概述 二、基本语法 1、语法 2、常用选项 3、获取帮助 三、示例 1. 搜索文件中的字符串 2. 忽略大小写搜索 3. 显示匹配行的行号 4. 反向搜索 5. 递归搜索 6. 使用扩展正则表达式 7. 列出包含匹配项的文件 8. 显示匹配行的上下文 9. 使用正则…

从2013数学建模B题碎纸片拼接问题看递归和迭代思想

目录 1.递归实例说明 2.迭代实例说明 3.迭代思想在碎纸片拼接赛题的运用 1.递归实例说明 斐波那契数列可以使用递归&#xff0c;也可以使用数列的通项公式&#xff0c;但是这个地方建议使用数列的通项公式&#xff0c;因为这个递归的深度过大这个结果很难运行出来&#xff1…

使用WebDAV共享本地文件,轻量易用

特征&#xff1a; 使用 Golang 实现&#xff0c;性能极高。 最终编译成单个二进制文件&#xff0c;不需要 Apache 或类似的环境&#xff0c;依赖性很少。 支持浏览器访问。 可以在同一个端口上启用多个 WebDAV 服务&#xff0c;每个服务具有不同的挂载目录、用户名和密码。 良好…

嵌入式day20

feof&#xff1a; 检测文件是否到达结尾 ferroe&#xff1a; 检测文件是否发生错误 标准IO之文件定位 fseek&#xff08;&#xff09; SEEK_END 指向最后一个字节的后一个&#xff0c;继续加&#xff0c;写文件&#xff0c;会将文件扩大 ftell&#xff08;&#xff09; 获取…

STM32学习笔记1---LED,蜂鸣器

目录 GPIO LED 蜂鸣器 RCC外设 GPIO外设 总概 操作STM32的GPIO 代码 LED闪烁 LED流水灯 蜂鸣器&#xff01; 连接方式 GPIO GPIO输出&#xff1a;向外驱动控制 GPIO输入&#xff1a;读取&#xff0c;捕获&#xff08;信息&#xff09;&#xff08;控制&#xff09…

状压DP,abc359_d - Avoid K Palindrome

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 D - Avoid K Palindrome 二、解题报告 1、思路分析 数据量&#xff1a;N&#xff1a;1000&#xff0c;K&#xff1a;10 提示我们状态压缩 我们发现长度为K的字符串&#xff0c;我们可以用0表示A&#xff…