Skip to main content
重要说明
  • 本文档基于Volcano v1.13.0版本编写和测试。
  • 当前版本的Ray插件存在设计缺陷,无法支持自定义command指令,且hostnamesubdomain配置无法正常生效。
  • 本文档的测试版本经过自行修复,允许自定义command指令,以便功能测试能顺利进行。

Volcano Job插件机制概述

Volcano是一个基于Kubernetes的批处理调度系统,专为高性能计算、大数据和机器学习等场景设计。为了支持不同的分布式训练框架,Volcano Job提供了灵活的插件机制,允许用户通过配置不同的插件来适配各种分布式计算框架的特定需求。

插件机制工作原理

Volcano Job插件机制通过以下方式工作:

  1. 插件接口定义Volcano定义了标准的插件接口PluginInterface,所有插件必须实现该接口
  2. 生命周期钩子:插件可以在JobPod的不同生命周期阶段执行自定义逻辑
  3. 资源管理:插件可以自动创建和管理相关资源(如ServiceConfigMap等)
  4. 配置灵活性:用户可以在Job规范中声明需要使用的插件及其参数

插件接口定义

所有Volcano插件都需要实现以下核心接口:

type PluginInterface interface {
// 返回插件的唯一名称
Name() string

// 在Pod创建时被调用,可以修改Pod规范
OnPodCreate(pod *v1.Pod, job *vcbatch.Job) error

// 在Job创建时被调用,可以执行初始化操作
OnJobAdd(job *vcbatch.Job) error

// 在Job删除时被调用,可以执行清理操作
OnJobDelete(job *vcbatch.Job) error

// 在Job更新时被调用
OnJobUpdate(job *vcbatch.Job) error
}

支持的分布式训练框架插件

Volcano内置支持多种分布式训练框架的插件,以下是当前支持的主要框架:

插件名称框架主要用途核心功能
tensorflowTensorFlow深度学习分布式训练自动配置TF_CONFIG环境变量,支持PS-Worker架构
pytorchPyTorch深度学习分布式训练配置分布式训练所需的环境变量,支持多种后端
mpiMPI高性能计算、科学计算配置SSH免密、生成hostfile,支持MPI作业
rayRay通用分布式计算自动创建Ray Cluster,配置HeadWorker节点
hcclrankHCCL华为昇腾分布式训练配置华为昇腾芯片的分布式训练环境
ssh通用节点间通信配置Pod间的SSH免密访问
svc通用服务发现为每个Pod创建独立的Service
env通用环境变量注入自动注入任务相关的环境变量

Ray插件详解

Ray是一个开源的分布式计算框架,专为构建分布式应用和实现机器学习工作负载而设计。VolcanoRay插件使得在Kubernetes上运行Ray集群变得简单高效。

Ray插件的工作原理

Ray插件在Volcano Job运行期间执行以下核心操作:

  1. 自动配置Head节点

    • Head节点容器中注入启动命令:ray start --head --block
    • 配置并开放必要的端口(GCS端口6379Dashboard端口8265Client端口10001
    • 创建Service<job-name>-head-svc)用于集群内访问
  2. 自动配置Worker节点

    • Worker节点容器中注入启动命令:ray start --block --address=<head-service>:6379
    • 自动通过Service名称连接到Head节点
  3. 服务发现管理

    • Head节点创建ClusterIP Service
    • Service名称格式:<job-name>-head-svc
    • Worker节点通过该Service发现并连接Head节点
插件功能范围

Volcano Ray插件只负责创建Ray ClusterHead节点 + Worker节点),不支持直接在Volcano Job中提交Ray Job。任务提交需要通过以下两种方式之一:

  1. 方式一:使用kubectl execHead节点中执行Python脚本
  2. 方式二:创建独立的任务提交Pod,通过它向Ray Cluster提交任务

任务执行完毕后,手动销毁Volcano Job以释放资源。

Ray插件配置参数

Ray插件支持以下配置参数(通过插件参数传递):

参数名称默认值说明
headheadHead节点的任务名称
headContainerheadHead节点的容器名称
workerworkerWorker节点的任务名称
workerContainerworkerWorker节点的容器名称
port6379Ray GCS(Global Control Storage)端口
dashboardPort8265Ray Dashboard端口
clientPort10001Ray Client API端口

插件创建的资源

Ray插件会自动创建以下Kubernetes资源:

Service资源<job-name>-head-svc):为Head节点创建的普通ClusterIP Service,包含以下端口映射:

端口名称端口号用途说明
gcs6379用于Ray GCS通信
dashboard8265用于访问Ray Dashboard
client-server10001用于Ray Client API
Service说明

