在分布式训练期间,我想在每个时期后同步,对首席工作人员进行一些计算,并根据这些计算继续或停止训练。我需要一个障碍才能做到这一点。
我在文档中没有看到任何类似的内容,因此我实现了基于队列的解决方案(类似于分布式训练中梯度的存储和应用方式):
def build_barrier(tasks, task_index, barrier_name):
queues = []
for i, task in enumerate(tasks):
with tf.device('%s/cpu:0' % task):
with tf.name_scope(barrier_name):
queues.append(
tf.FIFOQueue(
len(tasks),
(tf.float32),
shapes=(()),
name=str(i),
shared_name=str(i)))
with tf.control_dependencies([queue.enqueue(1.) for queue in queues]):
return queues[task_index].dequeue_many(len(tasks))
这个想法是为每个工作人员创建一个队列。对于“信号”,我在每个队列中推送一个 token ,对于“加入”,我从相应的队列中取出如此多的 token ,我想同步多少个任务。
问题是:这是正确的方法还是有更好的方法?
最佳答案
您的解决方案与 SyncReplicasOptimizer 非常相似。在SyncReplicasOptimizer中,它使用同步 token 队列来模拟屏障,并为每个变量使用累加器来累积并平均梯度更新。这是一种非常典型的批量同步并行,同时它还有在 Tensorflow 中实现陈旧同步并行的额外工作。
此外,Tensorflow还提供了Barrier最新版本,您可以查看更多信息。
关于concurrency - 在分布式 tensorflow 中制作屏障的正确方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39638468/