Horovod + TensorFlow distributed multi-GPU training

Concepts
rank is a worker's index within the entire ring; local_rank is its index within its own node. For example, with 4 nodes and 4 GPUs per node, you spawn 16 workers. Every worker has a rank in [0, 15] and a local_rank in [0, 3]. You use local_rank for GPU pinning because there is typically one GPU available per process on a node. It wouldn't make sense to use rank here: rank could be 10, but the node only has 4 GPUs, so there is no GPU 10.
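
A minimal sketch of this mapping (an illustration, not from the original code, assuming Horovod is installed and the script is launched as 16 processes across 4 nodes with 4 GPUs each): each process pins itself to the GPU matching its local_rank, while rank identifies it across the whole ring.

    import horovod.tensorflow as hvd
    import tensorflow as tf

    hvd.init()

    # With 4 nodes x 4 GPUs: hvd.size() == 16, hvd.rank() is in [0, 15],
    # and hvd.local_rank() is in [0, 3] on every node.
    config = tf.ConfigProto()
    config.gpu_options.visible_device_list = str(hvd.local_rank())  # pin one GPU per process

    print('worker %d/%d (local_rank %d/%d)'
          % (hvd.rank(), hvd.size(), hvd.local_rank(), hvd.local_size()))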

# Import Horovod before the other imports and initialize it
try:
    import horovod.tensorflow as hvd
    hvd.init()
except Exception as e:
    hvd = None
    print('no horovod')

# Log the worker topology
if hvd:
    tf.logging.info('Total workers: {}, local workers: {}'.format(hvd.size(), hvd.local_size()))
    tf.logging.info('Global rank: {}, local rank: {}'.format(hvd.rank(), hvd.local_rank()))

# Dataset input: shard the dataset so different processes read different subsets
d = tf.data.TFRecordDataset(input_file)
if is_training:
    if hvd is not None:
        d = d.shard(hvd.size(), hvd.rank())
    d = d.shuffle(buffer_size=100)
    d = d.repeat()
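
To see what shard() does, here is a self-contained sketch (not from the original code; it uses a toy range dataset and TF 1.x sessions): with 4 shards, the worker with rank r keeps the elements whose index modulo 4 equals r, so the workers read disjoint subsets.

    import tensorflow as tf

    dataset = tf.data.Dataset.range(8)
    for rank in range(4):  # simulate 4 workers in one process
        it = dataset.shard(num_shards=4, index=rank).make_one_shot_iterator().get_next()
        vals = []
        with tf.Session() as sess:
            try:
                while True:
                    vals.append(sess.run(it))
            except tf.errors.OutOfRangeError:
                pass
        print('rank', rank, '->', vals)  # rank 0 -> [0, 4], rank 1 -> [1, 5], ...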

# Weight loading: only rank 0 restores the pretrained checkpoint
if init_checkpoint and is_training and (hvd is None or hvd.rank() == 0):
    for init_file in init_checkpoint.split(","):
        assignment_map, tmp_init_map = get_assignment_map_from_checkpoint(tvars, init_file, extra_load_var)
        tf.train.init_from_checkpoint(init_file, assignment_map)
        initialized_variable_names.update(tmp_init_map)

# Learning rate: scale by the number of workers
if hvd:
    learning_rate = learning_rate * hvd.size()

# Distributed optimizer: average gradients via ring-allreduce
if hvd is not None:
    # we enable compression only for fp16
    from horovod.tensorflow.compression import Compression
    if use_fp16:
        compression = Compression.fp16
    else:
        compression = Compression.none
    optimizer = hvd.DistributedOptimizer(optimizer, sparse_as_dense=True,
                                         compression=compression)

# Training steps per process: divide the total step count across workers
if FLAGS.do_train:
    # train_examples = processor.get_train_examples(FLAGS.data_dir, FLAGS.img_dir)
    num_train_steps = int(train_num / FLAGS.train_batch_size * FLAGS.num_train_epochs)
    # len(train_examples) / FLAGS.train_batch_size * FLAGS.num_train_epochs)
    num_warmup_steps = int(num_train_steps * FLAGS.warmup_proportion)
    if hvd:
        num_train_steps = num_train_steps // hvd.size()
    model_fn = model_fn_builder(
        bert_config=bert_config,
        num_labels=len(label_list),
        init_checkpoint=FLAGS.init_checkpoint,
        learning_rate=FLAGS.learning_rate,
        num_train_steps=num_train_steps,
        num_warmup_steps=num_warmup_steps)

# GPU config: use local_rank to select the GPU visible to this process
run_config = tf.ConfigProto()
# train_params.get('gpu_allow_growth', False)
run_config.gpu_options.allow_growth = True
run_config.allow_soft_placement = True
if hvd:
    run_config.gpu_options.visible_device_list = str(hvd.local_rank())
if FLAGS.use_xla:
    run_config.graph_options.optimizer_options.global_jit_level = tf.OptimizerOptions.ON_1

# Checkpoint config: only rank 0 saves the model
save_checkpoints_steps = FLAGS.save_checkpoints_steps if hvd is None or hvd.rank() == 0 else None
estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    model_dir=FLAGS.output_dir,
    config=tf.estimator.RunConfig(
        save_checkpoints_steps=save_checkpoints_steps,
        save_checkpoints_secs=None,
        keep_checkpoint_every_n_hours=2,
        log_step_count_steps=400,
        session_config=run_config))

# Training hooks: broadcast variables from rank 0 to the other processes for consistent initialization
if FLAGS.do_train and hvd is not None:
    training_hook = [hvd.BroadcastGlobalVariablesHook(0)]
else:
    training_hook = []

estimator.train(input_fn=train_input_fn, max_steps=num_train_steps, hooks=training_hook)
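
With these pieces in place, one process is launched per GPU and Horovod assigns each process its rank and local_rank at startup. For the 4-node x 4-GPU example above, the launch would look roughly like (hostnames and script name are hypothetical):

    horovodrun -np 16 -H node1:4,node2:4,node3:4,node4:4 python run_train.py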

