tensorflow - tensorflow分布式进程中的任务分配

标签 tensorflow

我对 tensorflow 中的分布式训练过程感到困惑。

我认为 tensorflow 将一个 batch_size 的数据提供给工作人员,然后工作人员更新 ps 服务器,这是对的吗?

但是在训练时,我注意到日志中的步骤编号可能很奇怪。

如果我只有 2 个 worker ,我认为正确的流程应该是这样的

[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 200 xxxxxxx
[worker2] step 300 xxxxxxx

.....
每个 worker 都应该打印不同的步骤来记录。

实际上,日志如下:
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 100 xxxxxxx
[worker2] step 200 xxxxxxx
[worker1] step 300 xxxxxxx

...
为什么 worker1 不打印步骤 200?

我对工作分配感到困惑。

tensorflow 如何进行分布训练?
首席 worker 将数据拆分为 batch_size ,然后给一个 worker 一个批次,然后更新 ps 服务器?
或者,每个工作人员都会运行整个数据,并更新 ps 服务器?

如果可能,请提供一个最小的可重现示例(我们通常没有时间阅读数百行代码)

``
with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
    # Read TFRecords files for training
    filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.train),
        num_epochs=epoch_number)
    serialized_example = read_and_decode(filename_queue)
    batch_serialized_example = tf.train.shuffle_batch(
        [serialized_example],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    features = tf.parse_example(
        batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    batch_labels = features["label"]
    batch_ids = features["ids"]
    batch_values = features["values"]

    # Read TFRecords file for validatioin
    validate_filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.eval),
        num_epochs=epoch_number)
    validate_serialized_example = read_and_decode(validate_filename_queue)
    validate_batch_serialized_example = tf.train.shuffle_batch(
        [validate_serialized_example],
        batch_size=validate_batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    validate_features = tf.parse_example(
        validate_batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    validate_batch_labels = features["label"]
    validate_batch_ids = features["ids"]
    validate_batch_values = features["values"]
    logits = inference(batch_ids, batch_values)
    batch_labels = tf.to_int64(batch_labels)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits,
                                                                   batch_labels)
    loss = tf.reduce_mean(cross_entropy, name='loss')

    print("Use the optimizer: {}".format(FLAGS.optimizer))

    optimizer = tf.train.FtrlOptimizer(learning_rate)

    global_step = tf.Variable(0, name='global_step', trainable=False)
    train_op = optimizer.minimize(loss, global_step=global_step)




    # Initialize saver and summary
    steps_to_validate = FLAGS.steps_to_validate
    init_op = tf.initialize_all_variables()

    saver = tf.train.Saver(max_to_keep = 2)
    keys_placeholder = tf.placeholder("float")
    keys = tf.identity(keys_placeholder)
    tf.add_to_collection("inputs", json.dumps({'key': keys_placeholder.name}))
    tf.add_to_collection("outputs", json.dumps({'key': keys.name,
                                                'softmax': inference_softmax.name,
                                                'prediction': inference_op.name}))

    summary_op = tf.merge_all_summaries()


sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir="./train_process/",
                         init_op=init_op,
                         summary_op=summary_op,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=60)

# Create session to run graph
with sv.managed_session(server.target) as sess:

    while not sv.should_stop():
        # Get coordinator and run queues to read data
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord, sess=sess)

        start_time = datetime.datetime.now()

        try:
            while not coord.should_stop():
                _, loss_value, step = sess.run([train_op, loss, global_step])
                if step % steps_to_validate == 0:
                    accuracy_value, auc_value, summary_value = sess.run(
                        [accuracy, auc_op, summary_op])
                    end_time = datetime.datetime.now()
                    print("[{}] Task: {}, Step: {}, loss: {}, accuracy: {}, auc: {}".format(
                        end_time - start_time,
                        FLAGS.task_index,
                        step, loss_value, accuracy_value,
                        auc_value))

                    start_time = end_time
        except tf.errors.OutOfRangeError:
            print("Done training after reading all data")
        finally:
            coord.request_stop()
            print("coord stopped")

        # Wait for threads to exit
        coord.join(threads)

``

日志或其他有用的输出

(如果日志较大,请上传附件或提供链接)。
``
[0:00:17.115814] Task: 0, Step: 74600, loss: 0.303285002708, accuracy: 0.910000026226, auc: 0.946377456188
[0:00:03.804889] Task: 1, Step: 74700, loss: 0.287385582924, accuracy: 0.879999995232, auc: 0.946395516396
[0:00:03.778589] Task: 0, Step: 74800, loss: 0.247096762061, accuracy: 0.860000014305, auc: 0.946370542049
[0:00:03.772320] Task: 1, Step: 74900, loss: 0.264987647533, accuracy: 0.899999976158, auc: 0.946406364441
[0:00:03.795459] Task: 0, Step: 75000, loss: 0.228719010949, accuracy: 0.899999976158, auc: 0.946437120438
[0:00:01.902293] Task: 1, Step: 75000, loss: 0.217391207814, accuracy: 0.910000026226, auc: 0.946473121643
[0:00:01.942055] Task: 1, Step: 75100, loss: 0.284583866596, accuracy: 0.889999985695, auc: 0.946496844292
[0:00:03.860608] Task: 0, Step: 75200, loss: 0.273199081421, accuracy: 0.850000023842, auc: 0.946503221989
[0:00:03.800881] Task: 1, Step: 75300, loss: 0.189931258559, accuracy: 0.930000007153, auc: 0.946559965611