Volcano Ray插件会自动创建一个ClusterIP Service<job-name>-head-svc),该Service用于:

  • 集群内部访问Head节点(包括Worker节点连接)
  • 外部通过Ingress或者kubectl port-forward访问Ray DashboardClient API
  • 提供稳定的服务发现端点

Worker节点通过该Service自动发现并连接到Head节点,无需额外配置。

Ray插件使用方法

基本配置示例

以下是使用Ray插件创建Ray Cluster的最简单配置:

Ray的官方镜像仓库为:https://hub.docker.com/r/rayproject/ray ,为简化示例,这里使用精简版的rayproject/ray:latest-py311-cpu镜像。作者本机是arm64系统,请读者根据实际环境选择合适的镜像。

ray-cluster-job.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: ray-cluster-job
spec:
minAvailable: 3
schedulerName: volcano
plugins:
ray: [] # 启用Ray插件,使用默认配置
queue: default
tasks:
- replicas: 1
name: head # 必须有一个名为head的任务
template:
spec:
containers:
- name: head # 容器名称必须为head
image: rayproject/ray:latest-py311-cpu
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: "1"
memory: "2Gi"
restartPolicy: OnFailure
- replicas: 2
name: worker # worker任务
template:
spec:
containers:
- name: worker # 容器名称必须为worker
image: rayproject/ray:latest-py311-cpu
imagePullPolicy: IfNotPresent
command:
- /bin/bash
- -c
- |
ray start --block --address=ray-cluster-job-head-svc:6379
resources:
requests:
cpu: "1"
memory: "2Gi"
restartPolicy: OnFailure

自定义配置示例

如果需要自定义任务名称、容器名称或端口,可以通过插件参数配置:

ray-cluster-custom.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: ray-cluster-custom
spec:
minAvailable: 3
schedulerName: volcano
plugins:
ray:
- "--head=ray-head"
- "--headContainer=ray-head-container"
- "--worker=ray-worker"
- "--workerContainer=ray-worker-container"
- "--port=6380"
- "--dashboardPort=8266"
- "--clientPort=10002"
queue: default
tasks:
- replicas: 1
name: ray-head
template:
spec:
containers:
- name: ray-head-container
image: rayproject/ray:latest-py311-cpu
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: "2"
memory: "4Gi"
restartPolicy: OnFailure
- replicas: 3
name: ray-worker
template:
spec:
containers:
- name: ray-worker-container
image: rayproject/ray:latest-py311-cpu
imagePullPolicy: IfNotPresent
command:
- /bin/bash
- -c
- |
ray start --block --address=ray-cluster-job-head-svc:6379
resources:
requests:
cpu: "1"
memory: "2Gi"
restartPolicy: OnFailure

完整运行示例

下面提供一个完整的示例,演示如何使用Volcano JobRay插件创建Ray Cluster,并通过不同方式提交分布式计算任务。

为便于测试,以下运行的命名空间均使用volcano-system,请根据实际环境调整。

准备工作

