我正在使用 keras model.fit
进行训练,数据来自 tf.records,加载到 tf.data 对象中,该对象使用 .shuffle
来打乱数据。我还使用 callbacks.ModelCheckpoint
每隔 x
个步骤/批处理保存模型。
有时,我的云实例会在 epoch 完成之前断开连接或崩溃,但 y
步骤中的模型会保存到我的驱动器中。
我想在训练另一个纪元之前完成该纪元中数据的训练(我有很长的纪元),因此每个数据示例每个纪元都会训练一次。
有没有办法获取数据的原始顺序以及数据中模型最后保存的位置?
到目前为止我发现了什么
看起来您可以通过设置种子来在 .shuffle 中设置特定顺序。但是,洗牌仅发生在缓冲区中,因此我不能 100% 确定设置种子是否会完美地重现顺序。另外,我不确定这将如何与 reshuffle_each_iteration
一起使用。每个时期后是否使用不同的种子?如果是这样,我想解决方法是一次只训练 1 个时期,每个时期都有一个指定的种子。
即使我确实获得了训练订单的副本,我也不知道如何找到模型最后保存的订单中的位置,然后从该点开始训练。我必须达到的一个想法是手动迭代数据集直到达到它。虽然我不确定 model.fit()
是否会继续这个顺序,或者从头开始。
为了从上次保存模型的位置获取步骤/批处理号,我可能可以将其记录在某个地方。
这些解决方案看起来像是粗略的解决方法,我想知道 Keras 中是否有一些我可能会忽略的功能来帮助解决这个问题。
最佳答案
似乎没有 keras 构建方法可以做到这一点,但如果我错了,请纠正我。
我的方法
Dataset.shuffle
在内部使用初始种子值来生成种子,以便在 reshuffle_each_iteration=True
时在迭代过程中用于重新洗牌。因此,为特定时期重新创建相同的顺序,并继续在该特定批处理中训练时期,我们必须使用相同的种子重新创建数据集,并将数据集迭代器移动到相同的时期和相同的批处理。
调试
为了调试并确保纪元和批处理以相同的顺序生成,我们需要一种方法来打印如何在每个纪元批处理中拾取数据点。这在 kears 中很棘手,因此出于调试目的,我将使用回归问题并将基本事实作为序列号。然后我可以有一个自定义损失,我可以在其中打印事实真相并使用户的订单是正确的。
模型和数据
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import keras.backend as K
# Data
x_train = np.random.randn(15, 10).astype("float32")
y_train = np.arange(15).astype("float32")
# Custom MSE looss just to track the order in which data is picked up
def my_mse(y_true, y_pred):
tf.print(tf.keras.backend.flatten(y_true))
loss = K.square(y_pred - y_true)
loss = K.sum(loss, axis=1)
return loss
# Model
def get_model():
inputs = keras.Input(shape=(10))
outputs = layers.Dense(1, activation="linear")(inputs)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer="rmsprop",
loss=my_mse,
)
return model
数据集
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)
epochs = 2
print ("Runs 1")
for e in range(epochs):
for i, (x, y) in enumerate(train_dataset):
print (e, i, y)
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)
print ("Runs 2")
for e in range(epochs):
for i, (x, y) in enumerate(train_dataset):
print (e, i, y)
输出:
Runs 1
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14. 9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10. 0. 14. 6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0. 1. 5. 6. 9. 3. 7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13. 8. 4. 10. 2. 12. 11.], shape=(7,), dtype=float32)
Runs 2
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14. 9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10. 0. 14. 6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0. 1. 5. 6. 9. 3. 7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13. 8. 4. 10. 2. 12. 11.], shape=(7,), dtype=float32)
是的,通过种子,订单被复制。
现在让我们编写一个方法将数据集转发到特定的纪元和批处理组合
def forward(dataset, n=None):
if not n:
return dataset
i = 0
while True:
for _ in dataset:
i += 1
if i == n:
return dataset
测试用例:
让我们正常运行并观察顺序
从头开始的数据
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)
model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)
输出:
[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
来自数据集第 n 个状态的数据
让我们的数据集进行第四次迭代并运行训练
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)
model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)
输出:
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
很好,现在我们知道如何正确转发数据集了。现在让我们编写回调来跟踪当前的迭代次数:
用于跟踪迭代的自定义回调(纪元-批处理组合)
现在我们需要识别模型检查点的纪元和批处理组合。如果我们有这些信息,我们可以加载最后一个检查点模型并将我们的数据集转发到其批处理和纪元组合并继续训练。我们将使用回调来完成此操作
class MyCustomCallback(tf.keras.callbacks.ModelCheckpoint, keras.callbacks.Callback):
def __init__(self, the_id=0, **args):
self.the_id = the_id
self.epoch = 0
super().__init__(**args)
def _save_model(self, epoch, logs):
logs['the_id'] = self.the_id
super()._save_model(epoch, logs)
def on_batch_end(self, batch, logs={}):
self.the_id += 1
super().on_batch_end(batch, logs)
checkpoint_filepath = 'checkpoint-{the_id}'
model_checkpoint_callback = MyCustomCallback(
filepath=checkpoint_filepath,
save_freq=2,
save_best_only=False)
model = get_model()
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)
model.fit(train_dataset, epochs=5, verbose=0, callbacks=[model_checkpoint_callback], workers=4, shuffle=False)
输出:
[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
我们每两批检查一次。因此,我们假设它崩溃了,最后一个检查点是 checkpoint-4。我们可以加载此模型并将数据集转发到 4 并继续训练。
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)
model = get_model()
model.fit(train_dataset, epochs=2, verbose=0, workers=4, shuffle=False)
输出:
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
关于tensorflow - 使用打乱的 tf.data : if training is interrupted, 进行 Keras 训练如何在最后一次数据迭代/最后保存的检查点的顺序处继续训练,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62528853/