目前Volcano最新版本1.13.0的PyTorch Job Plugin经调研发现存在严重BUG,需要等修复之后再进一步完善文档。
Volcano PyTorch Plugin是什么
Volcano PyTorch Plugin是Volcano批量调度系统中的一个插件,专门用于简化在Kubernetes集群上运行PyTorch分布式训练任务的配置和管理。
Volcano简介
Volcano是一个基于Kubernetes构建的批量调度系统,专为高性能计算场景(如AI训练、大数据分析、科学计算等)设计。它与Spark、TensorFlow、PyTorch、Ray、MPI等多种计算框架深度集成,提供了强大的批量任务调度能力。
PyTorch Plugin定位
PyTorch Plugin是Volcano针对PyTorch分布式训练场景提供的插件,它能够:
- 自动配置分布式训练环境变量:自动注入
MASTER_ADDR、MASTER_PORT、WORLD_SIZE、RANK等PyTorch分布式训练必需的环境变量 - 简化任务定义:用户无需手动配置复杂的网络和环境设置,只需定义基本的任务结构
- 确保任务正常运行:自动处理端口开放、服务发现等底层细节
通俗理解
如果把PyTorch分布式训练比作一个交响乐团演出:
- 训练任务就像整场音乐会(需要多个乐手协同完成)
- Master节点就像指挥家(协调整个训练过程)
- Worker节点就像乐手(执行具体的训练计算)
- Volcano PyTorch Plugin就像音乐会的舞台监督(自动安排座位、配置设备、建立通信渠道),让乐手们只需专注演奏,无需关心座位编号、通信方式等细节
PyTorch Plugin解决什么问题
在Kubernetes上运行PyTorch分布式训练任务时,开发者通常面临以下挑战,PyTorch Plugin针对性地解决了这些问题。
主要解决的问题
| 问题领域 | 传统困境 | PyTorch Plugin的解决方案 | 价值 |
|---|---|---|---|
| 环境变量配置 | 需要手动为每个Pod配置MASTER_ADDR、RANK等变量 | 自动注入所有必需的环境变量 | 减少配置错误,降低80%配置工作量 |
| 端口管理 | 需要手动配置容器端口和服务端口 | 自动开放PyTorch通信端口 | 避免端口冲突和配置遗漏 |
| 服务发现 | 需要手动配置Master节点地址 | 自动生成Master节点的完整域名 | 简化网络配置,提升可靠性 |
| 任务编排 | YAML文件复杂,易出错 | 统一的任务定义格式 | 提升开发效率,降低维护成本 |
| 扩展性 | 修改节点数量需要大量配置变更 | 自动计算WORLD_SIZE和分配RANK | 轻松实现任务扩缩容 |
Plugin工作原理
PyTorch Plugin在Pod创建时自动执行以下操作:
- 创建
Volcano Job PyTorch Plugin启动- 识别
Master和Worker任务 - 生成
Master地址 - 计算
WORLD_SIZE - 分配
RANK给每个Pod - 开放通信端口
- 注入环境变量
Pod启动训练
核心功能详解
1. 自动注入环境变量
PyTorch分布式训练依赖以下关键环境变量:
| 环境变量 | 含义 | Plugin处理方式 |
|---|---|---|
MASTER_ADDR | Master节点的网络地址 | 自动生成为{hostname}.{subdomain}格式 |
MASTER_PORT | Master节点的通信端口 | 默认23456,可配置 |
WORLD_SIZE | 总的进程数(所有节点) | 自动计算Master + Worker的副本数 |
RANK | 当前进程的全局排名 | Master为0,Worker依次递增 |
2. 自动开放端口
Plugin会为所有容器自动添加端口配置:
ports:
- name: pytorchjob-port
containerPort: 23456 # 默认端口,可配置
3. 强制启用服务插件
Plugin会自动启用svc插件,为每个任务创建Headless Service,实现Pod之间的稳定网络通信。
如何安装和配置
前置条件
Kubernetes集群(版本1.19+)kubectl命令行工具- 集群管理员权限(用于安装
CRD和控制器)
安装Volcano
# 添加Volcano Helm仓库
helm repo add volcano-sh https://volcano-sh.github.io/helm-charts
helm repo update
# 安装Volcano(版本可自行指定)
helm install volcano volcano-sh/volcano \
-n volcano-system \
--create-namespace \
--version 1.13.0
# 等待Volcano组件就绪,预计需要几分钟时间
# 验证安装
kubectl get pods -n volcano-system
预期输出示例:
NAME READY STATUS RESTARTS AGE
volcano-admission-b84bbd89-dgv2p 1/1 Running 0 10s
volcano-controllers-7b97b6455c-rghzf 1/1 Running 0 10s
volcano-scheduler-65d4d4645b-p9llx 1/1 Running 0 10s
配置参数说明
PyTorch Plugin支持以下配置参数:
| 参数名 | 类型 | 默认值 | 必填 | 说明 | 示例 |
|---|---|---|---|---|---|
--master | string | master | 否 | Master任务的名称 | --master=master |
--worker | string | worker | 否 | Worker任务的名称 | --worker=worker |
--port | int | 23456 | 否 | 通信端口号 | --port=23456 |
PyTorch分布式训练示例
以下示例中使用的镜像为
pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime,可根据需要替换为其他版本。
脚本示例
我们使用一个简单的PyTorch分布式训练脚本来演示。该脚本实现了一个简单的线性回归模型训练:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
# ====================== 1. 定义简单的数据集和模型 ======================
class SimpleDataset(Dataset):
"""简单的数据集:输入是随机数,标签是输入的2倍(简单回归任务)"""
def __init__(self, size=100):
self.data = torch.randn(size, 1) # 输入:(size, 1)
self.labels = self.data * 2 + 0.1 * torch.randn_like(self.data) # 标签:2x + 噪声
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.labels[idx]
class SimpleModel(nn.Module):
"""简单的线性模型:y = wx + b"""
def __init__(self):
super(SimpleModel, self).__init__()
self.linear = nn.Linear(1, 1)
def forward(self, x):
return self.linear(x)
# ====================== 2. 初始化分布式进程组 ======================
def init_distributed():
"""
初始化分布式环境(CPU + gloo后端)
Volcano PyTorch Plugin 会自动设置以下环境变量:
- RANK: 全局进程排名
- WORLD_SIZE: 总进程数
- MASTER_ADDR: 主节点地址
- MASTER_PORT: 主节点端口
"""
# 从环境变量中获取分布式训练参数
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))
print(f"[初始化] RANK={rank}, WORLD_SIZE={world_size}")
print(f"[初始化] MASTER_ADDR={os.environ.get('MASTER_ADDR', 'N/A')}")
print(f"[初始化] MASTER_PORT={os.environ.get('MASTER_PORT', 'N/A')}")
# 初始化分布式进程组
# 使用 gloo 后端(CPU训练),自动从环境变量读取配置
dist.init_process_group(backend="gloo")
print(f"[初始化完成] 成功初始化分布式进程组")
return rank, world_size
# ====================== 3. 核心训练函数 ======================
def train():
print("=" * 60)
print("开始 PyTorch 分布式训练(Volcano 版本)")
print("=" * 60)
# 初始化分布式环境
rank, world_size = init_distributed()
# 设置当前进程的设备(CPU)
device = torch.device("cpu")
# 1. 构建数据集和分布式采样器(核心:将数据分发给不同进程)
dataset = SimpleDataset(size=100)
# DistributedSampler:保证不同进程读取不同的数据分片
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(
dataset,
batch_size=10,
sampler=sampler, # 必须用分布式采样器,替代shuffle
num_workers=0 # 简化demo,关闭多线程
)
# 2. 构建模型并包装为分布式数据并行(DDP)
model = SimpleModel()
# DDP:自动处理参数同步、梯度聚合
model = nn.parallel.DistributedDataParallel(model, device_ids=None) # CPU训练,device_ids设为None
# 3. 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 4. 训练循环
epochs = 5
print(f"\n[Rank {rank}] 开始训练,共 {epochs} 个 epoch")
print("-" * 60)
for epoch in range(epochs):
# 每个epoch开始前,更新采样器的epoch(保证分布式数据打乱的一致性)
sampler.set_epoch(epoch)
model.train()
total_loss = 0.0
for batch_idx, (data, labels) in enumerate(dataloader):
# 将数据移动到设备
data, labels = data.to(device), labels.to(device)
# 前向传播
outputs = model(data)
loss = criterion(outputs, labels)
# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
# 只在主进程(rank=0)打印训练信息
if rank == 0:
avg_loss = total_loss / len(dataloader)
print(f"Epoch [{epoch+1}/{epochs}], Average Loss: {avg_loss:.4f}")
print("-" * 60)
print(f"[Rank {rank}] 训练完成!")
# 在主进程打印最终的模型参数
if rank == 0:
print("\n" + "=" * 60)
print("最终模型参数:")
for name, param in model.named_parameters():
print(f" {name}: {param.data}")
print("=" * 60)
# 清理分布式进程组
dist.destroy_process_group()
print(f"[Rank {rank}] 进程组已销毁")
if __name__ == "__main__":
train()
代码关键点:
- 分布式初始化:使用
dist.init_process_group(backend="gloo")初始化分布式环境,CPU训练使用gloo后端 - 数据并行:使用
DistributedSampler确保不同进程读取不同的数据分片 - 模型并行:使用
DistributedDataParallel包装模型,自动处理梯度聚合 - 环境变量:
Volcano PyTorch Plugin会自动设置所有必要的环境变量(RANK、WORLD_SIZE、MASTER_ADDR等)
部署步骤
步骤一:创建 ConfigMap
本示例使用ConfigMap方式管理训练代码,适合快速开发和测试场景,将训练脚本存储在ConfigMap中,然后挂载到训练Pod中。
优点:
- ✅ 快速迭代:修改脚本只需更新
ConfigMap - ✅ 无需构建镜像:避免镜像构建和推送的开销
- ✅ 适合调试:快速验证训练逻辑
缺点:
- ❌ 文件大小限制:
ConfigMap最大1MB - ❌ 不适合生产:缺少版本控制和回滚能力
将训练脚本保存为pytorch-demo.py文件,然后创建ConfigMap:
kubectl create configmap pytorch-demo-script --from-file=pytorch-demo.py
验证ConfigMap:
kubectl get configmap pytorch-demo-script
kubectl describe configmap pytorch-demo-script
步骤二:创建 Volcano Job
创建pytorch-job-with-configmap.yaml:
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: pytorch-job-with-configmap
spec:
# 最小可用副本数
minAvailable: 3
# 使用Volcano调度器
schedulerName: volcano
# 配置插件
plugins:
# 启用PyTorch Plugin
pytorch: ["--master=master", "--worker=worker", "--port=23456"]
# 指定队列
queue: default
# 定义任务
tasks:
# Master任务定义
- replicas: 1
name: master
policies:
# 当Master任务完成时,整个Job标记为完成
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- name: master
image: pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime
imagePullPolicy: IfNotPresent
command:
- python
- /workspace/pytorch-demo.py
env:
- name: PYTHONUNBUFFERED
value: "1"
volumeMounts:
- name: training-script
mountPath: /workspace
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "1"
memory: "2Gi"
volumes:
- name: training-script
configMap:
name: pytorch-demo-script
restartPolicy: OnFailure
# Worker任务定义
- replicas: 2
name: worker
template:
spec:
containers:
- name: worker
image: pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime
imagePullPolicy: IfNotPresent
command:
- python
- /workspace/pytorch-demo.py
env:
- name: PYTHONUNBUFFERED
value: "1"
volumeMounts:
- name: training-script
mountPath: /workspace
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "1"
memory: "2Gi"
volumes:
- name: training-script
configMap:
name: pytorch-demo-script
restartPolicy: OnFailure
步骤三:提交训练任务
# 应用Job定义
kubectl apply -f pytorch-job-with-configmap.yaml
# 查看Job状态
kubectl get vcjob pytorch-job-with-configmap
# 查看Pod状态
kubectl get pods -l volcano.sh/job-name=pytorch-job-with-configmap
期望输出:
NAME READY STATUS RESTARTS AGE
pytorch-job-with-configmap-master-0 1/1 Running 0 30s
pytorch-job-with-configmap-worker-0 1/1 Running 0 30s
pytorch-job-with-configmap-worker-1 1/1 Running 0 30s
步骤四:查看训练日志
# 查看Master节点日志
kubectl logs pytorch-job-with-configmap-master-0 -f
# 查看Worker节点日志
kubectl logs pytorch-job-with-configmap-worker-0
kubectl logs pytorch-job-with-configmap-worker-1
预期输出:
============================================================
开始 PyTorch 分布式训练(Volcano 版本)
============================================================
[初始化] RANK=0, WORLD_SIZE=3
[初始化] MASTER_ADDR=master-0.pytorch-job-with-configmap
[初始化] MASTER_PORT=23456
[初始化完成] 成功初始化分布式进程组
[Rank 0] 开始训练,共 5 个 epoch
------------------------------------------------------------
Epoch [1/5], Average Loss: 0.0234
Epoch [2/5], Average Loss: 0.0189
Epoch [3/5], Average Loss: 0.0156
Epoch [4/5], Average Loss: 0.0128
Epoch [5/5], Average Loss: 0.0103
------------------------------------------------------------
[Rank 0] 训练完成!
- 查看
WORLD_SIZE确认总进程数:1个Master + 2个Worker = 3进程 - 查看
RANK确认进程编号:0(Master), 1(Worker-0), 2(Worker-1) - 查看
MASTER_ADDR确认主节点地址自动生成
步骤五:清理资源
# 删除Job(会自动清理所有相关Pod和Service)
kubectl delete -f pytorch-job-with-configmap.yaml
# 删除ConfigMap
kubectl delete configmap pytorch-demo-script