Skip to main content
注意

目前Volcano最新版本1.13.0PyTorch Job Plugin经调研发现存在严重BUG,需要等修复之后再进一步完善文档。

Volcano PyTorch Plugin是什么

Volcano PyTorch PluginVolcano批量调度系统中的一个插件,专门用于简化在Kubernetes集群上运行PyTorch分布式训练任务的配置和管理。

Volcano简介

Volcano是一个基于Kubernetes构建的批量调度系统,专为高性能计算场景(如AI训练、大数据分析、科学计算等)设计。它与SparkTensorFlowPyTorchRayMPI等多种计算框架深度集成,提供了强大的批量任务调度能力。

PyTorch Plugin定位

PyTorch PluginVolcano针对PyTorch分布式训练场景提供的插件,它能够:

  • 自动配置分布式训练环境变量:自动注入MASTER_ADDRMASTER_PORTWORLD_SIZERANKPyTorch分布式训练必需的环境变量
  • 简化任务定义:用户无需手动配置复杂的网络和环境设置,只需定义基本的任务结构
  • 确保任务正常运行:自动处理端口开放、服务发现等底层细节

通俗理解

如果把PyTorch分布式训练比作一个交响乐团演出:

  • 训练任务就像整场音乐会(需要多个乐手协同完成)
  • Master节点就像指挥家(协调整个训练过程)
  • Worker节点就像乐手(执行具体的训练计算)
  • Volcano PyTorch Plugin就像音乐会的舞台监督(自动安排座位、配置设备、建立通信渠道),让乐手们只需专注演奏,无需关心座位编号、通信方式等细节

PyTorch Plugin解决什么问题

Kubernetes上运行PyTorch分布式训练任务时,开发者通常面临以下挑战,PyTorch Plugin针对性地解决了这些问题。

主要解决的问题

问题领域传统困境PyTorch Plugin的解决方案价值
环境变量配置需要手动为每个Pod配置MASTER_ADDRRANK等变量自动注入所有必需的环境变量减少配置错误,降低80%配置工作量
端口管理需要手动配置容器端口和服务端口自动开放PyTorch通信端口避免端口冲突和配置遗漏
服务发现需要手动配置Master节点地址自动生成Master节点的完整域名简化网络配置,提升可靠性
任务编排YAML文件复杂,易出错统一的任务定义格式提升开发效率,降低维护成本
扩展性修改节点数量需要大量配置变更自动计算WORLD_SIZE和分配RANK轻松实现任务扩缩容

Plugin工作原理

PyTorch PluginPod创建时自动执行以下操作:

  1. 创建Volcano Job
  2. PyTorch Plugin启动
  3. 识别MasterWorker任务
  4. 生成Master地址
  5. 计算WORLD_SIZE
  6. 分配RANK给每个Pod
  7. 开放通信端口
  8. 注入环境变量
  9. Pod启动训练

核心功能详解

1. 自动注入环境变量

PyTorch分布式训练依赖以下关键环境变量:

环境变量含义Plugin处理方式
MASTER_ADDRMaster节点的网络地址自动生成为{hostname}.{subdomain}格式
MASTER_PORTMaster节点的通信端口默认23456,可配置
WORLD_SIZE总的进程数(所有节点)自动计算Master + Worker的副本数
RANK当前进程的全局排名Master0Worker依次递增

2. 自动开放端口

Plugin会为所有容器自动添加端口配置:

ports:
- name: pytorchjob-port
containerPort: 23456 # 默认端口,可配置

3. 强制启用服务插件

Plugin会自动启用svc插件,为每个任务创建Headless Service,实现Pod之间的稳定网络通信。

如何安装和配置

