ray-分布式计算框架-集群与异步Job管理

news2025/1/19 17:19:17

0. ray 简介

ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包

img

  • Ray AI Runtime

ML应用程序库集

  • Ray Core

通用分布式计算库

  • Task -- Ray允许任意Python函数在单独的Python worker上运行,这些异步Python函数称为任务
  • Actor -- 从函数扩展到类,是一个有状态的工作者,当一个Actor被创建,一个新的worker被创建,并且actor的方法被安排到那个特定的worker上,并且可以访问和修改那个worker的状态
  • Object -- Task与Actor在对象上创建与计算,被称为远程对象,被存储在ray的分布式共享内存对象存储上,通过对象引用来引用远程对象。集群中每个节点都有一个对象存储,远程对象存储在何处(一个或多个节点上)与远程对象引用的持有者无关
  • Placement Groups -- 允许用户跨多个节点原子性的保留资源组,以供后续Task与Actor使用
  • Environment Dependencies -- 当Ray在远程机器上执行Task或Actor时,它们的依赖环境项(Python包、本地文件、环境变量)必须可供代码运行。解决环境依赖的方式有两种,一种是在集群启动前准备好对集群的依赖,另一种是在ray的运行时环境动态安装
  • Ray cluster

一组连接到公共 Ray 头节点的工作节点,通过 kubeRay operator管理运行在k8s上的ray集群

  • 关联链接
    • Ray Doc: Overview — Ray 2.6.1
    • Ray Github: Helm (Cluster) - KubeRay Docs
    • Python raycluster 管理API: https://github.com/ray-project/kuberay/tree/master/clients/python-client
    • Ray Job Python SDK Doc: Python SDK API Reference — Ray 2.6.1

1. ray 集群管理

ray版本:2.3.0

  • Kind 创建测试k8s集群

1主3从集群

# 配置文件 -- 一主两从(默认单主),文件名:k8s-3nodes.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker

创建k8s集群

kind create cluster --config k8s-3nodes.yaml
  • KubeRay 部署ray集群
# helm方式安装
# 添加Charts仓库
helm repo add kuberay https://ray-project.github.io/kuberay-helm/

# 安装default名称空间
# 安装 kubeRay operator
# 下载离线的chart包: helm pull kuberay/kuberay-operator --version 0.5.0
# 本地安装: helm install kuberay-operator 
helm install kuberay-operator kuberay/kuberay-operator --version 0.5.0

# 创建ray示例集群,若通过sdk管理则跳过
# 下载离线的ray集群自定义资源:helm pull kuberay/ray-cluster  --version 0.5.0
helm install raycluster kuberay/ray-cluster --version 0.5.0

# 获取ray集群对应的CR
kubectl get raycluster

# 查询pod的状态
kubectl get pods

# 转发svc 8265端口到本地8265端口
kubectl port-forward --address 0.0.0.0 svc/raycluster-kuberay-head-svc 8265:8265

# 登录ray head节点,并执行一个job
kubectl exec -it ${RAYCLUSTER_HEAD_POD} -- bash
python -c "import ray; ray.init(); print(ray.cluster_resources())" # (in Ray head Pod)

# 删除ray集群
helm uninstall raycluster

# 删除kubeRay
helm uninstall kuberay-operator

# 查询helm管理的资源
helm ls --all-namespaces
  • Ray 集群管理

前置要求:

  1. 安装 KubeRay
  2. 安装 k8s sdk: pip install kubernetes
  3. 将python_client拷贝到PYTHONPATH路径下或者直接安装python_client, 该库路径为:https://github.com/ray-project/kuberay/tree/master/clients/python-client/python_client
from python_client import kuberay_cluster_api
from python_client.utils import kuberay_cluster_utils, kuberay_cluster_builder


