Python 全栈系列239 使用消息队列完成分布式任务

news2025/1/1 22:09:21

说明

在Python - 深度学习系列32 - glm2接口部署实践提到,通过部署本地化大模型来完成特定的任务。

由于大模型的部署依赖显卡,且常规量级的任务需要大量的worker支持,从成本考虑,租用算力机是比较经济的。由于任务是属于超高计算传输比的类型,且算力机随时可能出现不稳定的情况。

所以,使用消息队列完成此项任务是比较合适的。本次目标:

  • 1 回顾并快速搭建RabbitMQ和RabbitAgent服务的方法
  • 2 在无端口算力租用商(AutoDL)下部署chatglm2服务,并启动Worker处理数据
  • 3 在有端口算力租用商(仙宫云)下部署chatglm2服务,并用nginx反向代理,然后在异地启动worker测试

内容

1 构建消息队列(Server)

1.1 RabbitMQ镜像

先采用之前的命令启动
在这里插入图片描述
在算力机使用阿里云镜像仓库拉取,分钟级完成启动
在这里插入图片描述

1.2 RabbitAgent服务

理论上,应该封装为镜像后,以容器方式启动。不过租用的算力机系统盘太小(50G),装完CUDA之后只剩下10G多的空间,所以这次就把项目文件搬过去,在宿主机启动。
以后这类轻量级的服务,可以用一个很小的python环境镜像封装。

在这里插入图片描述

在这里插入图片描述

res = req.post('http://IP:24098/send_workq_message/', json = para_dict)
<Response [200]>
# 6 永久启动服务

nohup python3 server.py  >/dev/null 2>&1 &

在这里插入图片描述
消费者(手动确认消息模式)

            import pika
            import json
            credentials = pika.PlainCredentials('user', 'passwd')
            connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials))
            
            channel = connection.channel()

            def callback(ch, method, properties, body):
                input_data = json.loads(body.decode())
                print(f" [x] Received ",input_data)
                # time.sleep(body.count(b'.'))
                print(" [x] Done")
                ch.basic_ack(delivery_tag = method.delivery_tag)


            # channel.queue_declare(queue='hello1',durable=True)
            # 消费者预取消息数
            channel.basic_qos(prefetch_count=3)
            # 1 消费持久化的队列
            #channel.basic_consume(queue='hello1',
            #                        on_message_callback=callback, auto_ack =False)    
            # 2 消费非持久化队列
            channel.basic_consume(queue='hello2',
                                    on_message_callback=callback, auto_ack =False)    

            print(' [*] Waiting for messages. To exit press CTRL+C')
            channel.start_consuming()

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received  {'msg_id': 1, 'msg': 'first msg'}
 [x] Done
 [x] Received  {'msg_id': 2, 'msg': 'second msg'}
 [x] Done
 [x] Received  {'msg_id': 1, 'msg': 'first msg'}
 [x] Done
 [x] Received  {'msg_id': 2, 'msg': 'second msg'}
 [x] Done

1.3 将任务数据通过RabbitAgent写入

写入2.8万条,耗时5秒。
在这里插入图片描述

2 无端口算力租用商Worker测试

除了一定要返回的结果数据,还应该加上机器名称,显卡配置与处理时长。

2.1 启动服务

无端口算力机的代表就是AutoDL了,他们家机器也偏贵,4090一小时2.5~2.6元,比仙宫云高不少(我还是比较prefer后者的)。目前暂时没发现AutoDL有什么特别的优点,中规中矩。

在这里插入图片描述

发送文件

rsync -rvltz  -e 'ssh -p 44620'  --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace  root@connect.westb.seetacloud.com:/root/autodl-tmp/

在这里插入图片描述
然后修改api.py中模型加载的位置和端口号,启动3个服务。

以下是获取单条数据并进行调试的方法