前置条件

  • Kubernetes集群(版本1.19+
  • kubectl命令行工具
  • 集群管理员权限(用于安装CRD和控制器)

安装Volcano

# 添加Volcano Helm仓库
helm repo add volcano-sh https://volcano-sh.github.io/helm-charts
helm repo update

# 安装Volcano(版本可自行指定)
helm install volcano volcano-sh/volcano \
-n volcano-system \
--create-namespace \
--version 1.13.0

# 等待Volcano组件就绪,预计需要几分钟时间

# 验证安装
kubectl get pods -n volcano-system

预期输出示例:

NAME                                   READY   STATUS    RESTARTS      AGE
volcano-admission-b84bbd89-dgv2p 1/1 Running 0 10s
volcano-controllers-7b97b6455c-rghzf 1/1 Running 0 10s
volcano-scheduler-65d4d4645b-p9llx 1/1 Running 0 10s

配置参数说明

PyTorch Plugin支持以下配置参数:

参数名类型默认值必填说明示例
--masterstringmasterMaster任务的名称--master=master
--workerstringworkerWorker任务的名称--worker=worker
--portint23456通信端口号--port=23456

PyTorch分布式训练示例

以下示例中使用的镜像为pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime,可根据需要替换为其他版本。

脚本示例

我们使用一个简单的PyTorch分布式训练脚本来演示。该脚本实现了一个简单的线性回归模型训练:

pytorch-demo.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

# ====================== 1. 定义简单的数据集和模型 ======================
class SimpleDataset(Dataset):
"""简单的数据集:输入是随机数,标签是输入的2倍(简单回归任务)"""
def __init__(self, size=100):
self.data = torch.randn(size, 1) # 输入:(size, 1)
self.labels = self.data * 2 + 0.1 * torch.randn_like(self.data) # 标签:2x + 噪声

def __len__(self):
return len(self.data)

def __getitem__(self, idx):
return self.data[idx], self.labels[idx]

class SimpleModel(nn.Module):
"""简单的线性模型:y = wx + b"""
def __init__(self):
super(SimpleModel, self).__init__()
self.linear = nn.Linear(1, 1)

def forward(self, x):
return self.linear(x)

# ====================== 2. 初始化分布式进程组 ======================
def init_distributed():
"""
初始化分布式环境(CPU + gloo后端)

Volcano PyTorch Plugin 会自动设置以下环境变量:
- RANK: 全局进程排名
- WORLD_SIZE: 总进程数
- MASTER_ADDR: 主节点地址
- MASTER_PORT: 主节点端口
"""
# 从环境变量中获取分布式训练参数
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

print(f"[初始化] RANK={rank}, WORLD_SIZE={world_size}")
print(f"[初始化] MASTER_ADDR={os.environ.get('MASTER_ADDR', 'N/A')}")
print(f"[初始化] MASTER_PORT={os.environ.get('MASTER_PORT', 'N/A')}")

# 初始化分布式进程组
# 使用 gloo 后端(CPU训练),自动从环境变量读取配置
dist.init_process_group(backend="gloo")

print(f"[初始化完成] 成功初始化分布式进程组")

return rank, world_size

# ====================== 3. 核心训练函数 ======================
def train():
print("=" * 60)
print("开始 PyTorch 分布式训练(Volcano 版本)")
print("=" * 60)

# 初始化分布式环境
rank, world_size = init_distributed()

# 设置当前进程的设备(CPU)
device = torch.device("cpu")

# 1. 构建数据集和分布式采样器(核心:将数据分发给不同进程)
dataset = SimpleDataset(size=100)
# DistributedSampler:保证不同进程读取不同的数据分片
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(
dataset,
batch_size=10,
sampler=sampler, # 必须用分布式采样器,替代shuffle
num_workers=0 # 简化demo,关闭多线程
)

# 2. 构建模型并包装为分布式数据并行(DDP)
model = SimpleModel()
# DDP:自动处理参数同步、梯度聚合
model = nn.parallel.DistributedDataParallel(model, device_ids=None) # CPU训练,device_ids设为None

# 3. 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 4. 训练循环
epochs = 5
print(f"\n[Rank {rank}] 开始训练,共 {epochs} 个 epoch")
print("-" * 60)

for epoch in range(epochs):
# 每个epoch开始前,更新采样器的epoch(保证分布式数据打乱的一致性)
sampler.set_epoch(epoch)
model.train()
total_loss = 0.0

for batch_idx, (data, labels) in enumerate(dataloader):
# 将数据移动到设备
data, labels = data.to(device), labels.to(device)

# 前向传播
outputs = model(data)
loss = criterion(outputs, labels)

# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

# 只在主进程(rank=0)打印训练信息
if rank == 0:
avg_loss = total_loss / len(dataloader)
print(f"Epoch [{epoch+1}/{epochs}], Average Loss: {avg_loss:.4f}")

print("-" * 60)
print(f"[Rank {rank}] 训练完成!")

# 在主进程打印最终的模型参数
if rank == 0:
print("\n" + "=" * 60)
print("最终模型参数:")
for name, param in model.named_parameters():
print(f" {name}: {param.data}")
print("=" * 60)

# 清理分布式进程组
dist.destroy_process_group()
print(f"[Rank {rank}] 进程组已销毁")

if __name__ == "__main__":
train()

代码关键点:

  1. 分布式初始化:使用dist.init_process_group(backend="gloo")初始化分布式环境,CPU训练使用gloo后端
  2. 数据并行:使用DistributedSampler确保不同进程读取不同的数据分片
  3. 模型并行:使用DistributedDataParallel包装模型,自动处理梯度聚合
  4. 环境变量Volcano PyTorch Plugin会自动设置所有必要的环境变量(RANKWORLD_SIZEMASTER_ADDR等)

部署步骤

步骤一:创建 ConfigMap

本示例使用ConfigMap方式管理训练代码,适合快速开发和测试场景,将训练脚本存储在ConfigMap中,然后挂载到训练Pod中。

优点:

  • ✅ 快速迭代:修改脚本只需更新ConfigMap
  • ✅ 无需构建镜像:避免镜像构建和推送的开销
  • ✅ 适合调试:快速验证训练逻辑

缺点:

  • ❌ 文件大小限制:ConfigMap最大1MB
  • ❌ 不适合生产:缺少版本控制和回滚能力

将训练脚本保存为pytorch-demo.py文件,然后创建ConfigMap

kubectl create configmap pytorch-demo-script --from-file=pytorch-demo.py

验证ConfigMap

kubectl get configmap pytorch-demo-script
kubectl describe configmap pytorch-demo-script

步骤二:创建 Volcano Job

创建pytorch-job-with-configmap.yaml

pytorch-job-with-configmap.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: pytorch-job-with-configmap
spec:
# 最小可用副本数
minAvailable: 3

# 使用Volcano调度器
schedulerName: volcano

# 配置插件
plugins:
# 启用PyTorch Plugin
pytorch: ["--master=master", "--worker=worker", "--port=23456"]

# 指定队列
queue: default

# 定义任务
tasks:
# Master任务定义
- replicas: 1
name: master
policies:
# 当Master任务完成时,整个Job标记为完成
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- name: master
image: pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime
imagePullPolicy: IfNotPresent
command:
- python
- /workspace/pytorch-demo.py
env:
- name: PYTHONUNBUFFERED
value: "1"
volumeMounts:
- name: training-script
mountPath: /workspace
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "1"
memory: "2Gi"
volumes:
- name: training-script
configMap:
name: pytorch-demo-script
restartPolicy: OnFailure

# Worker任务定义
- replicas: 2
name: worker
template:
spec:
containers:
- name: worker
image: pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime
imagePullPolicy: IfNotPresent
command:
- python
- /workspace/pytorch-demo.py
env:
- name: PYTHONUNBUFFERED
value: "1"
volumeMounts:
- name: training-script
mountPath: /workspace
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "1"
memory: "2Gi"
volumes:
- name: training-script
configMap:
name: pytorch-demo-script
restartPolicy: OnFailure

步骤三:提交训练任务

# 应用Job定义
kubectl apply -f pytorch-job-with-configmap.yaml

# 查看Job状态
kubectl get vcjob pytorch-job-with-configmap

# 查看Pod状态
kubectl get pods -l volcano.sh/job-name=pytorch-job-with-configmap

期望输出:

NAME                                   READY   STATUS    RESTARTS   AGE
pytorch-job-with-configmap-master-0 1/1 Running 0 30s
pytorch-job-with-configmap-worker-0 1/1 Running 0 30s
pytorch-job-with-configmap-worker-1 1/1 Running 0 30s

步骤四:查看训练日志

# 查看Master节点日志
kubectl logs pytorch-job-with-configmap-master-0 -f

# 查看Worker节点日志
kubectl logs pytorch-job-with-configmap-worker-0
kubectl logs pytorch-job-with-configmap-worker-1

预期输出:

============================================================
开始 PyTorch 分布式训练(Volcano 版本)
============================================================
[初始化] RANK=0, WORLD_SIZE=3
[初始化] MASTER_ADDR=master-0.pytorch-job-with-configmap
[初始化] MASTER_PORT=23456
[初始化完成] 成功初始化分布式进程组

[Rank 0] 开始训练,共 5 个 epoch
------------------------------------------------------------
Epoch [1/5], Average Loss: 0.0234
Epoch [2/5], Average Loss: 0.0189
Epoch [3/5], Average Loss: 0.0156
Epoch [4/5], Average Loss: 0.0128
Epoch [5/5], Average Loss: 0.0103
------------------------------------------------------------
[Rank 0] 训练完成!
分布式训练验证
  • 查看WORLD_SIZE确认总进程数:1个Master + 2个Worker = 3进程
  • 查看RANK确认进程编号:0(Master), 1(Worker-0), 2(Worker-1)
  • 查看MASTER_ADDR确认主节点地址自动生成

步骤五:清理资源

# 删除Job(会自动清理所有相关Pod和Service)
kubectl delete -f pytorch-job-with-configmap.yaml

# 删除ConfigMap
kubectl delete configmap pytorch-demo-script