我尝试使用多个队列进行读取和批处理,但这会导致 TF 偶尔锁定。这是一些示例代码:
import tensorflow as tf
coordinator = tf.train.Coordinator()
file_queue = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader = tf.TextLineReader()
key, serialized_example = reader.read(file_queue)
keys, serialized_examples = tf.train.batch([key, serialized_example], 10)
# repeat the code snippet below multiple times, in my case 4
file_queue_i = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader_i = tf.TextLineReader()
key_i, serialized_example_i = reader.read(file_queue_i)
initializer = tf.initialize_all_variables()
session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, intra_op_parallelism_threads=1))
session.run(initializer)
threads = tf.train.start_queue_runners(sess=session, coord=coordinator)
session.run(keys)
当我实际尝试运行某些东西时,TensorFlow 有时会在最后一行锁定。然而,使用上面的代码很难重现这种行为。在 1000 多次运行中,我只能让它挂起一次。在我的真实代码中,实际的阅读器更复杂,它使用 TFRecords,但其他一切都是一样的。那里有 2/3 的时间挂起,总共有 3 个队列。如果有 5 个队列,它似乎永远不会运行,如果有 1 个队列,它似乎永远不会挂起。这是在 0.6 的 Mac 上。我有一个运行 Ubuntu 的不同系统,也是 0.6,我遇到了同样的问题(尽管 Ubuntu 系统上锁定的频率要高得多)。
更新:对上述代码锁定频率的更准确估计是 5,000 次试验中有 1 次锁定。
最佳答案
这可能是由于没有足够的操作线程造成的。如果您有一个队列运行程序 1 依赖于队列运行程序 2 的工作,并且异步运行它们,那么您将需要至少两个操作线程(通过 inter_op_parallelism_threads 设置),以保证取得进展.
就您而言,您的队列运行程序正在填充 batch
线程,具体取决于 string_input_ Producer
队列不为空。如果与 string_input_ Producer 队列关联的队列运行器首先运行,那么一切都很好。但是,如果首先调度 batch
队列运行程序,它将陷入 string_input_ Producer.dequeue
操作中,等待 string_input_ Producer
队列获取一些文件名。由于 TensorFlow op 线程池中只有 1 个线程,因此 string_input_ Producer
的 enqueue
op 永远不会分配一个线程来完成(即执行其 Compute
方法)
最简单的解决方案是至少拥有与并发 run
调用一样多的操作线程(即队列数 + 1)。如果您确实想将自己限制在一个线程中,则可以使用主线程同步预加载文件名队列文件名。
coordinator = tf.train.Coordinator()
import glob
files = glob.glob('/temp/pipeline/*')
if FLAGS.preload_filenames:
file_queue = tf.FIFOQueue(capacity=len(files), dtypes=tf.string)
enqueue_val = tf.placeholder(dtype=tf.string)
enqueue_op = file_queue.enqueue(enqueue_val)
else:
file_queue = tf.train.string_input_producer(files)
reader = tf.TextLineReader()
key, serialized_example = reader.read(file_queue)
keys, serialized_examples = tf.train.batch([key, serialized_example], 5,
capacity=10)
initializer = tf.initialize_all_variables()
session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1,
intra_op_parallelism_threads=1))
print 'running initializer'
session.run(initializer)
if FLAGS.preload_filenames:
print 'preloading filenames'
for fn in files:
session.run([enqueue_op], feed_dict={enqueue_val: fn})
print 'size - ', session.run([file_queue.size()])
session.run([file_queue.close()])
print 'starting queue runners'
threads = tf.train.start_queue_runners(sess=session, coord=coordinator)
print 'about to run session'
print session.run(keys)
如果您有多个文件名队列,则上面的代码将需要一些封装。或者,这里有一个 hacky 解决方法,如果所有 input_ Producer 队列都有 prebuffer_amount
文件名
queue_runners=tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
filename_queue_runners=[qr for qr in queue_runners if 'input_producer' in qr.name]
for qr in filename_queue_runners:
for k in prebuffer_amount:
sess.run(qr._enqueue_ops[0])
关于tensorflow - 多个队列导致TF锁定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35414009/