import pika
import json
credentials = pika.PlainCredentials('x', 'xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('xxxx', 24091, '/', credentials,heartbeat=600))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='ent_intro_task', durable=True)
# 从队列中获取一条消息
method_frame, header_frame, body = channel.basic_get(queue='ent_intro_task')
data = body.decode('utf-8')
data1 = json.loads(data)
...
connection.close()

完成测试之后,打包为ent_intro_worker.py,该脚本接受一个端口输入,以便将worker和server匹配起来,充分利用资源。

import pika
import json
import time

import sys
import requests as req 

# 获取命令行参数
if len(sys.argv) > 1:
    parameter_value = sys.argv[1]
    print("传入的参数值为:", parameter_value)
else:
    print("未传入参数")
def send_resp(a_message):
    message_list = [a_message]
    para_dict = {}
    para_dict['rabbit'] = 'rabbit01'
    para_dict['routing_key'] = 'ent_intro_result'
    para_dict['durable'] = True
    para_dict['message_list'] = message_list
    para_dict['queue'] = 'ent_intro_result'
    
    resp = req.post('http://IP:PORT/send_workq_message/', json = para_dict)
    return True 

tmp ='''
成立日期:%s 
注册地址:%s
%s简介,字数在100-200字之间
'''
credentials = pika.PlainCredentials('andy', 'andy123')
connection = pika.BlockingConnection(pika.ConnectionParameters('IP', PORT, '/', credentials, heartbeat=600))

channel = connection.channel()

# 手动确认
def callback(ch, method, properties, body):
    input_data = json.loads(body.decode())
    print(f" [x] Received ",input_data)
    tick1 = time.time()
    prompt_content = {'prompt': tmp % (input_data['reg_dt'], input_data['addr'], input_data['ent_table_name'] )}
    res = req.post('http://127.0.0.1:%s/' % parameter_value, json =prompt_content).json()
    tick2 = time.time()

    a_message = {}
    a_message['company'] =  input_data['ent_table_name']
    a_message['intro'] =  res['response']
    a_message['spends'] = tick2-tick1
    
    send_resp(a_message)
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)


channel.queue_declare(queue='ent_intro_task',durable=True)
channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='ent_intro_task',
                        on_message_callback=callback, auto_ack =False)    

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

该worker获取数据,然后向本地大模型服务请求结果,然后将结果写到结果队列。启动worker进行测试,python3 ent_intro_worker.py 24096
没问题后就转入后台运行:nohup python3 ent_intro_worker.py 24096 >/dev/null 2>&1 &

在这里插入图片描述

3 有端口算力租用商Worker测试

3.1 负载均衡

由于单个的量化模型不足以充分利用显卡的性能,所以就要启动多个同样的服务。调用时需要进行多个服务的端口指定,这样就比较麻烦。
在这里插入图片描述
用nginx进行负载均衡,然后只暴露一个端口作为服务接口。然后接下来在远程主机调用这个服务接口(worker)。

租用一台仙宫云主机。数据上传有点问题,感觉它的云盘是外挂的,而且不稳定。最终我把数据先传到系统盘,再从系统盘传到云盘才成功。另外在启动服务时,模型的加载时间明显太长了。感觉云盘是机械盘。

rsync -rvltz  -e 'ssh -p 111'  --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace  root@m1ehp5n70rxvg81b.ssh.x-gpu.com:/root/
==> /root/cloud/

安装包

pip3 install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/

启动三个服务。

安装、配置并启动nginx。

events {
    #设置工作模式为epoll,除此之外还有select,poll,kqueue,rtsig和/dev/poll模式
    use epoll;
    #定义每个进程的最大连接数,受系统进程的最大打开文件数量限制
    worker_connections  1024;
}


http{
    # 配置nginx上传文件最大限制
    client_max_body_size 50000m;

    upstream multi_ma {
            # fair;
            server 172.17.0.1:10000 ;
            server 172.17.0.1:10001 ;
            server 172.17.0.1:10002 ;
        }

    server {
        listen 80;
        location / {
            proxy_pass http://multi_ma;
        }

    }

}

远端使用worker调用。

实操时发现,虽然仙宫云可以给一个80端口,但是似乎也是容器里的虚拟环境,不让再安装包了,所以也没法安装nginx。不过理论上应该可以实现。

最后,还是用类似AutoDL的方式启动3个worker。

两块4090之后,速度明显快多了。
在这里插入图片描述

3.2 获取结果并入库

建立对应的表

# 2 导入包
from Basefuncs import * 

# 快速载入连接
def make_local_wmongo_connect(server_name):
    try:
        tem_w = from_pickle(server_name)
        print('【Loading cur_w】from pickle')
    except:
        w = WMongo('w')
        tem_w = w.TryConnectionOnceAndForever(server_name =server_name)
        to_pickle(tem_w, server_name)
    return tem_w

m8_cur_w = make_local_wmongo_connect('m8.24003')

# 建立索引
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='pid')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='company')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='model_name')

从结果队列里取数,然后入库