``

最佳答案

除了 HowTo 之外没有真正的官方文档因此,通过研究示例来弄清楚事情是如何运作的一个好方法。

要理解的基本概念是有 3 种 tensorflow 过程。

  • 客户端——这是构建图形的 Python 进程,连接到本地主机( Session() )或远程主机( Session("grpc://...") )并发出 session.run调用。
  • 有 master,它是客户端连接到的进程,它确定如何在 worker 之间分配工作。
  • 有 worker ,它做实际工作。如果您的图表有 with tf.device(job:worker/task:0): , 块,那么该块中的计算应该在 task:0
  • 上执行

    当您使用 server = tf.train.Server 创建新服务器时,启动的进程既是worker又是master,但是了解调试的区别很有用。

    分布式 TF 的最简单示例是当您有一个客户端时,它启动一个进程内 master 和多个 worker。这是一个这样的 example .在这个用法中,与非分布式版本的主要区别在于你做 with tf.device("worker1")而不是 tf.device("gpu1")告诉它在 worker1 上执行图的那部分

    当您有多个客户端时,情况会变得更加复杂,例如“图间复制”。在参数服务器示例中,您有多个并行训练循环,其中每个循环对应一个单独的客户端,该客户端是一个发出运行调用的 Python 进程。要查看运算符(operator)实际位于哪个 worker ,您可以查看 with tf.device注释。

    在您的示例中,您没有明确的 with.device("job:worker/task")代码段中的块,但这部分是由 tf.device(tf.train.replica_device_setter( 完成的.本质上,代码运行 replica_device_setter,而不是为块中的所有操作设置固定设备。为每个操作生成设备以放置它。它将所有变量放在 /job:ps/task 上 worker ,以及当前 worker 的其余操作。 replica_device_setter 的代码随着时间的推移变得有点复杂,但你可以使用它的更简单的实现,以获得与下面相同的效果
    def simple_setter(ps_device="/job:ps/task:0"):
        def _assign(op):
            node_def = op if isinstance(op, tf.NodeDef) else op.node_def
            if node_def.op == "Variable":
                return ps_device
            else:
                return "/job:worker/task:%d" % (FLAGS.task)
        return _assign
     ...
    with tf.device(simple_setter):
        ...
    

    当你运行它时,每个 python 进程将创建略有不同版本的图形,除了变量节点,它们在每个进程中看起来都相同(检查 tf.get_default_graph().as_graph_def())

    当您有多个客户端运行训练循环时,一个问题是——谁执行需要为所有客户端完成一次的任务?例如,有人需要为所有变量运行初始化程序。你可以把 sess.run(tf.initialize_all_variables...)在客户端主体中,但多个客户端并行运行,这意味着操作初始化运行不止一次。因此,解决方案是指定一名 worker 作为“首席” worker ,并且仅让该 worker 运行操作。

    此外,worker 之间没有内在的区别。和 ps设备——这只是一个约定,将变量分配给 ps设备和操作分配给 worker设备。您也可以只拥有 worker设备,并且版本为 replica_device_setter将变量放到第 0 个 worker 。

    这是一个准系统 examplem工作人员更新分片的变量 n PS 任务,它使用显式设备分配而不是 replica_device_setter

    总而言之,在您的情况下 replica_device_setter确保您的 global_step是一个存储在 ps 上的变量工作人员,因此使此变量在您的所有训练循环中共享。至于为什么你得到相同的global_step在两个 worker 中 - 您的图表中没有任何强制global_step递增后读取。所以如果你运行 sess.run([increment_global_step, fetch_global_step])在两个不同的工作人员上并行运行,您可能会看到
    worker 0: 0
    worker 1: 0
    worker 0: 2
    worker 1: 2
    etc
    

    关于tensorflow - tensorflow分布式进程中的任务分配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41067398/

    相关文章:

    python - Tensorflow LSTM RNN 输出激活函数

    python - 如何保存和恢复在 TensorFlow python 中训练的 DNNClassifier;虹膜示例

    python - Keras:使用完全相同的数据和架构,训练性能会有所不同。唯一的区别是使用 .Sequential() 或 .Model()

    python - 如何在训练期间调整 gpu 批量大小?

    python - 凯拉斯/ tensorflow : Combined Loss function for single output

    TensorFlow:我的(广义)骰子损失实现有什么问题?

    python - 从 Tensorflow 中的张量中提取随机切片

    TensorFlow 没有属性 "with_dependencies"

    python - Tensorflow,预测值概率 (ROI)

    python - 如何获取 tensorflow 图中每个节点的输入形状?