简介
基础示例
最简单的可以通过 pip install ray
安装最小版本,然后直接通过如下方式使用。
import ray
ray.init(num_cpus=4) # 会自动查找地址
@ray.remote
def hello():
return "Hello World!!!"
oid = hello.remote()
print(ray.get(oid))
上述会按照多线程方式启动,如下介绍如何本地启动一个集群。
安装部署
建议采用一个独立的虚拟环境,在本地启动 Head 和 Worker 进程。。
(llm)$ pip install ray[default]
(llm)$ ray start --head --port=6379 --node-ip-address='127.0.0.1' \
--include-dashboard=true --dashboard-host='127.0.0.1' --dashboard-port=8265
(llm)$ ray start --address='7.250.139.152:6379'
(llm)$ ray status
(llm)$ ray stop
注意,最初通过 pip install ray
方式安装的是最小模式,不含 dashboard
功能,此时,上述的示例代码也可以工作,会自动查找本地集群,不过可以通过 ray.init(_node_ip_address='7.250.139.152')
手动指定。
也可以通过如下方式提交。
RAY_ADDRESS='http://127.0.0.1:8265' ray job submit --working-dir . -- python test.py
其中运行日志默认会保存在 /tmp/ray
目录下。
示例
上述的示例中,通过 @ray.remote
装饰器使得函数变成可分布式调用的任务,而通过 remote()
函数提交任务,最终 ray.get()
来获取任务返回值。其中 get()
阻塞执行,而且,分布式调度必定存在通讯成本,尽量避免小任务。
import time
import ray
#ray.init(num_cpus = 4)
ray.init()
@ray.remote
def work(x):
time.sleep(0.01)
return x
start = time.time()
results = ray.get([work.remote(x) for x in range(1000)])
print("duration = ", time.time() - start)
print("results = ", sum(results))
异步返回
ray.get()
会等所有任务执行完,如果有多个任务,而且执行时间各不相同,那么最终的耗时依赖最长的任务执行时间,此时可以通过 ray.wait()
实现,会返回执行完和未执行完的任务,对于执行完的任务可以继续执行后面的操作。
import ray
import time
import random
@ray.remote
def work(x):
time.sleep(random.uniform(0, 4))
return x
def sum(total, value):
return total + value
start = time.time()
oids = [work.remote(x) for x in range(4)]
total = 0
while len(oids):
done, oids = ray.wait(oids)
for task in done:
total = sum(total, ray.get(task))
print("duration = ", time.time() - start)
print("result = ", total)