# 封装函数
def get_some_batch_updated():
    credentials = pika.PlainCredentials('xxx', 'xxx')
    connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials, heartbeat=600))

    # 2 迭代的获取数据
    res_list = []
    with connection.channel() as channel:
        for i in range(100):
            # 声明一个队列
            channel.queue_declare(queue='ent_intro_result', durable=True)
            # 从队列中获取一条消息
            method_frame, header_frame, body = channel.basic_get(queue='ent_intro_result')
            res_dict = json.loads(body.decode())
            res_list.append(res_dict)
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    # 3 拼凑为标准数据框
    res_df = pd.DataFrame(res_list)
    # 增加必要的模型字段
    res_df['model_name'] = 'chatglm2_6b_int4'
    res_df['pid'] = (res_df['company'] + res_df['model_name']).apply(md5_trans)

    m8_cur_w.insert_or_update_with_key(tier1 = 'llm', tier2 = 'company_intro', data_listofdict= res_df.to_dict(orient='records') , key_name='pid')
    connection.close()

# 获取并存储100条
get_some_batch_updated()

4 结语

本次完成了:

  • 1 RabbitMQ 和 RabbitAgent的建立。这使得其他机器可以不必要使用端口,非常适合超高计算传输比的任务。
  • 2 将原始数据通过rabbit agent 发布到任务队列
  • 3 将chatglm2-6b部署到算力租用机:测试了主流的三家autodl, anygpu和仙宫云,都是ok的
  • 4 在各算力机上启动worker进行处理
  • 5 将结果获取,然后存在本地的mongo

没能成功完成的实践是在仙宫云使用nginx做负载均衡,简化worker的请求。

结论:用llm来做任务成本还是比较高的。价格折算下来,大约 ¥1/千条。所以,要把大模型用在高价值领域,例如替代人工打标,写函数这些。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1589871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Harmony鸿蒙南向驱动开发-UART接口使用

功能简介 UART指异步收发传输器&#xff08;Universal Asynchronous Receiver/Transmitter&#xff09;&#xff0c;是通用串行数据总线&#xff0c;用于异步通信。该总线双向通信&#xff0c;可以实现全双工传输。 两个UART设备的连接示意图如下&#xff0c;UART与其他模块一…

浏览器工作原理与实践--同源策略:为什么XMLHttpRequest不能跨域请求资源

通过前面6个模块的介绍&#xff0c;我们已经大致知道浏览器是怎么工作的了&#xff0c;也了解这种工作方式对前端产生了什么样的影响。在这个过程中&#xff0c;我们还穿插介绍了一些浏览器安全相关的内容&#xff0c;不过都比较散&#xff0c;所以最后的5篇文章&#xff0c;我…

[Linux][进程控制][进程程序替换]详细解读

目录 1.进程创建1.fork函数初识2.fork函数返回值3.写时拷贝4.fork之后&#xff0c;父子进程代码共享5.fork常规用法6.fork调用失败的原因 2.进程终止0.进程终止时&#xff0c;操作系统做了什么&#xff1f;1.进程退出场景2.进程常见退出方法4 _exit函数(系统接口)4.exit函数(库…

数据结构DAY5--二叉树相关流程

流程有&#xff1a;创建->遍历->得到信息->销毁 创建 根据先序遍历的流程以及对叶子结点的左后驱结点和右后驱结点以#号替代的原则&#xff0c;写出一个数组&#xff0c;并建立一个结构体&#xff0c;包括数据域&#xff0c;结构体类型的左后驱结点和右后驱结点指针…

【C++核心-基础知识】内存分析和new操作符

内存分析和new操作符 一、内存分析1. 程序运行前就存在的区域1.1 代码区1.2 全局区1.3 代码演示 2. 程序运行后才存在的区域2.1 栈区2.2 堆区 二、new操作符1. 基本介绍2. 代码演示 一、内存分析 C程序在执行时&#xff0c;将内存大方向划分为4个区域&#xff1a; 代码区&…

堆排序及调整算法

调整算法 向上调整&#xff1a; 对大堆向上调整&#xff1a; adujust_up void adjust_up(int* a, int child) {int parent (child - 1) / 2;while (child > 0){if (a[child] > a[parent]){swap(a[child], a[parent]);child parent;parent (child - 1) / 2;}else//默…

有关格式输入输出的问题

对于格式输入输出问题&#xff0c;我们最好用c语言编写代码&#xff01;&#xff01;&#xff01; 成绩统计 难点&#xff1a;格式化输出 #include <cstdio> using namespace std; typedef long long ll;ll n,score,a,b;int main() {//及格>60 优秀>85 求及格率…

脱发治疗2.0时代,植发企业如何找到新发展高地?