def main():
    
    # ray集群管理的api 获取集群列表、创建集群、更新集群、删除集群
    kuberay_api = kuberay_cluster_api.RayClusterApi()

    # CR 构建器,构建ray集群对应的字典格式的CR
    cr_builder = kuberay_cluster_builder.ClusterBuilder()

    # CR资源对象操作工具,更新cr资源
    cluster_utils = kuberay_cluster_utils.ClusterUtils()

    # 构建集群的CR,是一个字典对象,可以修改、删除、添加额外的属性
    # 可以指定包含特定环境依赖的人ray镜像
    cluster = (
        cr_builder.build_meta(name="new-cluster1", labels={"demo-cluster": "yes"}) # 输入ray群名称、名称空间、资源标签、ray版本信息
        .build_head(cpu_requests="0", memory_requests="0")   # ray集群head信息: ray镜像名称、对应service类型、cpu memory的requests与limits、ray head启动参数
        .build_worker(group_name="workers", cpu_requests="0", memory_requests="0") # ray集群worker信息: worker组名称、 ray镜像名称、ray启动命令、cpu memory的requests与limits、默认副本个数、最大与最小副本个数
        .get_cluster()
    )
    
    # 检查CR是否构建成功
    if not cr_builder.succeeded:
        print("error building the cluster, aborting...")
        return

    # 创建ray集群
    kuberay_api.create_ray_cluster(body=cluster)

    # 更新ray集群CR中的worker副本集合
    cluster_to_patch, succeeded = cluster_utils.update_worker_group_replicas(
        cluster, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
    )

    if succeeded:
        # 更新ray集群
        kuberay_api.patch_ray_cluster(
            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
        )

    # 在原来的集群的CR中的工作组添加新的工作组
    cluster_to_patch, succeeded = cluster_utils.duplicate_worker_group(
        cluster, group_name="workers", new_group_name="duplicate-workers"
    )

    if succeeded:
        kuberay_api.patch_ray_cluster(
            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
        )

    # 列出所有创建的集群
    kube_ray_list = kuberay_api.list_ray_clusters(k8s_namespace="default", label_selector='demo-cluster=yes')
    if "items" in kube_ray_list:
        for cluster in kube_ray_list["items"]:
            print(cluster["metadata"]["name"], cluster["metadata"]["namespace"])

    # 删除集群
    if "items" in kube_ray_list:
        for cluster in kube_ray_list["items"]:
            print("deleting raycluster = {}".format(cluster["metadata"]["name"]))
            
            # 通过指定名称删除ray集群
            kuberay_api.delete_ray_cluster(
                name=cluster["metadata"]["name"],
                k8s_namespace=cluster["metadata"]["namespace"],
            )


if __name__ == "__main__":
    main()

2. ray Job 管理

前置: pip install -U "ray[default]"

  • 创建一个job任务
# 文件名称: test_job.py
# python 标准库
import json
import ray
import sys

# 已经在ray节点安装的库
import redis

# 通过job提交时传递的模块依赖 runtime_env 配置 py_modules,通过 py_nodules传递过来就可以直接在job中导入
from test_module import test_1
import stk12

# 创建一个连接redeis对象,通过redis作为中转向job传递输入并获取job的输出
redis_cli = redis.Redis(host='192.168.6.205', port=6379,  decode_responses=True)

# 通过redis获取传入过来的参数
input_params_value = None
if len(sys.argv) > 1:
    input_params_key = sys.argv[1]
    input_params_value = json.loads(redis_cli.get(input_params_key))


# 执行远程任务
@ray.remote
def hello_world(value):
    return [v + 100 for v in value]

ray.init()

# 输出传递过来的参数
print("input_params_value:", input_params_value, type(input_params_key))

# 执行远程函数
result = ray.get(hello_world.remote(input_params_value))

# 获取输出key
output_key = input_params_key.split(":")[0] + ":output"

# 将输出结果放入redis
redis_cli.set(output_key, json.dumps(result))

# 测试传递过来的Python依赖库是否能正常导入
print(test_1.test_1())
print(stk12.__dir__())
  • 创建测试自定义模块
# 模块路径: test_module/test_1.py
def test_1():
    return "test_1"
  • 创建一个job提交对象
import json

from ray.job_submission import JobSubmissionClient, JobStatus
import time
import uuid
import redis

# 上传un到ray集群供job使用的模块
import test_module
from agi import stk12

# 创建一个连接redeis对象
redis_cli = redis.Redis(host='192.168.6.205', port=6379,  decode_responses=True)

# 创建一个client,指定远程ray集群的head地址
client = JobSubmissionClient("http://127.0.0.1:8265")

# 创建任务的ID
id = uuid.uuid4().hex
input_params_key = f"{id}:input"
input_params_value = [1, 2, 3, 4, 5]

# 将输入参数存入redis,供远程函数job使用
redis_cli.set(input_params_key, json.dumps(input_params_value))


# 提交一个ray job 是一个独立的ray应用程序
job_id = client.submit_job(
    # 执行该job的入口脚本
    entrypoint=f"python test_job.py {input_params_key}",

    # 将本地文件上传到ray集群
    runtime_env={
        "working_dir": "./",
        "py_modules": [test_module, stk12],
        "env_vars": {"testenv": "test-1"}
    },

    # 自定义任务ID
    submission_id=f"{id}"
)

# 输出job ID
print("job_id:", job_id)


def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    """轮询获取Job的状态,当完成时获取任务的的日志输出"""
    start = time.time()
    while time.time() - start <= timeout_seconds:
        # 获取任务的状态
        status = client.get_job_status(job_id)
        print(f"status: {status}")

        # 检查任务的状态
        if status in status_to_wait_for:
            break
        time.sleep(1)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})

