PyTorch 使用 Horovod 进行多 GPU 训练的实现

使用 Horovod 训练 PyTorch 模型分为以下几步:

import torch

import horovod.torch as hvd

# Outline of the Horovod + PyTorch training recipe.
# NOTE: this is a sketch, not a runnable script. The `...` placeholders must be
# filled in, and `optim`, `F` and `args` are not defined in this outline; they
# are assumed to come from the surrounding script (the full example below
# imports `torch.optim as optim` and `torch.nn.functional as F`).

# Initialize Horovod.
hvd.init()

# Pin the GPU used by this process to its local rank (one GPU per process).
torch.cuda.set_device(hvd.local_rank())

# Define the dataset (placeholder).
train_dataset = ...

# Partition the dataset among workers: swap the default sampler for a
# DistributedSampler configured with Horovod's world size and rank.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=..., sampler=train_sampler)

# Build the model (placeholder) and move it to the pinned GPU.
model = ...
model.cuda()

optimizer = optim.SGD(model.parameters())

# Wrap the original optimizer with Horovod's DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters())

# Broadcast parameters from rank 0 to all other processes so every worker
# starts from the same weights.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

for epoch in range(100):
    for batch_idx, (data, target) in enumerate(train_loader):
        # The model was moved to the GPU above, so the batch must follow it.
        data, target = data.cuda(), target.cuda()

        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
                epoch, batch_idx * len(data), len(train_sampler), loss.item()))

完整示例代码如下,在 ImageNet 上采用 ResNet-50 进行训练:

from __future__ import print_function

import torch

import argparse

import torch.backends.cudnn as cudnn

import torch.nn.functional as F

import torch.optim as optim

import torch.utils.data.distributed

from torchvision import datasets, transforms, models

import horovod.torch as hvd

import os

import math

from tqdm import tqdm

from distutils.version import LooseVersion

# Training settings

# Command-line interface for the ImageNet/ResNet-50 training script.
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
                    help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
                    help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
                    help='checkpoint file format')
# The flag must be spelled `--fp16-allreduce`: the script reads it back as
# `args.fp16_allreduce` when choosing the compression algorithm.
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
# Default of 1 means no local gradient accumulation.
parser.add_argument('--batches-per-allreduce', type=int, default=1,
                    help='number of batches processed locally before '
                         'executing allreduce across workers; it multiplies '
                         'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
                    help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
                    help='weight decay')

parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
                    help='random seed')

# Parse the command line and derive the runtime configuration.
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Size of the batch each DataLoader step yields; it is later split back into
# sub-batches of `args.batch_size` for gradient accumulation.
allreduce_batch_size = args.batch_size * args.batches_per_allreduce

hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # Horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)

cudnn.benchmark = True

# If set > 0, will resume training from a given checkpoint.
# Scan from the last epoch downwards so the newest checkpoint on disk wins.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
    if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
        resume_from_epoch = try_epoch
        break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
                                  name='resume_from_epoch').item()

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

# Horovod: write TensorBoard logs on first worker.
# If no SummaryWriter implementation can be imported, logging is disabled
# rather than aborting the run.
try:
    if LooseVersion(torch.__version__) >= LooseVersion('1.2.0'):
        from torch.utils.tensorboard import SummaryWriter
    else:
        from tensorboardX import SummaryWriter
    log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
except ImportError:
    log_writer = None

# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)

# Extra DataLoader options; only applied when training on CUDA.
kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}

# Training set: random crop + horizontal flip augmentation, then per-channel
# normalization with the same mean/std used by the validation transform.
train_dataset = \
    datasets.ImageFolder(args.train_dir,
                         transform=transforms.Compose([
                             transforms.RandomResizedCrop(224),
                             transforms.RandomHorizontalFlip(),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))

# Horovod: use DistributedSampler to partition data among workers. Manually specify
# `num_replicas=hvd.size()` and `rank=hvd.rank()`.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
# Each step yields `allreduce_batch_size` samples; the training loop splits
# them into sub-batches of `args.batch_size`.
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=allreduce_batch_size,
    sampler=train_sampler, **kwargs)