“在春天这个万物生长的季节&#xff0c;只有头发在掉”。 脱发已成为现代人日常生活中最大的困扰之一。国家卫健委调查数据显示&#xff0c;我国脱发人口数量已超2.5亿&#xff0c;其中26-30岁人群占比达41.9%&#xff0c;占据脱发人群主流。考虑到快节奏生活的常态化&#x…

取消格式化 SSD磁盘:从格式化 SSD磁盘恢复数据

许多用户认为SSD上删除或格式化的数据就永远消失了。事实上&#xff0c;这是完全错误的。数据恢复软件能够取消格式化SSD或恢复SSD上的数据。在本文中&#xff0c;我们将向您展示一个简单的解决方案&#xff0c;可以从格式化的 SSD 中快速、完整地恢复数据。 格式化 SSD 的原因…

【保姆级讲解Element UI】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

青少年体能素质教育平台

一、项目背景与意义 随着社会的快速发展和人们生活水平的提高&#xff0c;青少年体能素质教育逐渐受到社会各界的广泛关注。体能素质作为青少年全面发展的重要组成部分&#xff0c;对于提升他们的健康水平、增强自信心、培养团队协作精神和创新能力具有重要意义。然而&#xf…

cygwin工具学习记录

文章目录 cygwin工具学习记录主要特点使用场景安装命令使用使用命令行安装命令包的基本步骤&#xff1a;安装apt-cyg&#xff1a;错误处理&#xff1a;错误一&#xff1a;错误二错误三 简单使用 cygwin工具学习记录 Cygwin是一个在Windows操作系统上提供类Unix环境的免费软件工…

tcp early retransmit 和 rack 中神奇的 1/4 minrtt

雨中跑步十公里&#xff0c;沿河看柳&#xff0c;发了一则朋友圈&#xff1a; 为什么采用 1/4 minrtt 作为重传和探测的延时&#xff0c;上图解释的已经很清楚了&#xff0c;主要还是怕乱序&#xff0c;关于乱序的度量&#xff0c;上图解释得非常清楚&#xff0c;乱序预期可在…

蓝桥2021A组E题

回路计数 问题描述格式输入格式输出评测用例规模与约定解析参考程序难度等级 问题描述 格式输入 无 格式输出 输出方案数 评测用例规模与约定 无 解析 题目的意思是21个教学楼对于编号互质的两个楼就有一个通道&#xff0c;可以想成一个图有双向边当编号互质时&#xff0c;…

path环境变量的作用

当我把一个运行文件的路径加入到了path环境变量&#xff0c;就可以在cmd命令行随时使用运行。 在path中有两个path上面的是用户的path&#xff0c;下面的是计算机的path

GD32 HID键盘矩阵键盘发送数据时,一直发送数据问题处理

这个问题找了两三天,开始并不认为是示例程序的问题,只是感觉是自己代码问题。 这个解决流程大概是: 先调好矩阵键盘=> 调用发送函数。 就是因为调用时,一直发送数据,我也在按键抬起做了操作,始终不行。 最后,发现时示例代码中有个 空闲中断 引起的。 udev->reg…

(一)Jetpack Compose 从入门到会写

基本概念 Compose 名称由来 众所周知&#xff0c;继承在功能拓展上表现的很脆弱&#xff0c;容易类、函数爆炸&#xff0c;通过代理和包装进行组合会更健壮。 Compose 意为组合&#xff0c;使用上也是把 Compose 函数以 模拟函数调用层级关系的方式 组合到一起&#xff0c;最终…

数据结构-----Lambda表达式

文章目录 1 背景1.1 Lambda表达式的语法1.2 函数式接口 2 Lambda表达式的基本使用2.1 语法精简 3 变量捕获3.1 匿名内部类3.2 匿名内部类的变量捕获3.3 Lambda的变量捕获 4 Lambda在集合当中的使用4.1 Collection接口4.2 List接口4.3 Map接口 HashMap 的 forEach() 5 总结 1 背…

【愚公系列】2023年12月 HarmonyOS教学课程 041-Stage模型(概述和组件配置)

&#x1f3c6; 作者简介&#xff0c;愚公搬代码 &#x1f3c6;《头衔》&#xff1a;华为云特约编辑&#xff0c;华为云云享专家&#xff0c;华为开发者专家&#xff0c;华为产品云测专家&#xff0c;CSDN博客专家&#xff0c;CSDN商业化专家&#xff0c;阿里云专家博主&#xf…

2024年第十四届 Mathorcup (A题)| PCI 规划问题 | 混淆矩阵 目标规划 |数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 让我们来看看Mathorcup (A题&#xff09;&#xff01; CS团队…