tensorflow - 如何将 TFRecord 示例从字节字符串解析为张量字典?

标签 tensorflow tfrecord

我正在为一个项目训练一个多任务转换器,并且希望将我的数据结构切换到 TFRecords,因为我的训练受到动态数据生成的瓶颈。我目前正在将单个数据样本构造为张量字典,如下所示:

{'continuous_input': tf.Tensor(), 'categorical_input': tf.Tensor(), 'continuous_output': tf.Tensor(), 'categorical_output': tf.Tensor()}

在一个样本内,这 4 个张量具有相同的长度,但在样本之间,这些张量的长度不同。两个continuous_张量是 tf.float32,而两个 categorical_张量是 tf.int32。这些张量的更明确的细节在下面的代码中。

我认为我已经以正确的格式(字节字符串)成功地将数据写入 TFRecords。

问题陈述:我无法弄清楚如何将这些 TFRecords 读回内存并将字节字符串解析到上面的张量结构字典中。我在下面提供了一个完全可重现问题的示例,该示例使用 Numpy v1.23.4 和 Tensorflow v2.10.0。它使用上述字典结构创建虚假数据,将 TFRecords 保存到您的工作目录,重新加载这些 TFRecords 并尝试使用我的函数 parse_tfrecord_fn() 解析它们。 。我知道问题出在parse_tfrecord_fn()但不知道合适tf.io解决此问题的工具。

可重现的示例:

import os
import os.path as op
import numpy as np
import tensorflow as tf


# Helper functions for writing TFRecords
def _tensor_feature(value):
    serialized_nonscalar = tf.io.serialize_tensor(value)
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[serialized_nonscalar.numpy()]))


