tensorflow - 多个队列导致TF锁定

标签 tensorflow

我尝试使用多个队列进行读取和批处理,但这会导致 TF 偶尔锁定。这是一些示例代码:

import tensorflow as tf

coordinator = tf.train.Coordinator()

file_queue = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader = tf.TextLineReader()
key, serialized_example = reader.read(file_queue)
keys, serialized_examples = tf.train.batch([key, serialized_example], 10)

# repeat the code snippet below multiple times, in my case 4
file_queue_i = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader_i = tf.TextLineReader()
key_i, serialized_example_i = reader.read(file_queue_i)

initializer = tf.initialize_all_variables()

session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, intra_op_parallelism_threads=1))
session.run(initializer)

threads = tf.train.start_queue_runners(sess=session, coord=coordinator)

session.run(keys)

当我实际尝试运行某些东西时,TensorFlow 有时会在最后一行锁定。然而,使用上面的代码很难重现这种行为。在 1000 多次运行中,我只能让它挂起一次。在我的真实代码中,实际的阅读器更复杂,它使用 TFRecords,但其他一切都是一样的。那里有 2/3 的时间挂起,总共有 3 个队列。如果有 5 个队列,它似乎永远不会运行,如果有 1 个队列,它似乎永远不会挂起。这是在 0.6 的 Mac 上。我有一个运行 Ubuntu 的不同系统,也是 0.6,我遇到了同样的问题(尽管 Ubuntu 系统上锁定的频率要高得多)。

更新:对上述代码锁定频率的更准确估计是 5,000 次试验中有 1 次锁定。

最佳答案

这可能是由于没有足够的操作线程造成的。如果您有一个队列运行程序 1 依赖于队列运行程序 2 的工作,并且异步运行它们,那么您将需要至少两个操作线程(通过 inter_op_parallelism_threads 设置),以保证取得进展.

就您而言,您的队列运行程序正在填充 batch 线程,具体取决于 string_input_ Producer 队列不为空。如果与 string_input_ Producer 队列关联的队列运行器首先运行,那么一切都很好。但是,如果首先调度 batch 队列运行程序,它将陷入 string_input_ Producer.dequeue 操作中,等待 string_input_ Producer 队列获取一些文件名。由于 TensorFlow op 线程池中只有 1 个线程,因此 string_input_ Producerenqueue op 永远不会分配一个线程来完成(即执行其 Compute方法)

最简单的解决方案是至少拥有与并发 run 调用一样多的操作线程(即队列数 + 1)。如果您确实想将自己限制在一个线程中,则可以使用主线程同步预加载文件名队列文件名。

  coordinator = tf.train.Coordinator()

  import glob
  files = glob.glob('/temp/pipeline/*')
  if FLAGS.preload_filenames:
    file_queue = tf.FIFOQueue(capacity=len(files), dtypes=tf.string)
    enqueue_val = tf.placeholder(dtype=tf.string)
    enqueue_op = file_queue.enqueue(enqueue_val)
  else:
    file_queue = tf.train.string_input_producer(files)

  reader = tf.TextLineReader()
  key, serialized_example = reader.read(file_queue)
  keys, serialized_examples = tf.train.batch([key, serialized_example], 5,
                                             capacity=10)

  initializer = tf.initialize_all_variables()

  session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1,
                                             intra_op_parallelism_threads=1))
  print 'running initializer'
  session.run(initializer)

  if FLAGS.preload_filenames:
    print 'preloading filenames'
    for fn in files:
      session.run([enqueue_op], feed_dict={enqueue_val: fn})
      print 'size - ', session.run([file_queue.size()])
    session.run([file_queue.close()])

  print 'starting queue runners'
  threads = tf.train.start_queue_runners(sess=session, coord=coordinator)
  print 'about to run session'
  print session.run(keys)

如果您有多个文件名队列,则上面的代码将需要一些封装。或者,这里有一个 hacky 解决方法,如果所有 input_ Producer 队列都有 prebuffer_amount 文件名

,则该解决方法应该有效
queue_runners=tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
filename_queue_runners=[qr for qr in queue_runners if 'input_producer' in qr.name]
for qr in filename_queue_runners:
  for k in prebuffer_amount:
    sess.run(qr._enqueue_ops[0])

关于tensorflow - 多个队列导致TF锁定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35414009/

相关文章:

python - AttributeError : can't set attribute. 分层注意网络

python - 当我尝试在 Tensorflow 中调整图像大小时如何修复 "TypeError: x and y must have the same dtype, got tf.uint8 != tf.float32"

python - Keras:约束链接输入和输出

python - Tensorflow 无法在 PyCharm 中工作 - DLL 加载失败 : The specific module could not be found

Tensorflow 2.0 如何在卷积层之间共享参数?

python - 在 Tensorflow 中删除张量的维度

python - 'max_pooling2d_8/MaxPool' 1 减 3 导致的负维度大小

python - 使用 Keras 在滑动窗口中评估函数

python - 如何从 tensorflow 中的 RNN 模型中提取细胞状态和隐藏状态?

python - Keras - 没有 ImageDataGenerator 的单像素分类