本文详细介绍Volcano Job中的Plugin机制及各个插件的配置和使用方法。关于Job的基础介绍,请参考 Volcano Job详解。
Plugin机制概述
Volcano Job Plugin是一套扩展机制,用于在Job的不同生命周期阶段自动注入额外的功能和配置。插件机制使得Volcano能够灵活支持各种分布式计算框架和应用场景,而无需修改核心代码。
Plugin接口定义
所有插件都需要实现以下接口:
type PluginInterface interface {
// Name 返回插件的唯一名称
Name() string
// OnPodCreate 在创建Pod时被调用,可以修改Pod配置
OnPodCreate(pod *v1.Pod, job *vcbatch.Job) error
// OnJobAdd 在Job初始化时被调用,可以创建辅助资源
// 注意:可能被多次调用,必须保证幂等性
OnJobAdd(job *vcbatch.Job) error
// OnJobDelete 在Job删除时被调用,用于清理资源
// 注意:可能被多次调用,必须保证幂等性
OnJobDelete(job *vcbatch.Job) error
// OnJobUpdate 在Job更新时被调用
// 注意:可能被多次调用,必须保证幂等性
OnJobUpdate(job *vcbatch.Job) error
}
Plugin生命周期
插件在Job的以下生命周期阶段发挥作用:
- Job创建阶段:
OnJobAdd()被调用,通常用于创建辅助资源(如Secret、ConfigMap、Service等) - Pod创建阶段:
OnPodCreate()被调用,用于修改Pod配置(如添加环境变量、挂载卷等) - Job更新阶段:
OnJobUpdate()被调用,用于更新相关资源 - Job删除阶段:
OnJobDelete()被调用,用于清理创建的资源
Plugin配置方式
在Job的spec.plugins字段中配置需要启用的插件:
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: example-job
spec:
plugins:
ssh: [] # 启用SSH插件,无参数
env: [] # 启用环境变量插件
svc: ["--publish-not-ready-addresses=true"] # 启用服务插件,带参数
pytorch: ["--master=master", "--worker=worker"] # 启用PyTorch插件,带参数
基础插件
SSH
SSH Plugin为Volcano Job中的所有Pod提供无密码登录功能,这对于需要节点间通信的工作负载(如MPI)是必需的。
工作原理
- 密钥生成:在
Job创建时自动生成RSA密钥对(2048位),或使用用户提供的密钥 - Secret创建:将密钥存储在名为
{JobName}-ssh的Secret中,包含以下内容:id_rsa:私钥id_rsa.pub:公钥authorized_keys:授权密钥文件config:SSH配置文件
- 卷挂载:将
Secret作为卷挂载到所有容器(包括InitContainer)的指定路径 - 主机配置:在
config文件中包含所有Pod的主机名和域名对
配置参数
| 参数名 | 类型 | 默认值 | 必需 | 说明 | 示例 |
|---|---|---|---|---|---|
ssh-key-file-path | string | /root/.ssh | 否 | SSH密钥文件存储路径 | --ssh-key-file-path=/home/user/.ssh |
ssh-private-key | string | 自动生成 | 否 | 用户提供的私钥内容 | --ssh-private-key=-----BEGIN RSA PRIVATE KEY-----... |
ssh-public-key | string | 自动生成 | 否 | 用户提供的公钥内容 | --ssh-public-key=ssh-rsa AAAAB3... |
使用场景
- MPI分布式计算:主节点需要通过
SSH在工作节点上启动进程 - 分布式训练:训练任务需要在不同节点间建立直接通信
- 集群管理:需要在集群内部执行命令或传输文件
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: mpi-job
spec:
queue: default
minAvailable: 3
schedulerName: volcano
plugins:
ssh: [] # 使用默认配置,自动生成密钥
svc: []
tasks:
- replicas: 1
name: mpimaster
template:
spec:
containers:
- name: mpimaster
image: lyd911/mindspore-gpu-example:0.2.0
command:
- /bin/bash
- -c
- |
mkdir -p /var/run/sshd; /usr/sbin/sshd;
MPI_HOST=`cat /etc/volcano/mpiworker.host | tr "\n" ","`;
sleep 10;
mpiexec --allow-run-as-root --host ${MPI_HOST} -np 2 python /tmp/gpu-test.py;
ports:
- containerPort: 22
name: mpijob-port
workingDir: /home
restartPolicy: OnFailure
- replicas: 2
name: mpiworker
template:
spec:
containers:
- name: mpiworker
image: lyd911/mindspore-gpu-example:0.2.0
command:
- /bin/bash
- -c
- |
mkdir -p /var/run/sshd; /usr/sbin/sshd -D;
resources:
limits:
nvidia.com/gpu: "1"
ports:
- containerPort: 22
name: mpijob-port
workingDir: /home
restartPolicy: OnFailure
验证SSH连接:
在master Pod中执行:
# 查看SSH配置
cat /root/.ssh/config
# 输出:
# StrictHostKeyChecking no
# UserKnownHostsFile /dev/null
# Host mpi-job-mpimaster-0
# HostName mpi-job-mpimaster-0.mpi-job
# Host mpi-job-mpiworker-0
# HostName mpi-job-mpiworker-0.mpi-job
# Host mpi-job-mpiworker-1
# HostName mpi-job-mpiworker-1.mpi-job
# 无密码登录worker节点
ssh mpi-job-mpiworker-0
注意事项
- 确保容器镜像中安装了
sshd服务 - 如果使用非默认路径,需要确保应用程序能够访问该路径
- 建议大多数场景使用默认配置,让
Volcano自动生成密钥 SSH Plugin通常与SVC Plugin配合使用,以提供主机名解析功能
SVC
SVC Plugin为Volcano Job中的Pod提供网络通信能力,使Pod之间可以通过域名相互访问。这对于需要节点间通信的分布式应用(如TensorFlow、MPI)是必需的。
工作原理
-
主机名和子域名配置:
- 自动设置每个
Pod的hostname为Pod名称 - 自动设置每个
Pod的subdomain为Job名称 - 完整的
FQDN格式为:{PodName}.{JobName}.{Namespace}.svc.cluster.local
- 自动设置每个
-
Headless Service创建:
- 创建名为
{JobName}的Headless Service(ClusterIP: None) - 该服务将
Pod的FQDN指向其IP地址
- 创建名为
-
ConfigMap创建:
- 创建名为
{JobName}-svc的ConfigMap - 包含每个
Task的主机列表和副本数量 - 将
ConfigMap挂载到所有Pod的/etc/volcano/目录
- 创建名为
-
环境变量注入:
- 为每个
Task注入VC_{TaskName}_HOSTS环境变量(包含该Task所有Pod的域名) - 为每个
Task注入VC_{TaskName}_NUM环境变量(包含该Task的副本数量)
- 为每个
-
NetworkPolicy创建(可选):
- 默认创建
NetworkPolicy,允许Job内部的Pod相互通信 - 可通过参数禁用
- 默认创建
配置参数
| 参数名 | 类型 | 默认值 | 必需 | 说明 | 示例 |
|---|---|---|---|---|---|
publish-not-ready-addresses | bool | false | 否 | 是否发布未就绪的Pod地址 | --publish-not-ready-addresses=true |
disable-network-policy | bool | false | 否 | 是否禁用NetworkPolicy | --disable-network-policy=true |
使用场景
- TensorFlow分布式训练:
PS和Worker需要相互通信 - MPI并行计算:主节点需要知道所有工作节点的地址
- Spark分布式计算:
Driver和Executor需要建立连接 - 分布式数据库集群:节点间需要建立集群关系
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tensorflow-dist-mnist
spec:
queue: default
minAvailable: 3
schedulerName: volcano
plugins:
env: []
svc: ["--publish-not-ready-addresses=false", "--disable-network-policy=false"]
policies:
- event: PodEvicted
action: RestartJob
queue: default
tasks:
- replicas: 1
name: ps
template:
spec:
containers:
- name: tensorflow
image: volcanosh/dist-mnist-tf-example:0.0.1
command:
- sh
- -c
- |
PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
export TF_CONFIG={\"cluster\":{\"ps\":[${PS_HOST}],\"worker\":[${WORKER_HOST}]},\"task\":{\"type\":\"ps\",\"index\":${VK_TASK_INDEX}},\"environment\":\"cloud\"};
python /var/tf_dist_mnist/dist_mnist.py
ports:
- containerPort: 2222
name: tfjob-port
restartPolicy: Never
- replicas: 2
name: worker
policies:
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- name: tensorflow
image: volcanosh/dist-mnist-tf-example:0.0.1
command:
- sh
- -c
- |
PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
export TF_CONFIG={\"cluster\":{\"ps\":[${PS_HOST}],\"worker\":[${WORKER_HOST}]},\"task\":{\"type\":\"worker\",\"index\":${VK_TASK_INDEX}},\"environment\":\"cloud\"};
python /var/tf_dist_mnist/dist_mnist.py
ports:
- containerPort: 2222
name: tfjob-port
restartPolicy: Never
验证服务和环境变量:
# 查看创建的Service
kubectl get service
# 输出:
# NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
# tensorflow-dist-mnist ClusterIP None <none> <none> 5s
# 在Pod中查看环境变量
kubectl exec tensorflow-dist-mnist-ps-0 -- env | grep VC_
# 输出:
# VC_PS_HOSTS=tensorflow-dist-mnist-ps-0.tensorflow-dist-mnist
# VC_PS_NUM=1
# VC_WORKER_HOSTS=tensorflow-dist-mnist-worker-0.tensorflow-dist-mnist,tensorflow-dist-mnist-worker-1.tensorflow-dist-mnist
# VC_WORKER_NUM=2
# 查看主机列表文件
kubectl exec tensorflow-dist-mnist-ps-0 -- cat /etc/volcano/ps.host
# 输出:
# tensorflow-dist-mnist-ps-0.tensorflow-dist-mnist
kubectl exec tensorflow-dist-mnist-ps-0 -- cat /etc/volcano/worker.host
# 输出:
# tensorflow-dist-mnist-worker-0.tensorflow-dist-mnist
# tensorflow-dist-mnist-worker-1.tensorflow-dist-mnist
注意事项
SVC Plugin为分布式应用提供基础网络能力,通常是其他插件的前置依赖- 环境变量中的
Task名称会自动转换:将-替换为_,并转换为大写 - 当
Pod重建时,FQDN保持不变,但IP地址会改变 publish-not-ready-addresses设置为true时,即使Pod未就绪也会发布其地址,适合需要提前建立连接的场景
Env
Env Plugin为Volcano Job中的每个Pod自动注入索引环境变量,使Pod能够感知自己在Task中的位置。这对于需要根据索引分配不同数据分片或角色的应用非常有用。
工作原理
- 索引计算:根据
Pod在其Task中的位置计算索引值(从0开始) - 环境变量注入:为所有容器(包括
InitContainer)注入以下环境变量:VK_TASK_INDEX:Pod在Task中的索引(历史兼容,将来会废弃)VC_TASK_INDEX:Pod在Task中的索引(推荐使用)
配置参数
无需配置参数,直接启用即可:
plugins:
env: []
使用场景
- 数据分片:每个
Worker根据索引处理不同的数据分片 - 角色分配:根据索引分配不同的角色(如
rank 0作为主节点) - 分布式训练:
TensorFlow、PyTorch等框架需要知道每个进程的rank - 配置差异化:根据索引加载不同的配置文件
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tensorflow-dist-mnist
spec:
queue: default
minAvailable: 3
schedulerName: volcano
plugins:
env: [] # 启用环境变量插件
svc: []
policies:
- event: PodEvicted
action: RestartJob
queue: default
tasks:
- replicas: 1
name: ps
template:
spec:
containers:
- name: tensorflow
image: volcanosh/dist-mnist-tf-example:0.0.1
command:
- sh
- -c
- |
PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
# 使用VC_TASK_INDEX环境变量配置TensorFlow任务索引
export TF_CONFIG={\"cluster\":{\"ps\":[${PS_HOST}],\"worker\":[${WORKER_HOST}]},\"task\":{\"type\":\"ps\",\"index\":${VC_TASK_INDEX}},\"environment\":\"cloud\"};
python /var/tf_dist_mnist/dist_mnist.py
ports:
- containerPort: 2222
name: tfjob-port
restartPolicy: Never
- replicas: 2
name: worker
policies:
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- name: tensorflow
image: volcanosh/dist-mnist-tf-example:0.0.1
command:
- sh
- -c
- |
PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`;
# 使用VC_TASK_INDEX环境变量配置TensorFlow任务索引
export TF_CONFIG={\"cluster\":{\"ps\":[${PS_HOST}],\"worker\":[${WORKER_HOST}]},\"task\":{\"type\":\"worker\",\"index\":${VC_TASK_INDEX}},\"environment\":\"cloud\"};
python /var/tf_dist_mnist/dist_mnist.py
ports:
- containerPort: 2222
name: tfjob-port
restartPolicy: Never
验证环境变量:
# 查看ps Pod的索引
kubectl exec tensorflow-dist-mnist-ps-0 -- env | grep TASK_INDEX
# 输出:
# VK_TASK_INDEX=0
# VC_TASK_INDEX=0
# 查看worker Pod的索引
kubectl exec tensorflow-dist-mnist-worker-0 -- env | grep TASK_INDEX
# 输出:
# VK_TASK_INDEX=0
# VC_TASK_INDEX=0
kubectl exec tensorflow-dist-mnist-worker-1 -- env | grep TASK_INDEX
# 输出:
# VK_TASK_INDEX=1
# VC_TASK_INDEX=1
注意事项
VK_TASK_INDEX将在未来版本中废弃,推荐使用VC_TASK_INDEX- 索引值从
0开始,最大值为replicas - 1 - 当
Pod重建时,如果Pod名称不变,索引值也保持不变 - 该插件无需配置参数,注册时传入空数组即可
分布式框架插件
PyTorch
PyTorch Plugin为PyTorch分布式训练任务自动配置必要的环境变量,支持DistributedDataParallel (DDP)等分布式训练模式。
工作原理
- 角色识别:识别
master和worker任务 - 环境变量注入:为所有容器注入以下环境变量:
MASTER_ADDR:主节点的域名地址MASTER_PORT:主节点的通信端口WORLD_SIZE:总的进程数量RANK:当前进程的全局排名(master为0,worker从1开始)
- 端口开放:为所有容器开放指定的通信端口
配置参数
| 参数名 | 类型 | 默认值 | 必需 | 说明 | 示例 |
|---|---|---|---|---|---|
master | string | master | 否 | Master任务名称 | --master=main |
worker | string | worker | 否 | Worker任务名称 | --worker=node |
port | integer | 23456 | 否 | PyTorch通信端口 | --port=29500 |
使用场景
- PyTorch DDP训练:自动配置分布式数据并行训练
- 多GPU训练:跨节点的多
GPU训练 - 弹性训练:支持节点动态伸缩的训练任务
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: pytorch-job
spec:
queue: default
minAvailable: 3
schedulerName: volcano
plugins:
pytorch: ["--master=master", "--worker=worker", "--port=23456"]
svc: []
tasks:
- replicas: 1
name: master
policies:
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- name: pytorch
image: pytorch/pytorch:2.4.1-cuda11.8-cudnn9-runtime
imagePullPolicy: IfNotPresent
command:
- python
- -c
- |
import os
import torch
import torch.distributed as dist
print(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
print(f"MASTER_PORT: {os.environ['MASTER_PORT']}")
print(f"WORLD_SIZE: {os.environ['WORLD_SIZE']}")
print(f"RANK: {os.environ['RANK']}")
# 初始化分布式环境
dist.init_process_group(backend="gloo")
print(f"Initialized process group, rank: {dist.get_rank()}")
# PyTorch训练代码...
restartPolicy: OnFailure
- replicas: 2
name: worker
template:
spec:
containers:
- name: pytorch
image: pytorch/pytorch:2.4.1-cuda11.8-cudnn9-runtime
imagePullPolicy: IfNotPresent
command:
- python
- -c
- |
import os
import torch
import torch.distributed as dist
print(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
print(f"MASTER_PORT: {os.environ['MASTER_PORT']}")
print(f"WORLD_SIZE: {os.environ['WORLD_SIZE']}")
print(f"RANK: {os.environ['RANK']}")
# 初始化分布式环境
dist.init_process_group(backend="gloo")
print(f"Initialized process group, rank: {dist.get_rank()}")
# PyTorch训练代码...
restartPolicy: OnFailure
验证环境变量:
# 查看master的环境变量
kubectl exec pytorch-job-master-0 -- env | grep -E "MASTER|WORLD|RANK"
# 输出:
# MASTER_ADDR=pytorch-job-master-0.pytorch-job
# MASTER_PORT=23456
# WORLD_SIZE=3
# RANK=0
# 查看worker-0的环境变量
kubectl exec pytorch-job-worker-0 -- env | grep -E "MASTER|WORLD|RANK"
# 输出:
# MASTER_ADDR=pytorch-job-master-0.pytorch-job
# MASTER_PORT=23456
# WORLD_SIZE=3
# RANK=1
# 查看worker-1的环境变量
kubectl exec pytorch-job-worker-1 -- env | grep -E "MASTER|WORLD|RANK"
# 输出:
# MASTER_ADDR=pytorch-job-master-0.pytorch-job
# MASTER_PORT=23456
# WORLD_SIZE=3
# RANK=2
注意事项
- 插件会自动启用
SVC Plugin,确保主节点地址可以被解析 WORLD_SIZE包含master和所有worker的总数RANK分配:master为0,worker从1开始递增- 确保防火墙规则允许指定端口的通信
- 如果使用
NCCL后端,需要额外配置GPU直通和网络设置
TensorFlow
TensorFlow Plugin为TensorFlow分布式训练任务自动配置TF_CONFIG环境变量,简化了分布式训练的配置过程。
工作原理
- 集群拓扑识别:识别
Job中的不同角色(ps、worker、chief、evaluator) - TF_CONFIG生成:为每个
Pod生成包含集群信息和任务信息的TF_CONFIGJSON配置 - 环境变量注入:将
TF_CONFIG注入到所有容器中
TF_CONFIG结构
{
"cluster": {
"ps": ["ps-0.job-name:2222", "ps-1.job-name:2222"],
"worker": ["worker-0.job-name:2222", "worker-1.job-name:2222"],
"chief": ["chief-0.job-name:2222"]
},
"task": {
"type": "worker", // 当前Pod的角色
"index": 0 // 当前Pod在该角色中的索引
}
}
配置参数
| 参数名 | 类型 | 默认值 | 必需 | 说明 | 示例 |
|---|---|---|---|---|---|
ps | string | ps | 否 | Parameter Server任务名称 | --ps=ps-role |
worker | string | worker | 否 | Worker任务名称 | --worker=worker-role |
chief | string | chief | 否 | Chief任务名称 | --chief=chief-role |
evaluator | string | evaluator | 否 | Evaluator任务名称 | --evaluator=evaluator-role |
port | integer | 2222 | 否 | TensorFlow通信端口 | --port=3333 |
使用场景
- TensorFlow分布式训练:自动配置
PS-Worker架构 - 多角色训练任务:支持
Chief、Evaluator等多种角色 - 参数服务器架构:简化
Parameter Server模式的配置
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tensorflow-training
spec:
queue: default
minAvailable: 4
schedulerName: volcano
plugins:
tensorflow: ["--ps=ps", "--worker=worker", "--port=2222"]
svc: []
tasks:
- replicas: 2
name: ps
template:
spec:
containers:
- name: tensorflow
image: tensorflow/tensorflow:2.4.0
command:
- python
- -c
- |
import os
import json
tf_config = json.loads(os.environ['TF_CONFIG'])
print(f"TF_CONFIG: {tf_config}")
# TensorFlow训练代码...
resources:
limits:
cpu: 4
memory: 8Gi
restartPolicy: OnFailure
- replicas: 3
name: worker
template:
spec:
containers:
- name: tensorflow
image: tensorflow/tensorflow:2.4.0
command:
- python
- -c
- |
import os
import json
tf_config = json.loads(os.environ['TF_CONFIG'])
print(f"TF_CONFIG: {tf_config}")
# TensorFlow训练代码...
resources:
limits:
cpu: 2
memory: 4Gi
restartPolicy: OnFailure
验证TF_CONFIG:
# 查看worker-0的TF_CONFIG
kubectl exec tensorflow-training-worker-0 -- env | grep TF_CONFIG
# 输出(格式化后):
# TF_CONFIG={
# "cluster": {
# "ps": [
# "tensorflow-training-ps-0.tensorflow-training:2222",
# "tensorflow-training-ps-1.tensorflow-training:2222"
# ],
# "worker": [
# "tensorflow-training-worker-0.tensorflow-training:2222",
# "tensorflow-training-worker-1.tensorflow-training:2222",
# "tensorflow-training-worker-2.tensorflow-training:2222"
# ]
# },
# "task": {
# "type": "worker",
# "index": 0
# }
# }
注意事项
- 对于单
Pod的TensorFlow Job(replicas=1),不会生成TF_CONFIG - 插件会自动启用
SVC Plugin功能,确保域名解析正常工作 - 任务名称必须与参数中配置的名称匹配,否则不会包含在
TF_CONFIG中 - 确保容器中的
TensorFlow版本支持通过TF_CONFIG进行分布式配置
MPI
MPI Plugin为MPI (Message Passing Interface)并行计算任务提供必要的配置,包括主机列表、SSH配置和端口开放。
工作原理
- 角色识别:识别
master和worker任务 - 主机列表生成:为
masterPod生成包含所有worker主机域名的MPI_HOST环境变量 - 端口开放:为所有容器开放
SSH端口 - 依赖插件启动:自动启用
SSH和SVC插件
配置参数
| 参数名 | 类型 | 默认值 | 必需 | 说明 | 示例 |
|---|---|---|---|---|---|
master | string | master | 否 | Master任务名称 | --master=mpimaster |
worker | string | worker | 否 | Worker任务名称 | --worker=mpiworker |
port | integer | 22 | 否 | SSH通信端口 | --port=5000 |
使用场景
- MPI并行计算:科学计算、数值模拟
- 高性能计算:大规模并行任务
- 分布式模拟:物理模拟、流体力学计算
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: lm-mpi-job
spec:
queue: default
minAvailable: 1
schedulerName: volcano
plugins:
mpi: ["--master=mpimaster", "--worker=mpiworker", "--port=22"]
tasks:
- replicas: 1
name: mpimaster
policies:
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- name: mpimaster
image: volcanosh/example-mpi:0.0.3
command:
- /bin/sh
- -c
- |
mkdir -p /var/run/sshd; /usr/sbin/sshd;
# MPI_HOST环境变量包含所有worker的域名
echo "MPI_HOST: ${MPI_HOST}"
# 使用mpiexec在所有worker上运行程序
mpiexec --allow-run-as-root --host ${MPI_HOST} -np 2 mpi_hello_world;
workingDir: /home
restartPolicy: OnFailure
- replicas: 2
name: mpiworker
template:
spec:
containers:
- name: mpiworker
image: volcanosh/example-mpi:0.0.3
command:
- /bin/sh
- -c
- |
mkdir -p /var/run/sshd; /usr/sbin/sshd -D;
workingDir: /home
restartPolicy: OnFailure
验证MPI配置:
# 查看master的MPI_HOST环境变量
kubectl exec lm-mpi-job-mpimaster-0 -- env | grep MPI_HOST
# 输出:
# MPI_HOST=lm-mpi-job-mpiworker-0.lm-mpi-job,lm-mpi-job-mpiworker-1.lm-mpi-job
# 在master中测试SSH连接
kubectl exec lm-mpi-job-mpimaster-0 -- ssh lm-mpi-job-mpiworker-0 hostname
# 输出:
# lm-mpi-job-mpiworker-0
# 查看MPI任务执行结果
kubectl logs lm-mpi-job-mpimaster-0
注意事项
- 插件会自动启用
SSH Plugin和SVC Plugin,无需手动配置 - 确保容器镜像中安装了
MPI运行时环境和sshd服务 MPI_HOST只在masterPod中注入,包含所有worker的域名列表- 如果使用
gang插件,确保minAvailable等于worker的replicas数量 SSH端口必须与mpiexec使用的端口一致
Ray
Ray Plugin为Ray分布式计算框架提供自动化配置,包括Head节点和Worker节点的命令配置、端口开放和服务创建。
关于使用
Volcano Job中运行Ray节点的详细调研测试请参考:通过Volcano Job插件部署使用Ray Cluster
工作原理
- 节点角色配置:
- 为
Head节点配置ray start --head命令 - 为
Worker节点配置ray start命令,连接到Head节点
- 为
- 端口开放:为
Head节点开放三个关键端口:GCS端口(默认6379):全局控制服务Dashboard端口(默认8265):Web界面Client Server端口(默认10001):客户端连接
- Service创建:创建名为
{JobName}-head-svc的Service,映射到Head节点的端口 - 依赖插件启动:自动启用
SVC Plugin
配置参数
| 参数名 | 类型 | 默认值 | 必需 | 说明 | 示例 |
|---|---|---|---|---|---|
head | string | head | 否 | Head任务名称 | --head=ray-head |
worker | string | worker | 否 | Worker任务名称 | --worker=ray-worker |
headContainer | string | head | 否 | Head容器名称 | --headContainer=ray-head-container |
workerContainer | string | worker | 否 | Worker容器名称 | --workerContainer=ray-worker-container |
port | integer | 6379 | 否 | GCS端口 | --port=6380 |
dashboardPort | integer | 8265 | 否 | Dashboard端口 | --dashboardPort=8266 |
clientServerPort | integer | 10001 | 否 | Client Server端口 | --clientServerPort=10002 |
使用场景
- Ray分布式计算:数据处理、机器学习推理
- 强化学习:
RLlib分布式训练 - 分布式Python应用:大规模并行
Python任务
使用示例
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: ray-cluster-job
spec:
queue: default
minAvailable: 3
schedulerName: volcano
plugins:
ray: [] # 使用默认配置
svc: []
policies:
- event: PodEvicted
action: RestartJob
queue: default
tasks:
- replicas: 1
name: head
template:
spec:
containers:
- name: head
image: rayproject/ray:latest-py311-cpu
resources:
limits:
cpu: 2
memory: 4Gi
restartPolicy: OnFailure
- replicas: 2
name: worker
template:
spec:
containers:
- name: worker
image: rayproject/ray:latest-py311-cpu
resources:
limits:
cpu: 2
memory: 4Gi
restartPolicy: OnFailure
验证Ray集群:
# 查看创建的Service
kubectl get service
# 输出:
# NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
# ray-cluster-job ClusterIP None <none> <none> 5s
# ray-cluster-job-head-svc ClusterIP 10.96.184.65 <none> 6379/TCP,8265/TCP,10001/TCP 5s
# 查看Pod状态
kubectl get pod
# 输出:
# NAME READY STATUS RESTARTS AGE
# ray-cluster-job-head-0 1/1 Running 0 60s
# ray-cluster-job-worker-0 1/1 Running 0 60s
# ray-cluster-job-worker-1 1/1 Running 0 60s
# 访问Ray Dashboard(需要端口转发)
kubectl port-forward ray-cluster-job-head-0 8265:8265
# 在浏览器中访问 http://localhost:8265
# 提交Ray任务
kubectl exec ray-cluster-job-head-0 -- python -c "
import ray
ray.init()
@ray.remote
def hello():
return 'Hello from Ray!'
print(ray.get(hello.remote()))
"
注意事项
- 插件会自动启用
SVC Plugin,确保节点间通信正常 Ray Plugin基于Ray CLI,需要使用官方或兼容的Ray容器镜像- 通过
Service可以从集群外部访问Ray Dashboard和提交任务 - 插件自动配置
ray start命令,无需在容器command中手动指定 - 确保
Head节点有足够的资源,因为它承担集群管理职责