文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

PyTorch 并行训练 DistributedDataParallel 完整代码示例

2024-11-30 18:27

关注

为了应对这些挑战,已经开发了各种技术来扩大具有大型数据集的大型 DNN 的训练,包括模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。

在本文中我们将演示使用 PyTorch 的数据并行性和模型并行性。

我们所说的并行性一般是指在多个gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练时间。数据并行背后的基本思想是将训练数据分成更小的块,让每个GPU或机器处理一个单独的数据块。然后将每个节点的结果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是相同的,但模型参数在节点之间进行了分区。每个节点使用分配的数据块训练自己的本地模型,在每次训练迭代结束时,模型参数在所有节点之间同步。这个过程不断重复,直到模型收敛到一个令人满意的结果。

下面我们用用ResNet50和CIFAR10数据集来进行完整的代码示例:

在数据并行中,模型架构在每个节点上保持相同,但模型参数在节点之间进行了分区,每个节点使用分配的数据块训练自己的本地模型。

PyTorch的DistributedDataParallel 库可以进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何使用ResNet50和CIFAR10数据集使用PyTorch实现数据并行的示例,其中代码在多个gpu或机器上运行,每台机器处理训练数据的一个子集。训练过程使用PyTorch的DistributedDataParallel 库进行并行化。

导入必须要的库

import os
from datetime import datetime
from time import time
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel

接下来,我们将检查GPU。

import subprocess
result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
print(result.stdout.decode())

因为我们需要在多个服务器上运行,所以手动一个一个执行并不现实,所以需要有一个调度程序。这里我们使用SLURM文件来运行代码(slurm面向Linux和Unix类似内核的免费和开源工作调度程序),

def main():

# get distributed configuration from Slurm environment

parser = argparse.ArgumentParser()
parser.add_argument('-b', '--batch-size', default=128, type =int,
help='batch size. it will be divided in mini-batch for each worker')
parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('-c','--checkpoint', default=None, type=str,
help='path to checkpoint to load')
args = parser.parse_args()

rank = int(os.environ['SLURM_PROCID'])
local_rank = int(os.environ['SLURM_LOCALID'])
size = int(os.environ['SLURM_NTASKS'])
master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
port = "29500"
node_id = os.environ['SLURM_NODEID']
ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
train(args, ddp_arg)

然后,我们使用DistributedDataParallel 库来执行分布式训练。

def train(args, ddp_arg):

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg

# display info
if rank == 0:
#print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
#print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))

print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))


# configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
#dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
dist.init_process_group(
backend='nccl',
init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
world_size=size,
rank=rank
)

# distribute model
torch.cuda.set_device(local_rank)
gpu = torch.device("cuda")
#model = ResNet18(classes=10).to(gpu)
model = torchvision.models.resnet50(pretrained=False).to(gpu)
ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
if args.checkpoint is not None:
map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))

# distribute batch size (mini-batch)
batch_size = args.batch_size
batch_size_per_gpu = batch_size // size

# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)


transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

# load data with distributed sampler
#train_dataset = torchvision.datasets.CIFAR10(root='./data',
# train=True,
# transform=transform_train,
# download=False)

# load data with distributed sampler
train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
num_replicas=size,
rank=rank)

train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)

# training (timers and display handled by process 0)
if rank == 0: start = datetime.now()
total_step = len(train_loader)

for epoch in range(args.epochs):
if rank == 0: start_dataload = time()

for i, (images, labels) in enumerate(train_loader):

# distribution of images and labels to all GPUs
images = images.to(gpu, non_blocking=True)
labels = labels.to(gpu, non_blocking=True)

if rank == 0: stop_dataload = time()

if rank == 0: start_training = time()

# forward pass
outputs = ddp_model(images)
loss = criterion(outputs, labels)

# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()

if rank == 0: stop_training = time()
if (i + 1) % 10 == 0 and rank == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
if rank == 0: start_dataload = time()

#Save checkpoint at every end of epoch
if rank == 0:
torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))

if rank == 0:
print(">>> Training complete in: " + str(datetime.now() - start))


if __name__ == '__main__':

main()

代码将数据和模型分割到多个gpu上,并以分布式的方式更新模型。下面是代码的一些解释:

train(args, ddp_arg)有两个参数,args和ddp_arg,其中args是传递给脚本的命令行参数,ddp_arg包含分布式训练相关参数。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式训练相关参数。

如果rank为0,则打印当前使用的gpu数量和主节点IP地址信息。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式进程组。

torch.cuda.set_device(local_rank):为这个进程选择指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):从torchvision模型中加载ResNet50模型,并将其移动到指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):将模型包装在DistributedDataParallel模块中,也就是说这样我们就可以进行分布式训练了

加载CIFAR-10数据集并应用数据增强转换。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):创建一个DistributedSampler对象,将数据集分割到多个gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):创建一个DataLoader对象,数据将批量加载到模型中,这与我们平常训练的步骤是一致的只不过是增加了一个分布式的数据采样DistributedSampler。

为指定的epoch数训练模型,以分布式的方式使用optimizer.step()更新权重。

rank0在每个轮次结束时保存一个检查点。

rank0每10个批次显示损失和训练时间。

结束训练时打印训练模型所花费的总时间也是在rank0上。

代码测试

在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。

在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。

这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。

来源:DeepHub IMBA内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