在torch.distributed中,如何正确平均不同GPU上的梯度?
修改自 https://github.com/seba-1511/dist_tuto.pth/blob/gh-pages/train_dist.py ,下面的代码可以成功使用两个 GPU(可以使用 nvidia-smi 检查)。
但难以理解的一件事是,下面的“average_gradients”是否确实是在两个 GPU 上对两个模型进行梯度平均的正确方法。与下面的代码一样,使用两个进程运行的两个“model = Net()”是两个不同 GPU 上的两个模型,但是“average_gradients(model)”行只是在一个 GPU 上“平均”模型的梯度,而不是两个 GPU两个 GPU 上的模型。
问题是下面的代码确实是在两个 GPU 上平均梯度的正确方法吗?如果是真的,如何阅读、如何理解代码?如果不是,在下面的两个模型上平均梯度的正确方法是什么?
import os import torch import torch.distributed as dist import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from math import ceil from random import Random from torch.multiprocessing import Process from torchvision import datasets, transforms os.environ["CUDA_VISIBLE_DEVICES"] = "0,1" class Partition(object): """ Dataset-like object, but only access a subset of it. """ def __init__(self, data, index): self.data = data self.index = index def __len__(self): return len(self.index) def __getitem__(self, index): data_idx = self.index[index] return self.data[data_idx] class DataPartitioner(object): """ Partitions a dataset into different chuncks. """ def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234): self.data = data self.partitions = [] rng = Random() rng.seed(seed) data_len = len(data) indexes = [x for x in range(0, data_len)] rng.shuffle(indexes) for frac in sizes: part_len = int(frac * data_len) self.partitions.append(indexes[0:part_len]) indexes = indexes[part_len:] def use(self, partition): return Partition(self.data, self.partitions[partition]) class Net(nn.Module): """ Network architecture. """ def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 10, kernel_size=5) self.conv2 = nn.Conv2d(10, 20, kernel_size=5) self.conv2_drop = nn.Dropout2d() self.fc1 = nn.Linear(320, 50) self.fc2 = nn.Linear(50, 10) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) x = x.view(-1, 320) x = F.relu(self.fc1(x)) x = F.dropout(x, training=self.training) x = self.fc2(x) return F.log_softmax(x) def partition_dataset(): """ Partitioning MNIST """ dataset = datasets.MNIST( './data', train=True, download=True, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307, ), (0.3081, )) ])) size = dist.get_world_size() bsz = int(256 / float(size)) partition_sizes = [1.0 / size for _ in range(size)] partition = DataPartitioner(dataset, partition_sizes) partition = partition.use(dist.get_rank()) train_set = torch.utils.data.DataLoader( partition, batch_size=bsz, shuffle=True) return train_set, bsz def average_gradients(model): """ Gradient averaging. """ size = float(dist.get_world_size()) for param in model.parameters(): dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) param.grad.data /= size def run(rank, size): """ Distributed Synchronous SGD Example """ # print("107 size = ", size) # print("dist.get_world_size() = ", dist.get_world_size()) ## 2 torch.manual_seed(1234) train_set, bsz = partition_dataset() device = torch.device("cuda:{}".format(rank)) model = Net() model = model.to(device) optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) num_batches = ceil(len(train_set.dataset) / float(bsz)) for epoch in range(10): epoch_loss = 0.0 for data, target in train_set: # data, target = Variable(data), Variable(target) # data, target = Variable(data.cuda(rank)), Variable(target.cuda(rank)) data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target) epoch_loss += loss.item() loss.backward() average_gradients(model) optimizer.step() print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches) # if epoch == 4: # from utils import module_utils # module_utils.save_model() def init_processes(rank, size, fn, backend='gloo'): """ Initialize the distributed environment. """ os.environ['MASTER_ADDR'] = '127.0.0.1' os.environ['MASTER_PORT'] = '29500' dist.init_process_group(backend, rank=rank, world_size=size) fn(rank, size) if __name__ == "__main__": size = 2 processes = [] for rank in range(size): p = Process(target=init_processes, args=(rank, size, run)) p.start() processes.append(p) for p in processes: p.join()
最佳答案
我的解决方案是使用 DistributedDataParallel 而不是如下所示的 DataParallel。
代码
for param in self.model.parameters():
torch.distributed.all_reduce(param.grad.data)
可以成功工作。
class DDPOptimizer:
def __init__(self, model, torch_optim=None, learning_rate=None):
"""
:param parameters:
:param torch_optim: like torch.optim.Adam(parameters, lr=learning_rate, eps=1e-9)
or optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
:param is_ddp:
"""
if torch_optim is None:
torch_optim = torch.optim.Adam(model.parameters(), lr=3e-4, eps=1e-9)
if learning_rate is not None:
torch_optim.defaults["lr"] = learning_rate
self.model = model
self.optimizer = torch_optim
def optimize(self, loss):
self.optimizer.zero_grad()
loss.backward()
for param in self.model.parameters():
torch.distributed.all_reduce(param.grad.data)
self.optimizer.step()
pass
def run():
""" Distributed Synchronous SGD Example """
module_utils.initialize_torch_distributed()
start = time.time()
train_set, bsz = partition_dataset()
model = Net()
local_rank = torch.distributed.get_rank()
device = torch.device("cuda", local_rank)
model = model.to(device)
sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
optimizer = DDPOptimizer(model, torch_optim=sgd)
# optimizer = NoamOptimizerDistributed(100, 1, 10, model)
num_batches = math.ceil(len(train_set.dataset) / float(bsz))
epoch, end_epoch = 1, 10
while epoch <= end_epoch:
epoch_loss = 0.0
for data, target in train_set:
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
optimizer.optimize(loss)
print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches)
# if epoch % 6 == 0:
# if local_rank == 0:
# module_utils.save_model(model, "a.pt")
epoch += 1
print("Time take to train: ", time.time() - start)
关于python - 在torch.distributed中,如何正确平均不同GPU上的梯度?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58671916/