python - Tensorflow:通过协调器停止线程似乎不起作用?

标签 python multithreading queue tensorflow

关注 standard prefetching queue ,我通过一些额外的验证代码扩展了上述示例,请参阅随附的代码。也就是说,每第 i 个训练步骤,都会在验证集(在我的例子中是几个)上评估学习模型。验证集无法通过队列输入,因此一种可能的想法是使用共享变量构建额外的推理图。

这在某种程度上有效,但训练完成后,程序挂起(在coord.join()处)并最终抛出异常:协调器停止,线程仍在运行:... 然后异步加载线程也会抛出异常。协调器异常可以通过 try/except 子句来解决(参见下面的代码),但是异步线程仍然会抛出异常(这不会妨碍主程序,尽管,但在我看来不应该发生——它有 while 循环应该告诉它停止)。

有趣的是,如果训练是在没有运行任何评估代码的情况下完成的(即 if (it+1)%stop == 0: 注释掉之后的 block ),则 coord. join() 根本不挂起。

我的问题:我在这里做错了什么?似乎 .request_stop() 没有做我希望它应该做的事情?

import tensorflow as tf
import numpy as np

# some parameters
btsz = 100 # batch size
some_shape = 20 # size of one input (no of dims)
iters = 1000 # that many single training steps
ith = 10 # run validation sets every so often
# datastores (sort of complex backends, SQL like)
ds_train = ... # the one for training
ds_val1, ds_val2, ds_val3 = ... # having the validation data

def async_load(coord, session, queue, datastore,
               tf_input, tf_target):
    """
    Feed queue in async way. Inputs can be extracted
    from datastore only one row at a time.
    """
    while not coord.should_stop():
        input = extract_one_input_as_numpy(datastore)
        target = extract_numpy_from(datastore) # either 0 or 1
        session.run(queue, feed_dict={tf_input: input, tf_target: target})

def evaluate(sess, datastore, tf_input, tf_target, tf_loss, btsz):
    """
    Evaluate current model (represented as tf_loss) on a datastore.
    """
    loss = []
    for i in xrange(something):
        input_batch = collect_btsz_many_single examples(datastore)
        target_batch = same_for_targets(datastore)
        tmp, = sess.run([tf_loss], feed_dict={tf_input:input_batch, tf_target:target_batch})
        loss.append(tmp)
    return np.mean(loss)

def log_reg(input, target, W, b):
    """
    Simple logistic regression model.
    """
    y = tf.matmul(input, W) + b
    y_bin = tf.to_int32(y > 0)

    t_bin = tf.to_int32(target > 0)

    loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y, targets=target))
    correct_prediction = tf.equal(y_bin, t_bin)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
    return y, loss, accuracy

with tf.Session() as sess:
    # Placeholders to represent one input/target pair from a data store.
    ds_inpt = tf.placeholder(tf.float32, shape=[some_shape])
    ds_trgt = tf.placeholder(tf.float32, shape=[])

    queue = tf.FIFOQueue(capacity=10000, dtypes=[tf.float32, tf.float32], 
                  shapes=[[], [some_shape], shared_name="FIFO", name="FIFO")

    # enqueuing, this will be used in the async loading.
    enqueue_op = queue.enqueue([ds_trgt, ds_inpt])

    # dequeue from queue q, with batch size btsz
    q_trgt, q_inpt = queue.dequeue_many(btsz)

    # Paramters for Logistic Regression
    # two functions that build shared variables and initialize these
    W = weight_variable([some_shape, 1])
    b = bias_variable([1])

    # training model, feed from dequeuing the async queue
    y, loss, accuracy = log_reg(input=q_inpt, target=q_trgt, W=W, b=b)

    train_step = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)

    # inputs for validation models
    val_inpt = tf.placeholder(tf.float32, shape=[btsz, some_shape])
    val_trgt = tf.placeholder(tf.float32, shape=[btsz])
    # validation model
    val_y, val_loss, val_accuracy = log_reg(input=val_inpt, target=val_trgt, W=W, b=b)

    sess.run(tf.initialize_all_variables())
    try:
        coord = tf.train.Coordinator()
        # Start a thread to enqueue data asynchronously, and hide I/O latency.
        t = threading.Thread(target=async_load,
                              args=(coord, sess, enqueue_op, ds_train 
                                    ds_inpt, ds_trgt))
        t.start()

        # collect loss/accuracy for training
        # and losses for validation/test sets.
        tr_loss = []
        tr_acc = []
        v_loss = []

        for it in xrange(iters):
            _, _loss, _acc = sess.run([train_step, loss, accuracy])
            tr_loss.append(_loss)
            tr_acc.append(_acc)
            if (it+1)%stop == 0:
                # run trained model on validation set 1
                tmp = evaluate(sess=sess, data=ds_val1,
                               tf_inpt=val_inpt, tf_trgt=val_trgt,
                               tf_loss=val_loss, btsz)
                v_loss.append(tmp)
                # run trained model on validation set 2
                tmp = evaluate(sess=sess, data=ds_val2,
                               tf_inpt=val_inpt, tf_trgt=val_trgt,
                               tf_loss=val_loss, btsz)
                v_loss.append(tmp)
                # run trained model on validation set 3
                tmp = evaluate(sess=sess, data=ds_val3,
                               tf_inpt=val_inpt, tf_trgt=val_trgt,
                               tf_loss=val_loss, btsz)
                v_loss.append(tmp)
        coord.request_stop()
        coord.join([t])
    except RuntimeError as rte:
        print("Caught {}".format(rte))
# Clear everything!
tf.reset_default_graph()

最佳答案

您的代码中存在竞争条件。如果发生以下事件,运行 async_load() 的线程将永远阻塞:

  1. async_load() 调用 coord.should_stop() 并返回 False
  2. async_load() 调用 session.run(queue, ...) 但队列已满,因此调用会无限期阻塞。
  3. 主线程调用coord.request_stop()
  4. 主线程调用 coord.join([t]),并且由于 (2) 而永远阻塞。

避免这种情况的一种方法是创建 queue.close(cancel_pending_enqueues=True) op,并在调用 coord.request_stop() 之前在主线程中运行它。这将解除对 async_load() 线程的阻塞,并使 coord.join([t]) 返回。

关于python - Tensorflow:通过协调器停止线程似乎不起作用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36210162/

相关文章:

python - 2个句子的语义相似性度量

python - 用python合并两个数据框

iOS iPhone 寻求将带时间戳的整数快速记录到数据文件实现

java - 如何从另一个线程更新 JList?

c# - 生产者-消费者队列设计有什么问题?

python - 如何在pandas中存储mongoDB嵌套文档而不重复

python - Tensorboard:文件系统方案 gs 未实现

Java 在使用前锁定变量赋值。为什么?

python - python中的出列函数

php - 从非常慢的外部数据源获取大量数据