# Validation set: deterministic resize + center crop, same normalization.
val_dataset = \
    datasets.ImageFolder(args.val_dir,
                         transform=transforms.Compose([
                             transforms.Resize(256),
                             transforms.CenterCrop(224),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
val_sampler = torch.utils.data.distributed.DistributedSampler(
    val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
                                         sampler=val_sampler, **kwargs)

# Set up standard ResNet-50 model.

# Build the network, the (distributed) optimizer, and synchronize initial state.
model = models.resnet50()

# By default, Adasum doesn't need scaling up learning rate.
# For sum/average with gradient accumulation: scale learning rate by batches_per_allreduce
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

if args.cuda:
    # Move model to GPU.
    model.cuda()
    # If using GPU Adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = args.batches_per_allreduce * hvd.local_size()

# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(),
                      lr=(args.base_lr *
                          lr_scaler),
                      momentum=args.momentum, weight_decay=args.wd)

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters(),
    compression=compression,
    backward_passes_per_step=args.batches_per_allreduce,
    op=hvd.Adasum if args.use_adasum else hvd.Average)

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
    filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
    checkpoint = torch.load(filepath)
    model.load_state_dict(checkpoint['model'])
    optimizer.load_state_dict(checkpoint['optimizer'])

# Horovod: broadcast parameters & optimizer state.
# Rank 0 is the root because it is the only rank that loaded the checkpoint.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

def train(epoch):
    """Run one training epoch over `train_loader`.

    Each loader batch is split into sub-batches of `args.batch_size`; their
    gradients are accumulated locally and a single `optimizer.step()` is
    taken per loader batch. Running loss/accuracy are shown on the progress
    bar and, when `log_writer` is set, written as scalars at the end.

    Args:
        epoch: Zero-based epoch index (displayed as `epoch + 1`).
    """
    model.train()
    # Pass the epoch to the sampler before iterating the loader.
    train_sampler.set_epoch(epoch)
    train_loss = Metric('train_loss')
    train_accuracy = Metric('train_accuracy')

    with tqdm(total=len(train_loader),
              desc='Train Epoch #{}'.format(epoch + 1),
              disable=not verbose) as t:
        for batch_idx, (data, target) in enumerate(train_loader):
            adjust_learning_rate(epoch, batch_idx)

            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()

            # Number of sub-batches in this loader batch; constant for the
            # inner loop, so compute it once.
            num_sub_batches = math.ceil(float(len(data)) / args.batch_size)

            # Split data into sub-batches of size batch_size
            for i in range(0, len(data), args.batch_size):
                data_batch = data[i:i + args.batch_size]
                target_batch = target[i:i + args.batch_size]
                output = model(data_batch)
                train_accuracy.update(accuracy(output, target_batch))
                loss = F.cross_entropy(output, target_batch)
                # Record the unscaled loss before it is divided in place below.
                train_loss.update(loss)
                # Average gradients among sub-batches
                loss.div_(num_sub_batches)
                loss.backward()

            # Gradient is applied across all ranks
            optimizer.step()
            t.set_postfix({'loss': train_loss.avg.item(),
                           'accuracy': 100. * train_accuracy.avg.item()})
            t.update(1)

    if log_writer:
        log_writer.add_scalar('train/loss', train_loss.avg, epoch)
        log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)

