tensorflow - 解析csv时升级到tf.dataset无法正常工作

标签 tensorflow google-cloud-ml tensorflow-datasets

我有一个GCMLE实验,我正在尝试升级input_fn以使用新的tf.data功能。我已根据此sample创建了以下input_fn

def input_fn(...):
    dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
    dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
                tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5) 
    if shuffle:
      dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()

    labels = features.pop(LABEL_COLUMN)

    return features, labels

我的parse_csv与我以前使用的相同,但目前无法正常工作。我可以解决一些问题,但是我不完全了解,为什么是我遇到这些问题的原因。这是我parse_csv()函数的开始
def parse_csv(..):
    columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    words = tf.string_split(raw_features['sentences']) # splitting words
    vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
                default_value = 0)

....
  • 立即使此tf.string_split()停止工作,并且错误为ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], []. -通过将raw_features['sentences']通过[raw_features['sentences']]包装到张量中很容易解决,但是我不明白为什么这种dataset方法需要这样做吗?在旧版本中怎么能正常工作呢?为了使形状与我的模型的其余部分匹配,最终需要通过words = tf.squeeze(words, 0)删除此多余的尺寸,因为我将此“不必要的”尺寸添加到了张量。
  • 出于任何原因,我还收到一个错误,指出该表未初始化tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized.,但是此代码与我的旧input_fn()完全兼容(请参见下文),所以我不知道为什么现在需要初始化表?我还没有为这部分找到解决方案。我无法在parse_csv函数中使用tf.contrib.lookup.index_table_from_file吗?

  • 供引用,这是我的旧input_fn()仍然有效:
    def input_fn(...):
        filename_queue = tf.train.string_input_producer(tf.train.match_filenames_once(filenames), 
                    num_epochs=num_epochs, shuffle=shuffle, capacity=32)
        reader = tf.TextLineReader(skip_header_lines=skip_header_lines)
    
        _, rows = reader.read_up_to(filename_queue, num_records=batch_size)
    
        features = parse_csv(rows, hparams)
    
    
            if shuffle:
                features = tf.train.shuffle_batch(
                    features,
                    batch_size,
                    min_after_dequeue=2 * batch_size + 1,
                    capacity=batch_size * 10,
                    num_threads=multiprocessing.cpu_count(), 
                    enqueue_many=True,
                    allow_smaller_final_batch=True
                )
            else:
                features = tf.train.batch(
                    features,
                    batch_size,
                    capacity=batch_size * 10,
                    num_threads=multiprocessing.cpu_count(),
                    enqueue_many=True,
                    allow_smaller_final_batch=True
                )
    
    labels = features.pop(LABEL_COLUMN)
    
    return features, labels
    

    更新TF 1.7

    我正在使用TF 1.7(应该具有@mrry答案中提到的TF 1.6的所有功能)进行重新讨论,但是我仍然无法复制该行为。对于我的旧input_fn(),我能够获得大约13个步骤/秒的速度。我正在使用的新功能如下:
    def input_fn(...):
        files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
        dataset = files.apply(tf.contrib.data.parallel_interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1), cycle_length=num_shards))
        dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
                parse_csv_dataset(row, hparams = hparams), 
                batch_size = batch_size, 
                num_parallel_batches = multiprocessing.cpu_count())) 
        dataset = dataset.prefetch(1)
        if shuffle:
            dataset = dataset.shuffle(buffer_size = 10000)
        dataset = dataset.repeat(num_epochs)
        iterator = dataset.make_initializable_iterator()
        features = iterator.get_next()
        tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)
    
        labels = {key: features.pop(key) for key in LABEL_COLUMNS}
    
        return features, labels 
    

    我相信我正在跟踪所有performance guildines,例如1)使用预取2)使用num_parallel_batches = cores使用map_and_batch 3)使用parallel_interleave 4)在重复之前应用随机播放。我没有使用的唯一步骤是缓存建议,但希望它实际上仅对第一个之外的时代有所帮助,以及“首先应用交错,预取和随机播放”。 -但是,我发现在map_and_batch之后进行预取和改组可以使速度提高约10%。

    BUFFER ISSUE
    我注意到的第一个性能问题是我的旧input_fn()花了大约13个挂钟分钟才能完成20k步,但即使buffer_size为10,000(这意味着我们正在等待,直到我们处理了10,000个批次) )我还在等待40多分钟,等待缓冲区充满。花这么长时间有意义吗?如果我知道我在GCS上分片的.csv已经被随机化了,那么将随机播放/缓冲区的大小设置得较小是可以接受的吗?我正在尝试从tf.train.shuffle_batch()复制行为-但是,看来在最坏的情况下应该花13分钟的时间才能达到10k步才能填满缓冲区?

    STEPS/SEC

    即使缓冲区已满,全局速度/秒也将在与以前的input_fn()相同的模型上达到3个步骤/秒(通常低至2个步骤/秒),而之前的input_fn()则获得约13个步骤/秒。

    松散交错
    我最终尝试用sloppy_interleave()替换parallel_interleave(),因为这是@mrry的另一个建议。当我切换到sloppy_interleave时,我得到了14个步骤/秒!我知道这意味着它不是确定性的,但实际上应该只意味着它从一个运行(或历时)到下一个运行(或时期)不是确定性的吗?还是对此有更大的启示?我应该担心旧的shuffle_batch()方法和sloppy_interleave之间的真正区别吗?这样会导致4-5倍的改善,这一事实是否暗示了先前的阻碍因素是什么?

    最佳答案

    在TF 1.4(当前是与GCMLE一起使用的TF的最新版本)中,您将无法在查询表中使用make_one_shot_iterator()(请参阅相关的post),您将需要使用Dataset.make_initializable_iterator(),然后使用默认的iterator.initalizer初始化TABLES_INITIALIZER(来自此post)。这是input_fn()的样子:

    def input_fn(...):
      dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    
      # Define `vocab_table` outside the map function and use it in `parse_csv()`.
      vocab_table = tf.contrib.lookup.index_table_from_file(
          vocabulary_file=hparams.vocab_file, default_value=0)
    
      dataset = dataset.interleave(
          lambda filename: (tf.data.TextLineDataset(filename)
                            .skip(1)
                            .map(lambda row: parse_csv(row, hparams),
                                 num_parallel_calls=multiprocessing.cpu_count())),
          cycle_length=5) 
    
      if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)
      dataset = dataset.repeat(num_epochs)
      dataset = dataset.batch(batch_size)
      iterator = dataset.make_initializable_iterator()
      features = iterator.get_next()
    
      # add iterator.intializer to be handled by default table initializers
      tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer) 
    
      labels = features.pop(LABEL_COLUMN)
    
      return features, labels
    

    关于tensorflow - 解析csv时升级到tf.dataset无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48779293/

    相关文章:

    python - 在 Keras 中构建自定义损失函数

    python - 如何将 4 个子张量交错分配给一个更大的张量?

    python - 无法将 NumPy 数组转换为张量(不支持的对象类型 int)

    google-cloud-ml - Google Cloud ML Tensorflow 版本

    python - 在 Tensorboard 中绘制各个层的梯度

    python - 如何从 numpy 数组的数组中获取 tensorflow 2 中的窗口数据集?

    python - 为 Keras 逐个元素编写自定义损失函数

    python - 加载模型时出现意外错误: problem in predictor - ModuleNotFoundError: No module named 'torchvision'

    python - Tensorflow 1.14 : tf. numpy_function 映射时会丢失形状?

    python - TensorFlow 数据集映射函数中的随机性