前提条件

  • 已安装Kubernetes集群(版本1.27+
  • 已安装Volcano调度器(版本1.13+
  • 已创建default队列(默认应该有自动创建)

安装Volcano

# 添加Volcano Helm仓库
helm repo add volcano-sh https://volcano-sh.github.io/helm-charts
helm repo update

# 安装Volcano(版本可自行指定)
helm install volcano volcano-sh/volcano \
-n volcano-system \
--create-namespace \
--version 1.13.0

# 等待Volcano组件就绪,预计需要几分钟时间

创建Ray Cluster

创建Volcano Job来运行Ray集群:

ray-cluster.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: ray-cluster
namespace: volcano-system
spec:
minAvailable: 3
schedulerName: volcano
plugins:
ray: []
policies:
- event: PodEvicted
action: RestartJob
queue: default
tasks:
# Head节点 - Ray集群的主节点
- replicas: 1
name: head
template:
spec:
containers:
- name: head
image: rayproject/ray:latest-py311-cpu
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
restartPolicy: OnFailure

# Worker节点 - 执行计算任务的工作节点
- replicas: 2
name: worker
template:
spec:
containers:
- name: worker
image: rayproject/ray:latest-py311-cpu
imagePullPolicy: IfNotPresent
command:
- /bin/bash
- -c
- |
ray start --block --address=ray-cluster-head-svc:6379
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
restartPolicy: OnFailure

按照以下步骤部署Ray Cluster并提交任务:

步骤1:创建Volcano Job(Ray Cluster)

kubectl apply -f ray-cluster.yaml

# 查看Job状态
kubectl get vcjob -n volcano-system ray-cluster

# 查看Pods
kubectl get pods -n volcano-system -l volcano.sh/job-name=ray-cluster

# 等待所有Pod运行
kubectl wait --for=condition=Ready pod -n volcano-system -l volcano.sh/job-name=ray-cluster --timeout=300s

步骤2:验证Ray Cluster就绪

# 查看Service(插件自动创建)
kubectl get svc -n volcano-system | grep ray-cluster
# 应该看到: ray-cluster-head-svc (ClusterIP Service)

# 访问Ray Dashboard
kubectl port-forward -n volcano-system svc/ray-cluster-head-svc 8265:8265
# 在浏览器访问 http://localhost:8265

提交Ray任务

Volcano Ray插件只创建Ray Cluster,不负责任务提交。有两种方式向集群提交任务:

方式一:使用kubectl exec提交任务

这是最简单的方式,直接在Head节点中执行Python脚本:

1. 创建任务脚本

ray-job-code.py
import ray
import time

# 连接到Ray集群
ray.init(address='auto')

print(f"Ray cluster resources: {ray.cluster_resources()}")
print(f"Available nodes: {len(ray.nodes())}")

# 定义一个远程函数
@ray.remote
def compute_square(x):
time.sleep(1)
return x * x

# 并行计算
numbers = range(10)
futures = [compute_square.remote(i) for i in numbers]
results = ray.get(futures)

print(f"Input numbers: {list(numbers)}")
print(f"Squared results: {results}")
print("Job completed successfully!")

2. 将脚本复制到Head节点并执行

# 复制脚本到Head节点
kubectl cp ray-job-code.py volcano-system/ray-cluster-head-0:/tmp/

# 在Head节点中执行任务
kubectl exec -n volcano-system ray-cluster-head-0 -- python /tmp/ray-job-code.py

# 或者直接执行
kubectl exec -n volcano-system ray-cluster-head-0 -- bash -c "
cat <<'EOF' | python
import ray
ray.init(address='auto')

@ray.remote
def compute_square(x):
return x * x

results = ray.get([compute_square.remote(i) for i in range(10)])
print(f'Results: {results}')
EOF
"

方式二:使用任务提交Pod

创建一个独立的Pod,专门用于向Ray Cluster提交任务。这种方式更适合生产环境。

1. 创建任务ConfigMap

ray-job-code.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code
namespace: volcano-system
data:
job.py: |
import ray
import time

# 连接到Ray集群
ray.init(address='auto')

print(f"Ray cluster resources: {ray.cluster_resources()}")
print(f"Available nodes: {len(ray.nodes())}")

# 定义一个远程函数
@ray.remote
def compute_square(x):
time.sleep(1)
return x * x

# 并行计算
numbers = range(10)
futures = [compute_square.remote(i) for i in numbers]
results = ray.get(futures)

print(f"Input numbers: {list(numbers)}")
print(f"Squared results: {results}")
print("Job completed successfully!")

2. 创建任务提交Pod

ray-job-submitter.yaml
apiVersion: v1
kind: Pod
metadata:
name: ray-job-submitter
namespace: volcano-system
spec:
restartPolicy: Never
containers:
- name: submitter
image: rayproject/ray:latest-py311-cpu
command:
- /bin/bash
- -c
- |
# 等待Ray集群就绪
echo "Waiting for Ray cluster to be ready..."
until ray health-check --address=ray-cluster-head-svc:6379 2>/dev/null; do
sleep 5
done

echo "Ray cluster is ready, submitting job..."

# 执行Ray任务
python /mnt/job/job.py

echo "Job completed!"
env:
- name: RAY_ADDRESS
value: "ray://ray-cluster-head-svc:10001" # 使用Ray Client API
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1"
memory: "2Gi"
volumeMounts:
- name: job-code
mountPath: /mnt/job
volumes:
- name: job-code
configMap:
name: ray-job-code

2. 监控任务执行

# 应用ConfigMap
kubectl apply -f ray-job-code.yaml

# 创建任务提交Pod
kubectl apply -f ray-job-submitter.yaml

# 检查任务状态
kubectl get pod -n volcano-system ray-job-submitter

# 查看任务执行日志
kubectl logs -n volcano-system ray-job-submitter -f

预期输出

执行任务后,应该看到类似以下的输出:

方式一(kubectl exec)的输出

Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

方式二(任务提交Pod)的输出

Ray cluster resources: {'CPU': 4.0, 'memory': 8589934592, ...}
Available nodes: 3
Input numbers: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Squared results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Job completed successfully!

清理资源

任务执行完成后,清理创建的资源:

# 删除任务提交Pod(如果使用了方式二)
kubectl delete pod -n volcano-system ray-job-submitter

# 删除ConfigMap
kubectl delete configmap -n volcano-system ray-job-code

# 删除Volcano Job(Ray Cluster)
kubectl delete vcjob -n volcano-system ray-cluster

参考资料