I'm training a multi-task Transformer for a project and want to switch my data structure over to TFRecords, because my training is bottlenecked by on-the-fly data generation. I currently structure a single data sample as a dictionary of tensors, like this:
{'continuous_input': tf.Tensor(), 'categorical_input': tf.Tensor(), 'continuous_output': tf.Tensor(), 'categorical_output': tf.Tensor()}
Within a sample, these 4 tensors have the same length, but between samples the tensors vary in length. The two continuous_ tensors are tf.float32, while the two categorical_ tensors are tf.int32. More explicit details of these tensors are in the code below.
I believe I have successfully written my data to TFRecords in the correct format (byte strings).
Problem statement: I can't figure out how to read these TFRecords back into memory and parse the byte strings into the dictionary-of-tensors structure above. Below is a fully reproducible example of the problem, which uses Numpy v1.23.4 and Tensorflow v2.10.0. It creates fake data with the dictionary structure above, saves TFRecords to your working directory, reloads those TFRecords, and attempts to parse them with my function parse_tfrecord_fn(). I know the problem lies in parse_tfrecord_fn(), but I don't know the appropriate tf.io tool to resolve it.
Reproducible example:
import os
import os.path as op
import numpy as np
import tensorflow as tf
# Helper functions for writing TFRecords
def _tensor_feature(value):
    serialized_nonscalar = tf.io.serialize_tensor(value)
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[serialized_nonscalar.numpy()]))
def create_example(sample):
    feature = {
        "continuous_input": _tensor_feature(sample['continuous_input']),
        "categorical_input": _tensor_feature(sample['categorical_input']),
        "continuous_output": _tensor_feature(sample['continuous_output']),
        "categorical_output": _tensor_feature(sample['categorical_output']),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()
# Helper functions for reading/preparing TFRecord data
def parse_tfrecord_fn(example):
    feature_description = {
        "continuous_input": tf.io.VarLenFeature(tf.string),
        "categorical_input": tf.io.VarLenFeature(tf.string),
        "continuous_output": tf.io.VarLenFeature(tf.string),
        "categorical_output": tf.io.VarLenFeature(tf.string)
    }
    example = tf.io.parse_single_example(example, feature_description)
    # TODO: WHAT GOES HERE?
    return example
def get_dataset(filenames, batch_size):
    dataset = (
        tf.data.TFRecordDataset(filenames, num_parallel_reads=tf.data.AUTOTUNE)
        .map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
        .shuffle(batch_size * 10)
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    )
    return dataset
# Make fake data
num_samples_per_tfrecord = 100
num_train_samples = 1600
num_tfrecords = num_train_samples // num_samples_per_tfrecord
fake_sequence_lengths = np.random.randint(3, 35, num_train_samples)
fake_data = []
for i in range(num_train_samples):
    seq_len = fake_sequence_lengths[i]
    fake_data.append({'continuous_input': tf.random.uniform([seq_len], minval=0, maxval=1, dtype=tf.float32),
                      'categorical_input': tf.random.uniform([seq_len], minval=0, maxval=530, dtype=tf.int32),
                      'continuous_output': tf.fill([seq_len], -1.0),  # tf.fill expects 1-D dims, hence [seq_len]
                      'categorical_output': tf.fill([seq_len], -1)})
tfrecords_dir = './tfrecords'
if not op.exists(tfrecords_dir):
    os.makedirs(tfrecords_dir)  # create TFRecords output folder
# Write fake data to tfrecord files
for tfrec_num in range(num_tfrecords):
    samples = fake_data[(tfrec_num * num_samples_per_tfrecord): ((tfrec_num + 1) * num_samples_per_tfrecord)]
    with tf.io.TFRecordWriter(tfrecords_dir + "/file_%.2i.tfrec" % tfrec_num) as writer:
        for sample in samples:
            example = create_example(sample)
            writer.write(example)
# (Try to) Load all the TFRecord data into a (parsed) tf dataset
train_filenames = tf.io.gfile.glob(f"{tfrecords_dir}/*.tfrec")
# Problem: the line below doesn't return the original tensors of fake_data, because my parse_tfrecord_fn is wrong
# Question: What must I add to parse_tfrecord_fn to give this the desired behavior?
dataset = get_dataset(train_filenames, batch_size=32)
# For ease of debugging parse_tfrecord_fn():
dataset = tf.data.TFRecordDataset(train_filenames, num_parallel_reads=tf.data.AUTOTUNE)
element = dataset.take(1).get_single_element()
parse_tfrecord_fn(element) # set your breakpoint here, then can step through parse_tfrecord_fn()
The function parse_tfrecord_fn() receives a byte string as input, which looks like this:
example = b'\n\xb4\x03\nj\n\x10continuous_input\x12V\nT\nR\x08\x01\x12\x04\x12\x02\x08\x12"H...'
The call example = tf.io.parse_single_example(example, feature_description), with the arguments defined as in my reproducible example, returns a dictionary of SparseTensors with the desired 4 keys ('continuous_input', 'categorical_input', etc.). However, the values of these SparseTensors are either absent or inaccessible to me, so I can't extract and parse them with, e.g., tf.io.parse_tensor(example['continuous_input'].values.numpy().tolist()[0], out_type=tf.float32).
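For context, inspecting what this feature_description actually returns looks like the following (an illustrative snippet, assuming the TFRecord files written above exist):
raw_dataset = tf.data.TFRecordDataset(train_filenames)
raw_example = next(iter(raw_dataset))
parsed = tf.io.parse_single_example(raw_example, {"continuous_input": tf.io.VarLenFeature(tf.string)})
print(type(parsed['continuous_input']))   # a SparseTensor
print(parsed['continuous_input'].values)  # a 1-element string tensor holding the still-serialized bytes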
Best Answer
I solved this, and my initial suspicion was correct: it was a simple change needed in the parser function parse_tfrecord_fn. Full working code is provided below, for anyone it may help going forward. I made minor modifications to the TFRecord-writing helper functions, simply to match common design patterns. The substantive change is in parse_tfrecord_fn.
Key insights:
- When parsing any tfrecord object that was originally serialized into a bytes_list, use tf.io.FixedLenFeature([], tf.string). The intuition here is that although the length of the bytes_list string may vary from object to object, it is still exactly 1 string, and that "1" is what makes it a fixed-length feature.
- Use tf.io.parse_tensor() to undo the bytes_list serialization of a tensor, specifying the tensor's original dtype with the out_type argument.
  - Note that this will NOT work if you parse the TFRecord with tf.io.VarLenFeature, as that returns a kind of SparseTensor that I could not deserialize/parse.
Combining these two insights, the correct flow is as follows (a minimal round-trip sketch follows this list):
- Parse the TFRecord back into its dictionary form, with the original keys and the serialized (i.e. unparsed) tensors as values.
- Then parse the individual tensors in that dictionary.
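Here is that sketch: a standalone round trip for a single variable-length tensor (an illustrative addition, not part of the original answer), showing both steps in isolation before the full script below:
import tensorflow as tf
# Step 0: serialize a variable-length tensor into an Example, the way the writer does.
t = tf.constant([0.1, 0.2, 0.3], dtype=tf.float32)
feature = tf.train.Feature(bytes_list=tf.train.BytesList(value=[tf.io.serialize_tensor(t).numpy()]))
example_bytes = tf.train.Example(features=tf.train.Features(feature={"continuous_input": feature})).SerializeToString()
# Step 1: parse the Example back to a dict with the serialized tensor (a scalar string) as the value.
parsed = tf.io.parse_single_example(example_bytes, {"continuous_input": tf.io.FixedLenFeature([], tf.string)})
# Step 2: undo the serialization, restoring the original dtype.
restored = tf.io.parse_tensor(parsed["continuous_input"], out_type=tf.float32)
print(restored)  # tf.Tensor([0.1 0.2 0.3], shape=(3,), dtype=float32)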
import os
import os.path as op
import numpy as np
import tensorflow as tf
# Helper functions for writing TFRecords
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
# If the value is an eager tensor BytesList won't unpack a string from an EagerTensor.
if isinstance(value, type(tf.constant(0))):
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def create_example(sample):
    feature = {
        "continuous_input": _bytes_feature(tf.io.serialize_tensor(sample['continuous_input'])),
        "categorical_input": _bytes_feature(tf.io.serialize_tensor(sample['categorical_input'])),
        "continuous_output": _bytes_feature(tf.io.serialize_tensor(sample['continuous_output'])),
        "categorical_output": _bytes_feature(tf.io.serialize_tensor(sample['categorical_output'])),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()
# Helper functions for reading/preparing TFRecord data
def parse_tfrecord_fn(example_to_parse):
    feature_description = {
        "continuous_input": tf.io.FixedLenFeature([], tf.string),
        "categorical_input": tf.io.FixedLenFeature([], tf.string),
        "continuous_output": tf.io.FixedLenFeature([], tf.string),
        "categorical_output": tf.io.FixedLenFeature([], tf.string)
    }
    parsed_example = tf.io.parse_single_example(example_to_parse, feature_description)
    # Undo the bytes_list serialization of each tensor, restoring the original dtypes.
    return {'continuous_input': tf.io.parse_tensor(parsed_example['continuous_input'], out_type=tf.float32),
            'categorical_input': tf.io.parse_tensor(parsed_example['categorical_input'], out_type=tf.int32),
            'continuous_output': tf.io.parse_tensor(parsed_example['continuous_output'], out_type=tf.float32),
            'categorical_output': tf.io.parse_tensor(parsed_example['categorical_output'], out_type=tf.int32)}
def get_dataset(filenames, batch_size):
    dataset = (
        tf.data.TFRecordDataset(filenames, num_parallel_reads=tf.data.AUTOTUNE)
        .map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
        .shuffle(batch_size * 10)
        # padded_batch (rather than batch) pads each variable-length sample out to the
        # longest sequence in the batch, so the batched tensors have a uniform shape.
        .padded_batch(batch_size=batch_size,
                      padding_values={'categorical_input': 0, 'continuous_input': 0.0,
                                      'categorical_output': -1,
                                      'continuous_output': -1.0},
                      padded_shapes={'categorical_input': [None], 'continuous_input': [None],
                                     'categorical_output': [None],
                                     'continuous_output': [None]},
                      drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE)
    )
    return dataset
# Make fake data
num_samples_per_tfrecord = 100
num_train_samples = 1600
num_tfrecords = num_train_samples // num_samples_per_tfrecord
fake_sequence_lengths = np.random.randint(3, 35, num_train_samples)
fake_data = []
for i in range(num_train_samples):
    seq_len = fake_sequence_lengths[i]
    fake_data.append({"continuous_input": tf.random.uniform([seq_len], minval=0, maxval=1, dtype=tf.float32),
                      "categorical_input": tf.random.uniform([seq_len], minval=0, maxval=530, dtype=tf.int32),
                      "continuous_output": tf.fill([seq_len], -1.0),  # tf.fill expects 1-D dims, hence [seq_len]
                      "categorical_output": tf.fill([seq_len], -1)})
tfrecords_dir = './tfrecords'
if not op.exists(tfrecords_dir):
    os.makedirs(tfrecords_dir)  # create TFRecords output folder
# Write fake data to tfrecord files
for tfrec_num in range(num_tfrecords):
    samples = fake_data[(tfrec_num * num_samples_per_tfrecord): ((tfrec_num + 1) * num_samples_per_tfrecord)]
    with tf.io.TFRecordWriter(tfrecords_dir + "/file_%.2i.tfrec" % tfrec_num) as writer:
        for sample in samples:
            example = create_example(sample)
            writer.write(example)
# Load all the TFRecord data into a (parsed) tf dataset
train_filenames = tf.io.gfile.glob(f"{tfrecords_dir}/*.tfrec")
# The line below works now!
dataset = get_dataset(train_filenames, batch_size=32)
for el in dataset:
    successful_element = el
    break
print(successful_element)
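As a quick sanity check (an illustrative addition, not part of the original answer), the parsed and padded dataset now yields dense, uniformly shaped batches:
print(dataset.element_spec)
# Each of the 4 keys maps to a TensorSpec of shape (32, None): a batch of 32 samples,
# padded along the sequence axis to the longest sequence in that batch.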
Original question: tensorflow - How to parse TFRecord examples from byte strings into a dict of tensors? https://stackoverflow.com/questions/74366436/