分布式训练框架Horovod初步学习
简介
Horovod 是 TensorFlow、Keras、PyTorch 和 Apache MXNet 的分布式深度学习训练框架。Horovod 的目标是使分布式深度学习快速且易于使用。
简单来说就是为这些框架提供分布式支持,比如有一个需求,由于数据量过大(千万级),想要在128个GPU上运行,以便于快速得到结果,这时候就可以用horovod,只需要简单改不多的代码,就可以将原来在单GPU上跑的模型,并行跑在128个GPU上。
安装
安装CMake:https://cmake.org/install/
如果您安装了 PyPI 中的 TensorFlow,请确保Tensorflow已安装 或安装了
g++-4.8.5``g++-4.9
如果您从PyPI:https://pypi.org/project/torch 安装了 PyTorch,请确保已安装了
g++-4.9
如果已安装来自Conda 的任一包,请确保已安装 Conda 中的
gxx_linux-64
包安装 pip
horovod
在 CPU 上运行:
$ pip install horovod
要使用 NCCL 在 GPU 上运行:
$ HOROVOD_GPU_OPERATIONS=NCCL pip install horovod
有关使用 GPU 支持安装 Horovod 的更多详细信息,请阅读GPU 上的 Horovod。
名词解释
rank:
表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。
local_rank:
进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。
allreduce:
累加所有数据,并同步到所有节点的操作
allgather:
收集所有数据,并同步到所有节点的操作,完成后每个节点都包含所有节点的数据,并且这些数据单独存在
broadcast:
将数据(需要由根节点确认)从一个节点传播到其他所有节点的操作
使用指南
添加以下步骤:
hvd.init()
用于初始化horovod将每个GPU固定给单个进程处理,以避免资源竞争。
每个进程设置为一个GPU,通过设置local rank参数,服务器上的第一个进程将分配第一个 GPU,第二个进程将分配第二个 GPU,等等
if torch.cuda.is_available():
torch.cuda.set_device(hvd.local_rank())
根据线程个数缩放学习率
将优化器包装在
hvd.DistributedOptimizer
中。分布式优化器将梯度计算委托给原始优化器,使用allduce或allgather来平均梯度,然后应用这些平均梯度。
将初始变量的状态从rank 0广播至其他进程。需要保证初始化的一致性。
修改权重保存部分源码,只通过worker 0保存权重,防止由于多线程操作导致的冲突。
Tensorflow + Horovod
这里Tensorflow是1.x版本,更加稳定一些,以下是一个修改示例。
import tensorflow as tf
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())
# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())
# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)
# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
# Make training operation
train_op = opt.minimize(loss)
# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
config=config,
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# Perform synchronous training.
mon_sess.run(train_op)
完整代码如下:
import os
import errno
import tensorflow as tf
import horovod.tensorflow as hvd
import numpy as np
import argparse
from tensorflow import keras
layers = tf.layers
tf.logging.set_verbosity(tf.logging.INFO)
# Training settings
parser = argparse.ArgumentParser(description='Tensorflow MNIST Example')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
help='apply gradient predivide factor in optimizer (default: 1.0)')
args = parser.parse_args()
def conv_model(feature, target, mode):
"""2-layer convolution model."""
# Convert the target to a one-hot tensor of shape (batch_size, 10) and
# with a on-value of 1 for each one-hot vector of length 10.
target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)
# Reshape feature to 4d tensor with 2nd and 3rd dimensions being
# image width and height final dimension being the number of color channels.
feature = tf.reshape(feature, [-1, 28, 28, 1])
# First conv layer will compute 32 features for each 5x5 patch
with tf.variable_scope('conv_layer1'):
h_conv1 = layers.conv2d(feature, 32, kernel_size=[5, 5],
activation=tf.nn.relu, padding="SAME")
h_pool1 = tf.nn.max_pool(
h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# Second conv layer will compute 64 features for each 5x5 patch.
with tf.variable_scope('conv_layer2'):
h_conv2 = layers.conv2d(h_pool1, 64, kernel_size=[5, 5],
activation=tf.nn.relu, padding="SAME")
h_pool2 = tf.nn.max_pool(
h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# reshape tensor into a batch of vectors
h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])
# Densely connected layer with 1024 neurons.
h_fc1 = layers.dropout(
layers.dense(h_pool2_flat, 1024, activation=tf.nn.relu),
rate=0.5, training=mode == tf.estimator.ModeKeys.TRAIN)
# Compute logits (1 per class) and compute loss.
logits = layers.dense(h_fc1, 10, activation=None)
loss = tf.losses.softmax_cross_entropy(target, logits)
return tf.argmax(logits, 1), loss
def train_input_generator(x_train, y_train, batch_size=64):
assert len(x_train) == len(y_train)
while True:
p = np.random.permutation(len(x_train))
x_train, y_train = x_train[p], y_train[p]
index = 0
while index <= len(x_train) - batch_size:
yield x_train[index:index + batch_size], \
y_train[index:index + batch_size],
index += batch_size
def main(_):
# Horovod: initialize Horovod.
hvd.init()
# Keras automatically creates a cache directory in ~/.keras/datasets for
# storing the downloaded MNIST data. This creates a race
# condition among the workers that share the same filesystem. If the
# directory already exists by the time this worker gets around to creating
# it, ignore the resulting exception and continue.
cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
if not os.path.exists(cache_dir):
try:
os.mkdir(cache_dir)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
pass
else:
raise
# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = \
keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())
# The shape of downloaded data is (-1, 28, 28), hence we need to reshape it
# into (-1, 784) to feed into our network. Also, need to normalize the
# features between 0 and 1.
x_train = np.reshape(x_train, (-1, 784)) / 255.0
x_test = np.reshape(x_test, (-1, 784)) / 255.0
# Build model...
with tf.name_scope('input'):
image = tf.placeholder(tf.float32, [None, 784], name='image')
label = tf.placeholder(tf.float32, [None], name='label')
predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)
lr_scaler = hvd.size()
# By default, Adasum doesn't need scaling when increasing batch size. If used with NCCL,
# scale lr by local_size
if args.use_adasum:
lr_scaler = hvd.local_size() if hvd.nccl_built() else 1
# Horovod: adjust learning rate based on lr_scaler.
opt = tf.train.AdamOptimizer(0.001 * lr_scaler)
# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, op=hvd.Adasum if args.use_adasum else hvd.Average,
gradient_predivide_factor=args.gradient_predivide_factor)
global_step = tf.train.get_or_create_global_step()
train_op = opt.minimize(loss, global_step=global_step)
hooks = [
# Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
# from rank 0 to all other processes. This is necessary to ensure consistent
# initialization of all workers when training is started with random weights
# or restored from a checkpoint.
hvd.BroadcastGlobalVariablesHook(0),
# Horovod: adjust number of steps based on number of GPUs.
tf.train.StopAtStepHook(last_step=20000 // hvd.size()),
tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
every_n_iter=10),
]
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
# Horovod: save checkpoints only on worker 0 to prevent other workers from
# corrupting them.
checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
training_batch_generator = train_input_generator(x_train,
y_train, batch_size=100)
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
hooks=hooks,
config=config) as mon_sess:
while not mon_sess.should_stop():
# Run a training step synchronously.
image_, label_ = next(training_batch_generator)
mon_sess.run(train_op, feed_dict={image: image_, label: label_})
if __name__ == "__main__":
tf.app.run()
PyTorch + Horovod
运行
hvd.init()
将每个 GPU 固定到单个进程。
每个进程的典型设置为一个 GPU,请将此设置为本地排名。服务器上的第一个进程将分配第一个 GPU,第二个进程将分配第二个 GPU,等等。
if torch.cuda.is_available():
torch.cuda.set_device(hvd.local_rank())按线程数缩放学习率。
同步分布式培训中的有效批次大小按工作人员数量进行缩放。学习率的提高弥补了批次大小的增加。
将优化器包装在
hvd.DistributedOptimizer
分布式优化器将梯度计算委托给原始优化器,使用allduce或all 聚集来平均梯度,然后应用这些平均梯度。将初始变量状态从排名 0 广播到所有其他进程:
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
在使用随机权重开始训练或从检查点恢复训练时,这对于确保所有工作人员的一致初始化是必要的。
修改代码以仅保存工作线程 0 上的检查点,以防止其他工作人员损坏它们。
通过使用 保护模型检查点代码,实现此目的。hvd.rank() != 0
import torch
import horovod.torch as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())
# Define dataset...
train_dataset = ...
# Partition dataset among workers using DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# Build model...
model = ...
model.cuda()
optimizer = optim.SGD(model.parameters())
# Add Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# Broadcast parameters from rank 0 to all other processes.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
for epoch in range(100):
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
epoch, batch_idx * len(data), len(train_sampler), loss.item()))
完整代码:
import argparse
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import torch.utils.data.distributed
import horovod.torch as hvd
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42, metavar='S',
help='random seed (default: 42)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
help='apply gradient predivide factor in optimizer (default: 1.0)')
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
def train(epoch):
model.train()
# Horovod: set epoch to sampler for shuffling.
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
if args.cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
# Horovod: use train_sampler to determine the number of examples in
# this worker's partition.
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_sampler),
100. * batch_idx / len(train_loader), loss.item()))
def metric_average(val, name):
tensor = torch.tensor(val)
avg_tensor = hvd.allreduce(tensor, name=name)
return avg_tensor.item()
def test():
model.eval()
test_loss = 0.
test_accuracy = 0.
for data, target in test_loader:
if args.cuda:
data, target = data.cuda(), target.cuda()
output = model(data)
# sum up batch loss
test_loss += F.nll_loss(output, target, size_average=False).item()
# get the index of the max log-probability
pred = output.data.max(1, keepdim=True)[1]
test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()
# Horovod: use test_sampler to determine the number of examples in
# this worker's partition.
test_loss /= len(test_sampler)
test_accuracy /= len(test_sampler)
# Horovod: average metric values across workers.
test_loss = metric_average(test_loss, 'avg_loss')
test_accuracy = metric_average(test_accuracy, 'avg_accuracy')
# Horovod: print output only on first rank.
if hvd.rank() == 0:
print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
test_loss, 100. * test_accuracy))
if __name__ == '__main__':
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
# Horovod: initialize library.
hvd.init()
torch.manual_seed(args.seed)
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(args.seed)
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(1)
kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
# When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
# issues with Infiniband implementations that are not fork-safe
if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
kwargs['multiprocessing_context'] = 'forkserver'
train_dataset = \
datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
test_dataset = \
datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
# Horovod: use DistributedSampler to partition the test data.
test_sampler = torch.utils.data.distributed.DistributedSampler(
test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
sampler=test_sampler, **kwargs)
model = Net()
# By default, Adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not args.use_adasum else 1
if args.cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if args.use_adasum and hvd.nccl_built():
lr_scaler = hvd.local_size()
# Horovod: scale learning rate by lr_scaler.
optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
momentum=args.momentum)
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
named_parameters=model.named_parameters(),
compression=compression,
op=hvd.Adasum if args.use_adasum else hvd.Average,
gradient_predivide_factor=args.gradient_predivide_factor)
for epoch in range(1, args.epochs + 1):
train(epoch)
test()
训练命令
开始训练,指定worker个数:
# run training with 4 GPUs on a single machine
$ horovodrun -np 4 python train.py
# run training with 8 GPUs on two machines (4 GPUs each)
$ horovodrun -np 8 -H hostname1:4,hostname2:4 python train.py
参考
https://github.com/horovod/horovod
欢迎添加笔者微信加群讨论