ray使用也有一段时间了, 这篇文章总结下ray的使用场景和用法
ray可以做什么?
总结就两点:
- 可以将其视为一个进程池(当然不仅限于此), 可以用于开发并发应用
- 还可以将应用改造为分布式
基于以上两点, 有人称之为:Modern Parallel and Distributed Python
构成
- Ray AI Runtime
用于开发分布式机器学习应用的工具包, 包括数据处理/模型训练和tuning/强化学习/部署服务… - Ray Core
用于开发/改造并发/分布式应用程序, 大部分通过装饰器形式实现. 因此如果你想改造现有应用, 基本不用怎么修改代码. - Ray Clusters
用于在云上或k8s集群上开发自动伸缩服务的分布式程序.
使用
由于笔者只使用过Ray Core功能, 因此只介绍这个模块, 其他可以参考官网ray doc
- 安装
pip install ray
- 术语
- Task: 任务实例, 通过给函数添加装饰器实现
看下例子:
import time
import ray
@ray.remote
def square(x):
time.sleep(5)
return x * x
start = time.time()
tasks = [square.remote(i) for i in range(3)]
print(ray.get(tasks))
end = time.time()
print(f'time cost:{end-start}')
输出:
[0, 1, 4]
time cost:9.302684783935547
如果顺序执行square()3次, 预计需要15s, 但是使用了@ray.remote后, 总执行时间缩减到了9s(sleep时间太短会出现改造后的执行时间反而更大的现象, 这是因为ray在启动时会先进行初始化(类似于生成进程池), 这个本身也是耗时的, 本机测试大概需要4s)
上面例子可以看出, 要将一个普通函数改造为并发非常简单, 只需要加装饰器后执行func.remote(*args, **kwargs)
即可, 获取函数结果使用rag.get()
, 但需要注意的是, square.remote(i)
是非阻塞的, 立即返回, 而remote.ray.get()
是阻塞的(有点类似multipleprocessing pool中的apply_async()
), 因此在启动多个task期间, 不要去get, 否则就会退化为顺序执行.
- actor: 任务实例, 通过给类添加装饰器实现
看下例子:
@ray.remote
class Counter:
def __init__(self):
self.i = 0
def get(self):
return self.i
def incr(self, value):
self.i += value
c = Counter.remote()
for _ in range(10):
c.incr.remote(1)
print(ray.get(c.get.remote()))
输出: 10
用法和上面的task差不多.
- objects
在ray中, 数据或者可以产生数据的对象(如上面的task和actor)称为object, 该对象一旦生成不可改变. 这么说可能有点抽象, 举个例子, 一般大型任务由多个小任务组成, 这些小任务组成了一个pipeline, 该pipeline上的所有节点都是object, 每个obect都有自己的object ref, 相当于它的id, 在集群所有node上是唯一的. 看下计算 ( ( a + b ) × 2 ) 2 ((a+b)\times2)^2 ((a+b)×2)2例子:
import ray
@ray.remote
def simple_sum(a, b):
return a + b
@ray.remote
class Mul:
def __init__(self, factor):
self.factor = factor
def process(self, x):
return x * self.factor
@ray.remote
def square(a):
return a * a
if __name__ == '__main__':
a, b = 3, 5
simple_sum_ref = simple_sum.remote(a, b)
mul = Mul.remote(2)
mul_ref = mul.process.remote(simple_sum_ref)
square_ref = square.remote(mul_ref)
print('simple_sum_ref==>', simple_sum_ref)
print('mul_ref==>', mul_ref)
print('square_ref==>', square_ref)
print('result: ', ray.get(square_ref))
输出:
simple_sum_ref==> ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)
mul_ref==> ObjectRef(c2668a65bda616c17530094e1437f255eb2e95990100000001000000)
square_ref==> ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000)
result: 256
ray.put()
生成object有两种方式, 一种如上面例子, 调用task或者actor, 便会return回来一个object, 另外一种方式就是使用ray.put(), 看下例子:
import ray
import numpy as np
@ray.remote
def matrix_sum(x):
return np.sum(x)
if __name__ == '__main__':
a_ref = ray.put(np.array([1,2,3]))
print(f'a_ref==>{a_ref}')
print(ray.get(matrix_sum.remote(a_ref)))
输出:
a_ref==>ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000)
6
有小伙伴会问, 直接把上面的numpy array传给task就行了, 为什么要多此一举, 使用put先生成object, 再传递过去呢?
这就涉及到ray的内存管理了, 如下图:
在ray中, 执行任务的实体就是actor和task, 相当于multiprocessing中的worker, 和worker类似, 每个actor或task都是一个单独进程, 他们之间是不共享内存的, 但是实际任务中少不了要交换数据怎么办?通过一块叫Object Store的区域. 这块内存区域是共享的, 是用来存储object的地方, 各个actor或task产生的obect都会放在这个区域, 如果想获取obect的value, 也是从这个区域取的结果.
基于以上原因, 在某些场景, 如果为了方便共享数据, 尤其是在分布式的环境下, 如果想共享某变量, 只需要使用ray.put()将该变量转化为object, 该集群下的所有节点都可以get到该数据了.
比如我有100w张imgs, 需要在分布式的环境下分析所有imgs的数据质量(大小, 曝光等等). 可以通过将100w个imgs路径打包成一个object, ray的各个节点就可以获取到这个object, 然后每个节点各分析一部分即可.
- ray serve
有空再补充