def create_example(sample):
    feature = {
        "continuous_input": _tensor_feature(sample['continuous_input']),
        "categorical_input": _tensor_feature(sample['categorical_input']),
        "continuous_output": _tensor_feature(sample['continuous_output']),
        "categorical_output": _tensor_feature(sample['categorical_output']),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()


# Helper functions for reading/preparing TFRecord data

def parse_tfrecord_fn(example):
    feature_description = {
        "continuous_input": tf.io.VarLenFeature(tf.string),
        "categorical_input": tf.io.VarLenFeature(tf.string),
        "continuous_output": tf.io.VarLenFeature(tf.string),
        "categorical_output": tf.io.VarLenFeature(tf.string)
    }
    example = tf.io.parse_single_example(example, feature_description)
    # TODO: WHAT GOES HERE?
    return example


def get_dataset(filenames, batch_size):
    dataset = (
        tf.data.TFRecordDataset(filenames, num_parallel_reads=tf.data.AUTOTUNE)
            .map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
            .shuffle(batch_size * 10)
            .batch(batch_size)
            .prefetch(tf.data.AUTOTUNE)
    )
    return dataset

# Make fake data
num_samples_per_tfrecord = 100
num_train_samples = 1600
num_tfrecords = num_train_samples // num_samples_per_tfrecord
fake_sequence_lengths = np.random.randint(3, 35, num_train_samples)
fake_data = []
for i in range(num_train_samples):
    seq_len = fake_sequence_lengths[i]
    fake_data.append({'continuous_input': tf.random.uniform([seq_len], minval=0, maxval=1, dtype=tf.float32),
                      'categorical_input': tf.random.uniform([seq_len], minval=0, maxval=530, dtype=tf.int32),
                      'continuous_output': tf.fill(seq_len, -1.0),
                      'categorical_output': tf.fill(seq_len, -1)})

tfrecords_dir = './tfrecords'
if not op.exists(tfrecords_dir):
    os.makedirs(tfrecords_dir)  # create TFRecords output folder

# Write fake data to tfrecord files
for tfrec_num in range(num_tfrecords):
    samples = fake_data[(tfrec_num * num_samples_per_tfrecord): ((tfrec_num + 1) * num_samples_per_tfrecord)]
    with tf.io.TFRecordWriter(tfrecords_dir + "/file_%.2i.tfrec" % tfrec_num) as writer:
        for sample in samples:
            example = create_example(sample)
            writer.write(example)

# (Try to) Load all the TFRecord data into a (parsed) tf dataset
train_filenames = tf.io.gfile.glob(f"{tfrecords_dir}/*.tfrec")

# Problem: the line below doesn't return the original tensors of fake_data, because my parse_tfrecord_fn is wrong
# Question: What must I add to parse_tfrecord_fn to give this the desired behavior?
dataset = get_dataset(train_filenames, batch_size=32)

# For ease of debugging parse_tfrecord_fn():
dataset = tf.data.TFRecordDataset(train_filenames, num_parallel_reads=tf.data.AUTOTUNE)
element = dataset.take(1).get_single_element()
parse_tfrecord_fn(element)  # set your breakpoint here, then can step through parse_tfrecord_fn()

函数parse_tfrecord_fn()接受字节字符串作为输入,如下所示:

示例 = "b'\n\xb4\x03\nj\n\x10连续_输入\x12V\nT\nR\x08\x01\x12\x04\x12\x02\x08\x12"H..."

命令example = tf.io.parse_single_example(example, feature_description) ,其中参数的定义如我的可重现示例中所示,返回 SparseTensors 的字典使用所需的 4 个键(“连续输入”、“分类输入”等)。然而,这些 SparseTensors 的值要么不存在,要么对我来说无法访问,因此我无法提取它们并解析它们,例如 tf.io.parse_tensor(example['continuous_input'].values.numpy().tolist()[0], out_type=tf.float32) .

最佳答案

我解决了这个问题,我最初的怀疑是正确的 - 这是解析器函数 parse_tfrecord_fn 中需要的一个简单更改。我在下面提供了完整的工作代码,对于任何人来说这可能有助于继续前进。我对编写 TFRecord 的辅助函数做了一些小的修改,只是为了匹配常见的设计模式。实质性更改位于 parse_tfrecord_fn 中。

关键见解:

  1. 在解析最初序列化到 bytes_list 中的任何 tfrecord 对象时,使用 tf.io.FixedLenFeature([], tf.string)。这里的直觉是,尽管 bytes_list 字符串的长度可能因对象而异,但它仍然只是 1 个字符串,而“1”就是它的组成部分固定长度特征。

  2. 使用 tf.io.parse_tensor() 撤消张量的 bytes_list 序列化,并使用 out_type 指定张量的原始数据类型> 论证。

  • 请注意,如果您使用 tf.io.VarLenFeature 解析 TFRecord,这将不起作用,因为这将返回某种我无法反序列化的 SparseTensor/解析。

结合这两种见解,正确的流程如下:

  • 将 TFRecord 解析回其字典形式,并将原始键和序列化(即未解析)张量作为值。
  • 然后解析该字典中的各个张量。
import os
import os.path as op
import numpy as np
import tensorflow as tf


# Helper functions for writing TFRecords
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    # If the value is an eager tensor BytesList won't unpack a string from an EagerTensor.
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def create_example(sample):
    feature = {
        "continuous_input": _bytes_feature(tf.io.serialize_tensor(sample['continuous_input'])),
        "categorical_input": _bytes_feature(tf.io.serialize_tensor(sample['categorical_input'])),
        "continuous_output": _bytes_feature(tf.io.serialize_tensor(sample['continuous_output'])),
        "categorical_output": _bytes_feature(tf.io.serialize_tensor(sample['categorical_output'])),
    }

    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()


# Helper functions for reading/preparing TFRecord data
def parse_tfrecord_fn(example_to_parse):
    feature_description = {
        "continuous_input": tf.io.FixedLenFeature([], tf.string),
        "categorical_input": tf.io.FixedLenFeature([], tf.string),
        "continuous_output": tf.io.FixedLenFeature([], tf.string),
        "categorical_output": tf.io.FixedLenFeature([], tf.string)
    }
    parsed_example = tf.io.parse_single_example(example_to_parse, feature_description)
    return {'continuous_input': tf.io.parse_tensor(parsed_example['continuous_input'], out_type=tf.float32),
            'categorical_input': tf.io.parse_tensor(parsed_example['categorical_input'], out_type=tf.int32),
            'continuous_output': tf.io.parse_tensor(parsed_example['continuous_output'], out_type=tf.float32),
            'categorical_output': tf.io.parse_tensor(parsed_example['categorical_output'], out_type=tf.int32)}


def get_dataset(filenames, batch_size):
    dataset = (
        tf.data.TFRecordDataset(filenames, num_parallel_reads=tf.data.AUTOTUNE)
            .map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
            .shuffle(batch_size * 10)
            .padded_batch(batch_size=batch_size,
                          padding_values={'categorical_input': 0, 'continuous_input': 0.0,
                                          'categorical_output': -1,
                                          'continuous_output': -1.0},
                          padded_shapes={'categorical_input': [None], 'continuous_input': [None],
                                         'categorical_output': [None],
                                         'continuous_output': [None]},
                          drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE)
    )
    return dataset


# Make fake data
num_samples_per_tfrecord = 100
num_train_samples = 1600
num_tfrecords = num_train_samples // num_samples_per_tfrecord
fake_sequence_lengths = np.random.randint(3, 35, num_train_samples)
fake_data = []
for i in range(num_train_samples):
    seq_len = fake_sequence_lengths[i]
    fake_data.append({"continuous_input": tf.random.uniform([seq_len], minval=0, maxval=1, dtype=tf.float32),
                      "categorical_input": tf.random.uniform([seq_len], minval=0, maxval=530, dtype=tf.int32),
                      "continuous_output": tf.fill(seq_len, -1.0),
                      "categorical_output": tf.fill(seq_len, -1)})

tfrecords_dir = './tfrecords'
if not op.exists(tfrecords_dir):
    os.makedirs(tfrecords_dir)  # create TFRecords output folder

# Write fake data to tfrecord files
for tfrec_num in range(num_tfrecords):
    samples = fake_data[(tfrec_num * num_samples_per_tfrecord): ((tfrec_num + 1) * num_samples_per_tfrecord)]
    with tf.io.TFRecordWriter(tfrecords_dir + "/file_%.2i.tfrec" % tfrec_num) as writer:
        for sample in samples:
            example = create_example(sample)
            writer.write(example)

# Load all the TFRecord data into a (parsed) tf dataset
train_filenames = tf.io.gfile.glob(f"{tfrecords_dir}/*.tfrec")

# The line below works now!
dataset = get_dataset(train_filenames, batch_size=32)

for el in dataset:
    successful_element = el
    break

print(successful_element)

关于tensorflow - 如何将 TFRecord 示例从字节字符串解析为张量字典?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74366436/

相关文章:

python - 切换到数据集后精度不再提高

mxnet - TFRecord 与 RecordIO

tensorflow - 在 tfrecord 文件中写入和读取 SparseTensor

python - 属性错误: 'Tensor' object has no attribute 'numpy' in Tensorflow 2. 1

tensorflow - 使用 tfslim 解码 tfrecord

python - Tensorflow pad序列特征列

tensorflow - 在对象检测 API 中使用数据增强选项时出错

python - 使用Tensorflow对图像进行分类时如何获取识别对象的像素矩阵?

python - LSTM ValueError : non-broadcastable output operand with shape (879, 1) 与广播形状不匹配 (879,18)

python - 将 ZipDataSet 写入 TFRecord