def validate(epoch):
    """Evaluate the model on `val_loader` without computing gradients.

    Running loss/accuracy are shown on the progress bar and, when
    `log_writer` is set, written as scalars at the end.

    Args:
        epoch: Zero-based epoch index (displayed as `epoch + 1`).
    """
    model.eval()
    val_loss = Metric('val_loss')
    val_accuracy = Metric('val_accuracy')

    with tqdm(total=len(val_loader),
              desc='Validate Epoch #{}'.format(epoch + 1),
              disable=not verbose) as t:
        with torch.no_grad():
            for data, target in val_loader:
                if args.cuda:
                    data, target = data.cuda(), target.cuda()
                output = model(data)

                val_loss.update(F.cross_entropy(output, target))
                val_accuracy.update(accuracy(output, target))
                t.set_postfix({'loss': val_loss.avg.item(),
                               'accuracy': 100. * val_accuracy.avg.item()})
                t.update(1)

    if log_writer:
        log_writer.add_scalar('val/loss', val_loss.avg, epoch)
        log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)

# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final

# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during

# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.

# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.

def adjust_learning_rate(epoch, batch_idx):
    """Set the learning rate of every optimizer param group for this step.

    During the first `args.warmup_epochs` epochs the multiplier grows
    linearly from `1 / hvd.size()` to 1, advancing per batch. Afterwards it
    is 1 until epoch 30, 0.1 until epoch 60, 0.01 until epoch 80, and 0.001
    from then on.

    Args:
        epoch: Zero-based epoch index.
        batch_idx: Zero-based index of the current batch within the epoch.
    """
    if epoch < args.warmup_epochs:
        # Fractional epoch, so the warmup advances smoothly within an epoch.
        epoch += float(batch_idx + 1) / len(train_loader)
        lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
    elif epoch < 30:
        lr_adj = 1.
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3

    # NOTE(review): this always scales by hvd.size(), whereas the optimizer is
    # constructed with `lr_scaler`, which differs when --use-adasum is set.
    # Confirm which scaling is intended for Adasum runs.
    for param_group in optimizer.param_groups:
        param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj

def accuracy(output, target):
    """Return the fraction of rows whose highest-scoring class equals the target.

    Args:
        output: Tensor of per-class scores; the class is taken along dim 1.
        target: Tensor of class indices with one entry per row of `output`.

    Returns:
        A scalar float CPU tensor holding the mean of the per-row matches.
    """
    # Index of the largest score along dim 1, kept as a column so `target`
    # can be reshaped to match it.
    pred = output.max(1, keepdim=True)[1]
    return pred.eq(target.view_as(pred)).cpu().float().mean()

def save_checkpoint(epoch):
    """Write model and optimizer state to disk; only rank 0 writes.

    The file name is `args.checkpoint_format` formatted with `epoch + 1`,
    i.e. checkpoints are numbered by completed epochs.

    Args:
        epoch: Zero-based index of the epoch that just finished.
    """
    if hvd.rank() == 0:
        filepath = args.checkpoint_format.format(epoch=epoch + 1)
        state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        torch.save(state, filepath)

# Horovod: average metrics from distributed training.

class Metric(object):
    """Running mean of a scalar metric, combined across Horovod workers.

    Each `update` reduces the given value with `hvd.allreduce` (default
    reduction op; presumably an average across ranks - confirm against the
    installed Horovod version) and accumulates the result.
    """

    def __init__(self, name):
        # `name` is also used as the allreduce tensor name.
        self.name = name
        self.sum = torch.tensor(0.)
        self.n = torch.tensor(0.)

    def update(self, val):
        """Reduce `val` across workers and add it to the running sum."""
        # Detach and move to CPU so the reduction does not touch the autograd
        # graph or depend on the device `val` lives on.
        self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
        self.n += 1

    @property
    def avg(self):
        """Mean of all values passed to `update` so far, as a tensor."""
        return self.sum / self.n

# Main driver: continue from the last completed epoch (0 when starting fresh),
# then train, validate and checkpoint once per epoch.
for epoch in range(resume_from_epoch, args.epochs):
    train(epoch)
    validate(epoch)
    save_checkpoint(epoch)

到此这篇关于 PyTorch 使用 Horovod 多 GPU 训练的实现的文章就介绍到这了,更多相关 PyTorch Horovod 多 GPU 训练内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部