
1. Fundamentals of HPC Parallel Computing

1.1 What Is HPC Parallel Computing?

HPC (High Performance Computing) refers to using multiple computing resources (such as CPUs and GPUs) working in concert to solve complex computational problems. In AI model training, HPC parallel computing is the core technical foundation for training large-scale models.

1.2 Core Characteristics of Parallel Computing

| Characteristic | Description | Application in AI training |
|---|---|---|
| Task decomposition | Split a large task into subtasks that can run in parallel | Shard the training data across multiple GPUs |
| Parallel execution | Multiple compute units process different subtasks at the same time | Multiple GPUs compute gradients simultaneously |
| Communication and synchronization | Compute units exchange data and synchronize state | Gradient aggregation, parameter synchronization |
| Result merging | Combine the results from all compute units | Model parameter updates |

1.3 Parallel Strategies in AI Training

1.3.1 Data Parallelism

Data parallelism is the most commonly used parallel strategy. Its core idea is:

  • Every GPU holds a complete replica of the model
  • The training data is sharded across the GPUs
  • Each GPU computes gradients independently
  • Gradients are synchronized with an AllReduce operation
  • All GPUs update the model with the same aggregated gradients

Applicable scenario: the model fits into the memory of a single GPU.
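
To make the gradient-synchronization step concrete, here is a minimal sketch (a hand-rolled version of what DDP automates) that averages gradients across ranks with an AllReduce. It assumes a process group has already been initialized:

import torch
import torch.distributed as dist


def average_gradients(model: torch.nn.Module) -> None:
    """Average gradients across all ranks; DDP performs this step automatically."""
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            # Sum the gradient tensors from every rank in place ...
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            # ... then divide by the number of ranks to get the mean.
            param.grad /= world_size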

1.3.2 Model Parallelism

When a model is too large to fit on a single GPU, it has to be split across multiple GPUs:

  • Different layers of the model are placed on different GPUs
  • Data flows between GPUs, passing through the layers in sequence
  • Activation values must be transferred between GPUs

Applicable scenario: training very large models (e.g., GPT-3, LLaMA-70B).
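
As a minimal illustration (not from the original text), the sketch below places two stages of a toy model on two GPUs and moves activations between them during the forward pass; it assumes a node with at least two visible GPUs:

import torch
import torch.nn as nn


class TwoGPUModel(nn.Module):
    """Naive model parallelism: the first stage on cuda:0, the second on cuda:1."""

    def __init__(self):
        super().__init__()
        self.stage1 = nn.Linear(1024, 4096).to("cuda:0")
        self.stage2 = nn.Linear(4096, 10).to("cuda:1")

    def forward(self, x):
        x = torch.relu(self.stage1(x.to("cuda:0")))
        # Activations are copied from GPU 0 to GPU 1 between the two stages.
        return self.stage2(x.to("cuda:1"))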

1.4 Distributed Communication Backends

| Backend | Target scenario | Supported operations | Performance characteristics |
|---|---|---|---|
| NCCL | GPU clusters | All collective operations | Best GPU communication performance |
| Gloo | CPU/GPU | All collective operations | Cross-platform compatibility |
| MPI | HPC clusters | Point-to-point + collective operations | Traditional HPC standard |

2. PyTorch Distributed Training Fundamentals

2.1 Core Concepts of torch.distributed

PyTorch provides distributed training through the torch.distributed package. The core concepts are:

| Concept | Description | Example |
|---|---|---|
| World Size | Total number of processes participating in training | 4 nodes × 8 GPUs = 32 |
| Rank | Globally unique identifier of the current process | 0, 1, 2, ..., 31 |
| Local Rank | Identifier of the current process within its node | 0, 1, ..., 7 |
| Process Group | A group of processes used for collective communication | The default group contains all processes |
| Backend | Communication backend | nccl, gloo, mpi |
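
A small sketch of how these values are read in practice when a script is launched with torchrun (which sets WORLD_SIZE, RANK, LOCAL_RANK, MASTER_ADDR, and MASTER_PORT for each process):

import os

import torch
import torch.distributed as dist

# init_process_group defaults to the env:// init method, i.e. it reads
# MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and RANK from the environment.
dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")

print(
    f"world_size={dist.get_world_size()} "
    f"rank={dist.get_rank()} "
    f"local_rank={os.environ.get('LOCAL_RANK', '0')}"
)

dist.destroy_process_group()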

2.2 Distributed Data Parallel (DDP)

DistributedDataParallel (DDP) is PyTorch's recommended data-parallel solution.

Core advantages of DDP:

  • Gradient synchronization overlaps with the backward pass, reducing communication overhead
  • Each process runs independently, so there is no GIL contention
  • Scales to multiple nodes and multiple GPUs

2.3 Fully Sharded Data Parallel (FSDP)

FullyShardedDataParallel (FSDP) is PyTorch's optimization for large models:

| Feature | DDP | FSDP |
|---|---|---|
| Model storage | Every GPU stores the full model | Model parameters are sharded across GPUs |
| GPU memory usage | High (N full copies of the model) | Low (1/N of the model parameters per GPU) |
| Communication pattern | AllReduce of gradients | AllGather of parameters + ReduceScatter of gradients |
| Applicable scenario | Small and medium models | Large models (>10B parameters) |

2.4 The torchrun Launcher

torchrun is PyTorch's officially recommended launcher for distributed training. It automatically sets the following environment variables:

| Environment variable | Description | Source |
|---|---|---|
| WORLD_SIZE | Total number of processes | --nnodes × --nproc-per-node |
| RANK | Global process rank | Computed automatically |
| LOCAL_RANK | Process rank within the node | Computed automatically |
| MASTER_ADDR | Master node address | --master-addr |
| MASTER_PORT | Master node port | --master-port |

Example torchrun commands

# Single node, 4 GPUs
torchrun --nproc-per-node=4 train.py

# Multiple nodes (4 nodes, 8 GPUs per node)
torchrun \
  --nnodes=4 \
  --nproc-per-node=8 \
  --node-rank=0 \
  --master-addr=192.168.1.1 \
  --master-port=29500 \
  train.py

3. PyTorch Parallel Computing Support in Kubeflow Trainer

3.1 Overall Architecture

3.2 Core CRD Design

3.2.1 TrainJob

TrainJob is the simplified API that data scientists use to submit training jobs:

apiVersion: trainer.kubeflow.org/v1alpha1
kind: TrainJob
metadata:
  name: pytorch-ddp-training
  namespace: ml-team
spec:
  # Reference a predefined training runtime
  runtimeRef:
    apiGroup: trainer.kubeflow.org
    kind: ClusterTrainingRuntime
    name: torch-distributed

  # Trainer configuration (overrides the Runtime defaults)
  trainer:
    image: my-registry/pytorch-training:v1.0
    command:
      - torchrun
      - train.py
    args:
      - --epochs=100
      - --batch-size=64
    numNodes: 4              # number of training nodes
    numProcPerNode: 8        # processes per node (usually equals the GPU count)
    resourcesPerNode:
      requests:
        nvidia.com/gpu: 8
        memory: "64Gi"
        cpu: "16"
    env:
      - name: NCCL_DEBUG
        value: "INFO"

3.2.2 ClusterTrainingRuntime

A ClusterTrainingRuntime is a training-job template predefined by platform engineers:

apiVersion: trainer.kubeflow.org/v1alpha1
kind: ClusterTrainingRuntime
metadata:
  name: torch-distributed
  labels:
    trainer.kubeflow.org/framework: torch
spec:
  # ML policy configuration
  mlPolicy:
    numNodes: 1                  # default number of nodes
    torch:
      numProcPerNode: auto       # auto / cpu / gpu / explicit number
      # Elastic training configuration (optional)
      # elasticPolicy:
      #   minNodes: 2
      #   maxNodes: 8
      #   maxRestarts: 3

  # Gang-scheduling policy (optional)
  podGroupPolicy:
    volcano: {}                  # use the Volcano scheduler

  # JobSet template
  template:
    spec:
      replicatedJobs:
        - name: node
          template:
            metadata:
              labels:
                trainer.kubeflow.org/trainjob-ancestor-step: trainer
            spec:
              template:
                spec:
                  containers:
                    - name: node
                      image: pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime
                      ports:
                        - containerPort: 29500   # distributed communication port

3.3 How the Torch Plugin Works

Kubeflow Trainer supports PyTorch distributed training through its Torch plugin. The plugin's core responsibility is to inject the distributed-training environment variables automatically.

3.3.1 Environment Variable Injection

The Torch plugin automatically injects the following environment variables into every training Pod:

| Environment variable | Description | How it is derived |
|---|---|---|
| PET_NNODES | Total number of training nodes | spec.trainer.numNodes |
| PET_NPROC_PER_NODE | Processes per node | spec.mlPolicy.torch.numProcPerNode |
| PET_NODE_RANK | Rank of the current node | Taken from the Job completion index |
| PET_MASTER_ADDR | Master node address | {trainjob-name}-node-0-0.{trainjob-name} |
| PET_MASTER_PORT | Master node port | 29500 by default |

Note: these variables use the PET_ prefix (PyTorch Elastic Training). torchrun recognizes them automatically, so they take the place of the corresponding command-line flags.
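
As a quick sanity check (a hypothetical debugging step, not part of the Kubeflow documentation), you can print the injected variables from inside a training container:

import os

# Variables injected by the Torch plugin; torchrun maps PET_NNODES to --nnodes,
# PET_NPROC_PER_NODE to --nproc-per-node, and so on.
for key in ("PET_NNODES", "PET_NPROC_PER_NODE", "PET_NODE_RANK",
            "PET_MASTER_ADDR", "PET_MASTER_PORT"):
    print(f"{key}={os.environ.get(key)}")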

3.3.2 How numProcPerNode Is Computed

numProcPerNode supports several configuration values:

| Value | Behavior |
|---|---|
| auto | Use the GPU count if GPU resources are configured; otherwise use the CPU count |
| cpu | Use the number of CPU cores |
| gpu | Use the number of GPUs |
| integer | Use the specified value |

// Simplified logic from the controller source
if numProcPerNode == "auto" {
    if gpuCount > 0 {
        numProcPerNode = gpuCount
    } else {
        numProcPerNode = cpuCount
    }
}

3.4 Execution Flow in Detail

3.5 Headless Service and Node Discovery

Kubeflow Trainer automatically creates a headless Service for inter-node communication:

apiVersion: v1
kind: Service
metadata:
  name: pytorch-ddp-training     # same name as the TrainJob
spec:
  clusterIP: None                # headless Service
  selector:
    trainer.kubeflow.org/trainjob-name: pytorch-ddp-training
  ports:
    - port: 29500
      targetPort: 29500

Node DNS resolution

  • Master node: pytorch-ddp-training-node-0-0.pytorch-ddp-training
  • Worker node 1: pytorch-ddp-training-node-0-1.pytorch-ddp-training
  • Worker node 2: pytorch-ddp-training-node-0-2.pytorch-ddp-training
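
A quick way to confirm that node discovery works (a hypothetical check run inside one of the training Pods, assuming the example TrainJob name above) is to resolve the master address through the headless Service:

import os
import socket

# PET_MASTER_ADDR points at the rank-0 pod's DNS name behind the headless Service.
master_addr = os.environ.get(
    "PET_MASTER_ADDR", "pytorch-ddp-training-node-0-0.pytorch-ddp-training"
)
print(master_addr, "->", socket.gethostbyname(master_addr))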

4. Guide for Algorithm Engineers

4.1 Do You Need to Understand the Underlying Architecture?

Short answer: no. One of Kubeflow Trainer's design goals is to shield algorithm engineers from the complexity of Kubernetes and distributed training.

| Concern | Algorithm engineer handles it | Kubeflow Trainer handles it automatically |
|---|---|---|
| Distributed environment initialization | ❌ | ✅ Environment variables injected automatically |
| Node discovery and communication | ❌ | ✅ Headless Service created automatically |
| torchrun parameter configuration | ❌ | ✅ nnodes / nproc computed automatically |
| GPU resource allocation | ❌ | ✅ Configured through the Runtime |
| Pod scheduling and orchestration | ❌ | ✅ Managed through JobSet |
| Writing the training code | ✅ | - |
| Model architecture design | ✅ | - |
| Hyperparameter tuning | ✅ | - |

4.2 Writing the Training Code

Algorithm engineers only need to write standard PyTorch distributed training code:

def train():
    import os
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.utils.data import DataLoader, DistributedSampler

    # [1] Initialize the distributed environment
    # Kubeflow Trainer has already set all required environment variables
    device = "cuda" if torch.cuda.is_available() else "cpu"
    backend = "nccl" if device == "cuda" else "gloo"
    dist.init_process_group(backend=backend)

    # [2] Query the distributed setup (optional, useful for logging and debugging)
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    print(f"Initialized: World Size={world_size}, Rank={rank}, Local Rank={local_rank}")

    # [3] Bind this process to its GPU
    if device == "cuda":
        torch.cuda.set_device(local_rank)
        device = torch.device(f"cuda:{local_rank}")
    else:
        device = torch.device("cpu")

    # [4] Create the model and wrap it with DDP
    model = MyModel().to(device)
    model = DDP(model, device_ids=[local_rank] if device.type == "cuda" else None)

    # [5] Create the data loader (with a DistributedSampler)
    dataset = MyDataset()
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)

    # [6] Training loop
    criterion = torch.nn.CrossEntropyLoss()  # example loss function
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    for epoch in range(100):
        sampler.set_epoch(epoch)  # important: reshuffles the data differently every epoch
        for batch in dataloader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # Save the model only on the main process
        if rank == 0:
            torch.save(model.module.state_dict(), f"model_epoch_{epoch}.pt")

    # [7] Clean up
    dist.destroy_process_group()


if __name__ == "__main__":
    train()

4.3 Submitting Jobs with the Kubeflow Python SDK

Algorithm engineers can submit training jobs directly from Python using the Kubeflow Python SDK:

from kubeflow.trainer import TrainerClient, CustomTrainer

# Define the training function
def train_pytorch():
    import os
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Initialize the distributed environment (configured automatically by Kubeflow Trainer)
    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    dist.init_process_group(backend=backend)

    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    # Create the model
    model = torch.nn.Linear(10, 10).to(device)
    model = DDP(model)

    # Training logic...
    print(f"Training on rank {dist.get_rank()}/{dist.get_world_size()}")

    dist.destroy_process_group()

# Create the training job
client = TrainerClient()

job_id = client.train(
    trainer=CustomTrainer(
        func=train_pytorch,
        num_nodes=4,                     # 4 training nodes
        resources_per_node={
            "cpu": 8,
            "memory": "32Gi",
            "gpu": 2,                    # 2 GPUs per node
        },
        packages_to_install=["transformers>=4.53.0"],  # extra dependencies
    )
)

# Wait for the job to finish
client.wait_for_job_status(job_id)

# Stream the logs
for log in client.get_job_logs(job_id, follow=True):
    print(log)

4.4 Key Caveats

4.4.1 Data Loading

# ✅ Correct: use a DistributedSampler
sampler = DistributedSampler(dataset)
dataloader = DataLoader(dataset, sampler=sampler)

# ❌ Wrong: without a sampler every node processes the same data
dataloader = DataLoader(dataset, shuffle=True)

4.4.2 Saving the Model

# ✅ Correct: save only on the main process
if dist.get_rank() == 0:
    torch.save(model.module.state_dict(), "model.pt")

# ❌ Wrong: having every node save at once causes conflicts
torch.save(model.state_dict(), "model.pt")

4.4.3 Logging

# ✅ Recommended: print detailed logs only on the main process
if dist.get_rank() == 0:
    print(f"Epoch {epoch}, Loss: {loss.item()}")

# Or use a conditional logging helper
def log(msg):
    if dist.get_rank() == 0:
        print(msg)

4.4.4 Random Seeds

# ✅ Correct: seed deterministically, offset by rank so data augmentation differs per process
torch.manual_seed(42 + dist.get_rank())

4.5 FSDP Example

For large-model training you can use FSDP:

import functools
import os

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

def train_large_model():
    import torch
    import torch.distributed as dist
    from transformers import AutoModelForCausalLM
    from transformers.models.llama.modeling_llama import LlamaDecoderLayer

    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Load the large model
    model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b")

    # Wrap with FSDP; the auto-wrap policy needs the model's transformer layer class
    model = FSDP(
        model,
        auto_wrap_policy=functools.partial(
            transformer_auto_wrap_policy,
            transformer_layer_cls={LlamaDecoderLayer},
        ),
        device_id=local_rank,
    )

    # Training logic...

    dist.destroy_process_group()

5. Complete Example

5.1 TrainingRuntime Configuration

apiVersion: trainer.kubeflow.org/v1alpha1
kind: ClusterTrainingRuntime
metadata:
  name: pytorch-ddp-gpu
spec:
  mlPolicy:
    numNodes: 2
    torch:
      numProcPerNode: auto
  podGroupPolicy:
    volcano: {}
  template:
    spec:
      replicatedJobs:
        - name: node
          template:
            metadata:
              labels:
                trainer.kubeflow.org/trainjob-ancestor-step: trainer
            spec:
              template:
                spec:
                  schedulerName: volcano
                  containers:
                    - name: node
                      image: pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime
                      resources:
                        limits:
                          nvidia.com/gpu: 4
                          memory: "64Gi"
                          cpu: "16"
                      env:
                        - name: NCCL_DEBUG
                          value: "INFO"

5.2 Submitting the TrainJob

apiVersion: trainer.kubeflow.org/v1alpha1
kind: TrainJob
metadata:
  name: mnist-ddp-training
  namespace: ml-team
spec:
  runtimeRef:
    name: pytorch-ddp-gpu
  trainer:
    image: my-registry/mnist-training:v1.0
    command:
      - torchrun
      - train.py
    args:
      - --epochs=50
      - --batch-size=128
    numNodes: 4
    resourcesPerNode:
      requests:
        nvidia.com/gpu: 8

5.3 Training Script

#!/usr/bin/env python3
"""
Distributed MNIST training example.
This script runs on Kubeflow Trainer as-is, with no modifications required.
"""

import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets, transforms


class MNISTNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args):
    # Initialize the distributed environment
    device = "cuda" if torch.cuda.is_available() else "cpu"
    backend = "nccl" if device == "cuda" else "gloo"
    dist.init_process_group(backend=backend)

    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    if device == "cuda":
        torch.cuda.set_device(local_rank)
        device = torch.device(f"cuda:{local_rank}")
    else:
        device = torch.device("cpu")

    if rank == 0:
        print(f"Starting distributed training with {world_size} processes")

    # Prepare the data
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # Download the dataset on rank 0 only, then let the other ranks catch up
    if rank == 0:
        datasets.MNIST('./data', train=True, download=True)
    dist.barrier()

    dataset = datasets.MNIST('./data', train=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=args.batch_size, sampler=sampler)

    # Create the model
    model = MNISTNet().to(device)
    model = DDP(model, device_ids=[local_rank] if device.type == "cuda" else None)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(args.epochs):
        sampler.set_epoch(epoch)
        model.train()

        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 100 == 0 and rank == 0:
                print(f"Epoch {epoch} [{batch_idx * len(data)}/{len(dataset)}] Loss: {loss.item():.6f}")

        # Save a checkpoint
        if rank == 0:
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.module.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
            }, f'checkpoint_epoch_{epoch}.pt')

    if rank == 0:
        print("Training completed!")

    dist.destroy_process_group()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch-size", type=int, default=64)
    args = parser.parse_args()

    train(args)

6. References