# 输出job日志
logs = client.get_job_logs(job_id)
print(logs)

# 输出从job中获取的任务
output_key = job_id + ":output"
output_value = redis_cli.get(output_key)
print("output:", output_value)
  • job 管理
from ray.job_submission import JobSubmissionClient, JobDetails, JobInfo, JobType, JobStatus
# 创建一个job提交客户端,如果管理多个ray集群的Job则切换或者创建多个连接ray head节点的客户端
job_cli = JobSubmissionClient("http://127.0.0.1:8265")

# Job信息,对应Job中submission_id属性
job_id = "b9ad6ff9ada445a29fb54307f1394594"
job_info = job_cli.get_job_info(job_id)

# 获取提交的所有job
jobs = job_cli.list_jobs()

for job in jobs:

    # 获取job的状态
    job_status = job_cli.get_job_status(job.submission_id)
    print(f"job_id: {job.submission_id}, job_status: {job_status}")

    # 输出job的json格式详情
    print("job:", job.json())

# 停止Job
job_cli.stop_job(job_id)

# 删除 job
# job_cli.delete_job(job_id)

# 提交 Job
# job_cli.submit_job()


# 获取版本信息
print("version:", job_cli.get_version())

3. 产品场景

  • 将周期、耗时任务异步化

镜像文件打包下载、文件同步、运维脚本、数据导出与同步、镜像同步、服务启停、TATC卫星项目中算法任务的执行、批量同类型任务的计算(如卫星项目中卫星轨迹的计算)、备份任务

  • k8s中每个租户可以创建与删除自己的ray集群实例,在线IDE中将计算型任务交给ray来执行,不消耗IED所在环境的计算资源

  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/925845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

矢量调制分析基础

前言 本文介绍VSA 的矢量调制分析和数字调制分析测量能力。某些扫频调谐频谱分析仪也能通过使用另外的数字无线专用软件来提供数字调制分析。然而&#xff0c;VSA 通常在调制格式和解调算法配置等方面提供更大的测量灵活性&#xff0c;并提供更多的数据结果和轨迹轨迹显示。本…

Codeforces Round #894 (Div.3)

文章目录 前言A. Gift Carpet题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; B. Sequence Game题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; C. Flower City Fence题目&#xff1a;输入&#xff1a…

keepalived双机热备 (四十五)

一、概述 Keepalived 是一个基于 VRRP 协议来实现的 LVS 服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题。 原理 在一个 LVS 服务集群中通常有主服务器&#xff08;MASTER&#xff09;和备份服务器&#xff08;BACKUP&#xff09;两种角色的服务器…

nuxt2-storybook-vite:环境搭建、基础使用 / nuxt项目组件库

一、创建 nuxt2 项目 安装 - NuxtJS | Nuxt.js 中文网 yarn create nuxt-app <项目名> 二、安装 storybook 2.1、初始化 Storybook pnpm add -g storybook/cli npx -p storybook/cli sb init 命令解释 序号命令命令解释1npx -p storybook/cli sb init是一个命令行…

基于JSP+Servlet+mysql员工权限管理系统

基于JSPServletmysql员工权限管理系统 一、系统介绍二、功能展示四、其他系统实现五、获取源码 一、系统介绍 项目类型&#xff1a;Java web项目 项目名称&#xff1a;基于JSPServlet的员工权限管理系统[qxxt] 项目架构&#xff1a;B/S架构 开发语言&#xff1a;Java语言 …

【ag-grid-vue】column

网格中的每一列都使用列定义(ColDef)来定义。列根据在网格选项中指定的列定义的顺序在网格中定位。 列定义 下面的例子展示了一个定义了3列的简单网格: <template><ag-grid-vuestyle"height: 300px; width: 1000px"class"ag-theme-balham":colum…

最新外卖点餐微信小程序源码系统 完整前后端+安装部署教程

分享一个完整的外卖点餐微信小程序源码系统&#xff0c;含完整代码程序包和详细的安装搭建教程。 系统为多用户&#xff0c;可以无限多开&#xff0c;轻松帮助商家开发各种外卖点餐小程序。系统功能特别强大。 小程序源码下载地址&#xff1a;春哥技术博客获取

python SystemRDL 包介绍

对于芯片验证&#xff0c;在验证寄存器环节&#xff0c;如果我们需要根据大量的寄存器来构建我们的sequence或者激励&#xff0c;比如irq测试&#xff0c;我们需要测试irq信号源到寄存器门口的连接是否正常&#xff0c;irq 寄存器各个field的接线排序是否有弄错&#xff0c;以及…

Linux常用命令——dhcpd命令

