我对 tensorflow 中的分布式训练过程感到困惑。
我认为 tensorflow 将一个 batch_size 的数据提供给工作人员,然后工作人员更新 ps 服务器,这是对的吗?
但是在训练时,我注意到日志中的步骤编号可能很奇怪。
如果我只有 2 个 worker ,我认为正确的流程应该是这样的
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 200 xxxxxxx
[worker2] step 300 xxxxxxx
.....
每个 worker 都应该打印不同的步骤来记录。
实际上,日志如下:
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 100 xxxxxxx
[worker2] step 200 xxxxxxx
[worker1] step 300 xxxxxxx
...
为什么 worker1 不打印步骤 200?
我对工作分配感到困惑。
tensorflow 如何进行分布训练?
首席 worker 将数据拆分为 batch_size ,然后给一个 worker 一个批次,然后更新 ps 服务器?
或者,每个工作人员都会运行整个数据,并更新 ps 服务器?
如果可能,请提供一个最小的可重现示例(我们通常没有时间阅读数百行代码)
``
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Read TFRecords files for training
filename_queue = tf.train.string_input_producer(
tf.train.match_filenames_once(FLAGS.train),
num_epochs=epoch_number)
serialized_example = read_and_decode(filename_queue)
batch_serialized_example = tf.train.shuffle_batch(
[serialized_example],
batch_size=batch_size,
num_threads=thread_number,
capacity=capacity,
min_after_dequeue=min_after_dequeue)
features = tf.parse_example(
batch_serialized_example,
features={
"label": tf.FixedLenFeature([], tf.float32),
"ids": tf.VarLenFeature(tf.int64),
"values": tf.VarLenFeature(tf.float32),
})
batch_labels = features["label"]
batch_ids = features["ids"]
batch_values = features["values"]
# Read TFRecords file for validatioin
validate_filename_queue = tf.train.string_input_producer(
tf.train.match_filenames_once(FLAGS.eval),
num_epochs=epoch_number)
validate_serialized_example = read_and_decode(validate_filename_queue)
validate_batch_serialized_example = tf.train.shuffle_batch(
[validate_serialized_example],
batch_size=validate_batch_size,
num_threads=thread_number,
capacity=capacity,
min_after_dequeue=min_after_dequeue)
validate_features = tf.parse_example(
validate_batch_serialized_example,
features={
"label": tf.FixedLenFeature([], tf.float32),
"ids": tf.VarLenFeature(tf.int64),
"values": tf.VarLenFeature(tf.float32),
})
validate_batch_labels = features["label"]
validate_batch_ids = features["ids"]
validate_batch_values = features["values"]
logits = inference(batch_ids, batch_values)
batch_labels = tf.to_int64(batch_labels)
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits,
batch_labels)
loss = tf.reduce_mean(cross_entropy, name='loss')
print("Use the optimizer: {}".format(FLAGS.optimizer))
optimizer = tf.train.FtrlOptimizer(learning_rate)
global_step = tf.Variable(0, name='global_step', trainable=False)
train_op = optimizer.minimize(loss, global_step=global_step)
# Initialize saver and summary
steps_to_validate = FLAGS.steps_to_validate
init_op = tf.initialize_all_variables()
saver = tf.train.Saver(max_to_keep = 2)
keys_placeholder = tf.placeholder("float")
keys = tf.identity(keys_placeholder)
tf.add_to_collection("inputs", json.dumps({'key': keys_placeholder.name}))
tf.add_to_collection("outputs", json.dumps({'key': keys.name,
'softmax': inference_softmax.name,
'prediction': inference_op.name}))
summary_op = tf.merge_all_summaries()
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
logdir="./train_process/",
init_op=init_op,
summary_op=summary_op,
saver=saver,
global_step=global_step,
save_model_secs=60)
# Create session to run graph
with sv.managed_session(server.target) as sess:
while not sv.should_stop():
# Get coordinator and run queues to read data
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord, sess=sess)
start_time = datetime.datetime.now()
try:
while not coord.should_stop():
_, loss_value, step = sess.run([train_op, loss, global_step])
if step % steps_to_validate == 0:
accuracy_value, auc_value, summary_value = sess.run(
[accuracy, auc_op, summary_op])
end_time = datetime.datetime.now()
print("[{}] Task: {}, Step: {}, loss: {}, accuracy: {}, auc: {}".format(
end_time - start_time,
FLAGS.task_index,
step, loss_value, accuracy_value,
auc_value))
start_time = end_time
except tf.errors.OutOfRangeError:
print("Done training after reading all data")
finally:
coord.request_stop()
print("coord stopped")
# Wait for threads to exit
coord.join(threads)
``
日志或其他有用的输出
(如果日志较大,请上传附件或提供链接)。
``
[0:00:17.115814] Task: 0, Step: 74600, loss: 0.303285002708, accuracy: 0.910000026226, auc: 0.946377456188
[0:00:03.804889] Task: 1, Step: 74700, loss: 0.287385582924, accuracy: 0.879999995232, auc: 0.946395516396
[0:00:03.778589] Task: 0, Step: 74800, loss: 0.247096762061, accuracy: 0.860000014305, auc: 0.946370542049
[0:00:03.772320] Task: 1, Step: 74900, loss: 0.264987647533, accuracy: 0.899999976158, auc: 0.946406364441
[0:00:03.795459] Task: 0, Step: 75000, loss: 0.228719010949, accuracy: 0.899999976158, auc: 0.946437120438
[0:00:01.902293] Task: 1, Step: 75000, loss: 0.217391207814, accuracy: 0.910000026226, auc: 0.946473121643
[0:00:01.942055] Task: 1, Step: 75100, loss: 0.284583866596, accuracy: 0.889999985695, auc: 0.946496844292
[0:00:03.860608] Task: 0, Step: 75200, loss: 0.273199081421, accuracy: 0.850000023842, auc: 0.946503221989
[0:00:03.800881] Task: 1, Step: 75300, loss: 0.189931258559, accuracy: 0.930000007153, auc: 0.946559965611
``
最佳答案
除了 HowTo 之外没有真正的官方文档因此,通过研究示例来弄清楚事情是如何运作的一个好方法。
要理解的基本概念是有 3 种 tensorflow 过程。
Session()
)或远程主机( Session("grpc://...")
)并发出 session.run
调用。 with tf.device(job:worker/task:0):
, 块,那么该块中的计算应该在 task:0 当您使用
server = tf.train.Server
创建新服务器时,启动的进程既是worker又是master,但是了解调试的区别很有用。分布式 TF 的最简单示例是当您有一个客户端时,它启动一个进程内 master 和多个 worker。这是一个这样的 example .在这个用法中,与非分布式版本的主要区别在于你做
with tf.device("worker1")
而不是 tf.device("gpu1")
告诉它在 worker1
上执行图的那部分当您有多个客户端时,情况会变得更加复杂,例如“图间复制”。在参数服务器示例中,您有多个并行训练循环,其中每个循环对应一个单独的客户端,该客户端是一个发出运行调用的 Python 进程。要查看运算符(operator)实际位于哪个 worker ,您可以查看
with tf.device
注释。在您的示例中,您没有明确的
with.device("job:worker/task")
代码段中的块,但这部分是由 tf.device(tf.train.replica_device_setter(
完成的.本质上,代码运行 replica_device_setter
,而不是为块中的所有操作设置固定设备。为每个操作生成设备以放置它。它将所有变量放在 /job:ps/task
上 worker ,以及当前 worker 的其余操作。 replica_device_setter
的代码随着时间的推移变得有点复杂,但你可以使用它的更简单的实现,以获得与下面相同的效果def simple_setter(ps_device="/job:ps/task:0"):
def _assign(op):
node_def = op if isinstance(op, tf.NodeDef) else op.node_def
if node_def.op == "Variable":
return ps_device
else:
return "/job:worker/task:%d" % (FLAGS.task)
return _assign
...
with tf.device(simple_setter):
...
当你运行它时,每个 python 进程将创建略有不同版本的图形,除了变量节点,它们在每个进程中看起来都相同(检查 tf.get_default_graph().as_graph_def())
当您有多个客户端运行训练循环时,一个问题是——谁执行需要为所有客户端完成一次的任务?例如,有人需要为所有变量运行初始化程序。你可以把
sess.run(tf.initialize_all_variables...)
在客户端主体中,但多个客户端并行运行,这意味着操作初始化运行不止一次。因此,解决方案是指定一名 worker 作为“首席” worker ,并且仅让该 worker 运行操作。此外,
worker
之间没有内在的区别。和 ps
设备——这只是一个约定,将变量分配给 ps
设备和操作分配给 worker
设备。您也可以只拥有 worker
设备,并且版本为 replica_device_setter
将变量放到第 0 个 worker 。这是一个准系统 example与
m
工作人员更新分片的变量 n
PS 任务,它使用显式设备分配而不是 replica_device_setter总而言之,在您的情况下
replica_device_setter
确保您的 global_step
是一个存储在 ps
上的变量工作人员,因此使此变量在您的所有训练循环中共享。至于为什么你得到相同的global_step
在两个 worker 中 - 您的图表中没有任何强制global_step
递增后读取。所以如果你运行 sess.run([increment_global_step, fetch_global_step])
在两个不同的工作人员上并行运行,您可能会看到worker 0: 0
worker 1: 0
worker 0: 2
worker 1: 2
etc
关于tensorflow - tensorflow分布式进程中的任务分配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41067398/