在线Linux命令查询工具 dhcpd 运行DHCP服务器。 语法 dhcpd [选项] [网络接口]选项 -p <端口> 指定dhcpd监听的端口 -f 作为前台进程运行dhcpd -d 启用调试模式 -q 在启动时不显示版权信息 -t 简单地测试配置文件的语法是否正确的&#xff0c;但不会尝试执行任何网络…

五度易链最新“产业大数据服务解决方案”亮相,打造数据引擎,构建智慧产业!

自2015年布局产业大数据服务行业以来&#xff0c;“五度易链”作为全国产业大数据服务行业先锋企业&#xff0c;以“让数据引领决策&#xff0c;以智慧驾驭未来”为愿景&#xff0c;肩负“打造数据智能引擎&#xff0c;构建智慧产业新生态”的使命&#xff0c;坚持着精益生产、…

地下水资源监控中应用的深水液位传感器

地下水是水资源重要的组成部分,虽属可再生资源,但地下水更新和自净非常缓慢,一旦被污染,所造成的环境与生态破坏,往往长时间难以逆转。目前中国90%的城市地下水遭受污染,已呈现由点向面扩展的趋势。因此,加强对地下水的监控和相应技术的开发成为一种迫切需要。 环境监测是环境…

大数据(一)定义、特性

大数据&#xff08;一&#xff09;定义、特性 本文目录&#xff1a; 一、写在前面的话 二、大数据定义 三、大数据特性 3.1、大数据的大量 (Volume) 特性 3.2、大数据的高速(Velocity)特性 3.3、大数据的多样化 (Variety) 特性 3.4、大数据的价值 (value) 特性 3.5、大…

Linux下套接字TCP实现网络通信

Linux下套接字TCP实现网络通信 文章目录 Linux下套接字TCP实现网络通信1.引言2.具体实现2.1接口介绍1.socket()2.bind()3.listen()4.accept()5.connect() 2.2 服务器端server.hpp2.3服务端server.cc2.4客户端client.cc 1.引言 ​ 套接字(Socket)是计算机网络中实现网络通信的一…

【算法】双指针求解盛最多水的容器

Problem: 11. 盛最多水的容器 文章目录 题目解析算法原理讲解复杂度Code 题目解析 首先我们来解析一下本题 题目中说到&#xff0c;要找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 那我们现在来看最外侧的两根&#xff0c;一个高度为8&#…

GaussDB数据库SQL系列:DROP TRUNCATE DELETE

目录 一、前言 二、GaussDB的 DROP & TRUNCATE & DELETE 简述 1、命令简述 2、命令比对 三、GaussDB的DROP TABLE命令及示例 1、功能描述 2、语法 3、示例 四、GaussDB的TRUNCATE命令及示例 1、功能描述 2、语法 3、示例 4、示例 五、GaussDB的DELETE命令…

一文速学-让神经网络不再神秘,一天速学神经网络基础-激活函数(二)

前言 思索了很久到底要不要出深度学习内容&#xff0c;毕竟在数学建模专栏里边的机器学习内容还有一大半算法没有更新&#xff0c;很多坑都没有填满&#xff0c;而且现在深度学习的文章和学习课程都十分的多&#xff0c;我考虑了很久决定还是得出神经网络系列文章&#xff0c;…

VSCode配置终端默认为cmd命令行方式

1、新建终端 2、点击默认配置文件 3、选择第一个即可&#xff01;

臻图信息基于数字孪生技术搭建智慧电厂管理系统解决方案

随着可再生能源在电力行业中占比不断提升&#xff0c;以及互联网技术的深入和大数据时代的到来&#xff0c;智能化应用正在悄然地改变着电力企业运营模式。臻图信息以数字孪生、ZTMap3D、地理信息为技术手段&#xff0c;从管、查、监、云、端等几个层面全面建设电力监管系统平台…

国产调度器之光——Fsched到底有多能打?

这是一篇推荐我们速石自研调度器——Fsched的文章。 看起来在专门写调度器&#xff0c;但又不完全在写。 往下看&#xff0c;你就懂了。 本篇一共五个章节&#xff1a; 一、介绍一下主角——速石自研调度器Fsched 二、只要有个调度器&#xff0c;就够了吗&#xff1f; 三…

伯俊ERP对接打通金蝶云星空表头表体组合查询接口与应收单新增接口

伯俊ERP对接打通金蝶云星空表头表体组合查询接口与应收单新增接口 对接源平台:伯俊ERP 伯俊科技&#xff0c;依托在企业信息化建设方面的领先技术与实践积累&#xff0c;致力于帮助企业实现全渠道一盘货。伯俊提供数字经营的咨询与系统实施&#xff0c;助力企业信息化升级、加速…