python - How to print the data going into keras model.fit, specifically when using a petastorm dataset

Tags: python tensorflow keras callback petastorm

Update

While I appreciate AloneTogether's answer, I didn't like that I was using take() and that it was separate from model.fit.

I put another answer here if you want to look at it. It involves subclassing the model. Not that bad.
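
The subclassing answer mentioned above is not reproduced on this page. As a rough idea of what such an approach can look like, here is a minimal sketch that overrides train_step so every batch is printed at the moment model.fit consumes it. The names PrintingModel and seen_labels, the toy one-column dataset, and the use of run_eagerly=True are all assumptions made for illustration and are not taken from that answer.

```python
import numpy as np
import tensorflow as tf

seen_labels = []  # hypothetical helper list: every label that reached train_step


class PrintingModel(tf.keras.Model):
    """Hypothetical subclass that prints each batch right before training on it."""

    def train_step(self, data):
        # `data` is exactly the (features, labels) batch that fit pulled from the dataset.
        x, y = data
        tf.print("features:", x, "labels:", y, summarize=-1)
        # .numpy() works here only because the model is compiled with run_eagerly=True.
        seen_labels.extend(y.numpy().tolist())
        return super().train_step(data)


# Toy stand-in for the parquet data: one feature column and a label, values 1..100.
values = np.arange(1, 101, dtype=np.float32).reshape(-1, 1)
labels = np.arange(1, 101, dtype=np.int32)
ds = tf.data.Dataset.from_tensor_slices((values, labels)).batch(4)

inputs = tf.keras.layers.Input(shape=(1,), name="feature_1")
outputs = tf.keras.layers.Dense(101, activation="softmax")(inputs)
model = PrintingModel(inputs=inputs, outputs=outputs)
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam",
              run_eagerly=True)

model.fit(ds, epochs=1, verbose=0)
print(sorted(seen_labels) == list(range(1, 101)))
```

Running eagerly is slow, so this is meant for debugging a small run, not for real training.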

End of update

I have a simple example: a parquet file with 8 columns named feature_#, each column populated with 1 to 100

    feature_1      feature_2     ...      feature_8
    1              1                      1
    2              2                      2
    ...            ...                    ...
    99             99                     99
    100            100                    100

My model:

import tensorflow as tf
from tensorflow.keras.layers import Input, Concatenate, Dense

all_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7","feature_8"]
x_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7"]

# One scalar input per feature column, concatenated into a single 7-wide vector.
inputs = [Input(shape=(1,), name=col) for col in x_cols]
merged = Concatenate(axis=1)(inputs)
x = Dense(50, activation="relu")(merged)
x = Dense(20, activation="relu")(x)
# 101 output classes so that the labels 1..100 are valid class indices.
outputs = Dense(101, activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy",
              optimizer=opt, metrics=['accuracy'])

I use petastorm like this:

from petastorm.reader import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

batch_size = 4

with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                       schema_fields=all_cols) as train_reader:
    with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                           schema_fields=all_cols) as val_reader:
        # Split each row into (7 feature tensors, label) and re-batch.
        train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map(
                        lambda x: (tuple(getattr(x, col) for col in x_cols),
                                    getattr(x, "feature_8"))
                        ) \
                        .batch(batch_size)

        val_ds = make_petastorm_dataset(val_reader) \
                        .unbatch() \
                        .map(
                        lambda x: (tuple(getattr(x, col) for col in x_cols),
                                    getattr(x, "feature_8"))
                        ) \
                        .batch(batch_size)
        # model.fit(...) has to run here, while both readers are still open.

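For this simple example I use the same data for training and for validation. I want to confirm that the whole dataset goes into model.fit(), so I wrote a custom callback: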

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])

# and I pass the dataset to the custom callback:
callbacks.append(MyCustomCallback(train_ds))

This does not print all the values from 1 to 100. If I iterate over the dataset without model.fit (a simple for loop), I do get all of 1 to 100, so I think take() is competing with model.fit. Just a theory.

I have also tried:

class MyCustomCallback(tf.keras.callbacks.Callback):

  def on_train_batch_begin(self, batch, logs=None):
    print(self.model.layers[0].input) # or .output
    #or
    #print(self.model.layers[0].get_weights())


But this doesn't give me any real values, and get_weights() prints empty arrays.

This is what printing the input prints out:

KerasTensor(type_spec=TensorSpec(shape=(None, 1), dtype=tf.float32, name='feature_1'), name='feature_1', description="created by layer 'feature_1'")
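
For context on that output: layers[0] of a functional model is an InputLayer, which only declares a shape and dtype. That is why its input is a symbolic KerasTensor and its weight list is empty. The sketch below rebuilds a model of the same shape and reads the weights of a Dense layer instead. The layer names hidden_1 and probs are made up for this example and do not appear in the original model.

```python
import tensorflow as tf

inputs = [tf.keras.layers.Input(shape=(1,), name=f"feature_{i}") for i in range(1, 8)]
merged = tf.keras.layers.Concatenate(axis=1)(inputs)
hidden = tf.keras.layers.Dense(50, activation="relu", name="hidden_1")(merged)
outputs = tf.keras.layers.Dense(101, activation="softmax", name="probs")(hidden)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# layers[0] is an InputLayer: a placeholder with no trainable state.
input_layer_weights = model.layers[0].get_weights()
print(type(model.layers[0]).__name__, input_layer_weights)

# Dense layers do hold weights: a kernel and a bias, returned as NumPy arrays.
kernel, bias = model.get_layer("hidden_1").get_weights()
print("hidden_1 kernel:", kernel.shape, "bias:", bias.shape)
```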

I have also tried using K.eval() on the input and output of this layer, but that ended in a numpy problem that none of the eager settings fixed.

I really don't think this should be so hard. I just want to peek at the dataset before it goes into training.

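I have played around with repeat(), cache(), and simply iterating over the dataset before model.fit, but I don't like the idea that this happens before model.fit, and that unless the dataset is cached it gets reshuffled, etc...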

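But I also want to be able to look into the model at will: any value, any weight, at any time. I feel I can't access these things, but that I should be able to.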
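
One common way to look at intermediate values is to build a second "probe" model that shares layers with the original and stops at the layer of interest. The sketch below assumes a reduced two-input version of the model and a hypothetical layer name hidden_1. It is an illustration of the technique, not code from the question.

```python
import numpy as np
import tensorflow as tf

inputs = [tf.keras.layers.Input(shape=(1,), name=f"feature_{i}") for i in (1, 2)]
merged = tf.keras.layers.Concatenate(axis=1)(inputs)
hidden = tf.keras.layers.Dense(5, activation="relu", name="hidden_1")(merged)
outputs = tf.keras.layers.Dense(101, activation="softmax")(hidden)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# The probe reuses the same layer objects, so it always sees the current weights.
probe = tf.keras.Model(inputs=model.inputs,
                       outputs=model.get_layer("hidden_1").output)

# One concrete batch of 4 rows for each of the two inputs.
batch = [np.array([[1.0], [2.0], [3.0], [4.0]], dtype=np.float32) for _ in inputs]
activations = np.asarray(probe(batch))
print(activations.shape)
```

Because the probe shares weights with model, it can be called between epochs or from a callback to see how the activations change during training.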

Any help is appreciated.

Oh, and I'm using tensorflow 2.6.2 at the moment, with tf.keras.

Best answer

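I think it all depends on the size of your batch_size, because take(1) takes one batch, and if batch_size < 100 you will not see all the values. If you have, for example, batch_size=100, then you will definitely see the values 1 to 100: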
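
To make the batch-size point concrete, here is a small sketch on a plain tf.data pipeline, with Dataset.range standing in for the 100 parquet rows (an assumption; no petastorm is involved). On such a pipeline each take(1) call starts a fresh iterator, so it returns the first batch every time. A petastorm-backed dataset reads from a stateful reader, so repeated iteration may behave differently there.

```python
import tensorflow as tf

rows = tf.data.Dataset.range(1, 101)  # stand-in for the 100 parquet rows


def first_batch(ds):
    # Same access pattern as the callback: new iterator, one batch, then discard it.
    return list(ds.take(1).as_numpy_iterator())[0].tolist()


small = rows.batch(4)
large = rows.batch(100)

print(first_batch(small))       # [1, 2, 3, 4]
print(first_batch(small))       # [1, 2, 3, 4] again, not the next batch
print(len(first_batch(large)))  # 100
```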

import pandas as pd
import tensorflow as tf
import numpy as np
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.reader import make_batch_reader

df = pd.DataFrame({'feature1':np.arange(1, 101), 
              'feature2':np.arange(1, 101),
              'feature3':np.arange(1, 101),
              'feature4':np.arange(1, 101),
              'feature5':np.arange(1, 101),
              'feature6':np.arange(1, 101),
              'feature7':np.arange(1, 101),
              'feature8':np.arange(1, 101)})
columns = list(df)
df.to_parquet('file.parquet')
x_cols = columns[:-1]
batch_size = 100

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    tf.print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])


with make_batch_reader('file:///content/file.parquet', num_epochs=1,
                                   schema_fields=columns) as train_reader:
  train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map( 
                        lambda x: (tuple(getattr(x, col) for col in x_cols),getattr(x,"feature8"))
                        ) \
                        .shuffle(buffer_size=1000).batch(batch_size)
                        
  inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
  merged = tf.keras.layers.Concatenate(axis=1)(inputs)
  x = tf.keras.layers.Dense(50, activation="relu")(merged)
  x = tf.keras.layers.Dense(20,activation="relu")(x)
  outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  opt = tf.keras.optimizers.Adam(learning_rate=.001)

  model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
  model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])
Epoch 1/2
array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
        79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
        92,  93,  94,  95,  96,  97,  98,  99, 100])
      1/Unknown - 1s 777ms/step - loss: 19.3339 - accuracy: 0.0100array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
        79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
        92,  93,  94,  95,  96,  97,  98,  99, 100])
1/1 [==============================] - 1s 899ms/step - loss: 19.3339 - accuracy: 0.0100
...

Also, I am not sure what exactly the benefit of petastorm is, but if you are looking for an alternative, you could try tensorflow-io:

import pandas as pd
import tensorflow_io as tfio
import tensorflow as tf
import numpy as np

df = pd.DataFrame({'feature1':np.arange(1, 101), 
              'feature2':np.arange(1, 101),
              'feature3':np.arange(1, 101),
              'feature4':np.arange(1, 101),
              'feature5':np.arange(1, 101),
              'feature6':np.arange(1, 101),
              'feature7':np.arange(1, 101),
              'feature8':np.arange(1, 101)})
columns = list(df)
df.to_parquet('file.parquet')
ds = tfio.IODataset.from_parquet('file.parquet', columns = columns)
x_cols = columns[:-1]
batch_size = 100

train_ds = ds.map(lambda x: (tuple(x[col] for col in x_cols),x["feature8"])).shuffle(buffer_size=1000).batch(batch_size)
inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
merged = tf.keras.layers.Concatenate(axis=1)(inputs)
x = tf.keras.layers.Dense(50, activation="relu")(merged)
x = tf.keras.layers.Dense(20,activation="relu")(x)
outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])

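Update 1: You can add each batch to an array in the callback, and at the end of each epoch you can print the values and reset the array for the next epoch: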

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)
    self.train_data = train_data

  def on_batch_end(self, batch, logs=None):
    self.mylist = self.mylist.write(self.mylist.size(), list(self.train_data.take(1).as_numpy_iterator())[0][0][0])
  
  def on_epoch_end(self, epoch, logs=None):
    arr = self.mylist.stack()
    tf.print(arr, summarize=-1)
    self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)
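
A different option, shown here only as a sketch, is to log inside the input pipeline itself with a pass-through map step, so the printed batches are the very ones handed to model.fit and no second read of the dataset is needed. The function name tap and the toy dataset are assumptions for illustration. If the pipeline prefetches, a batch may be printed slightly before the training step that uses it.

```python
import tensorflow as tf


def tap(features, label):
    # Print the batch as it flows through the pipeline, then pass it on unchanged.
    tf.print("batch labels:", label, summarize=-1)
    return features, label


# Toy stand-in for the parquet data: one feature column and a label, values 1..100.
features = tf.reshape(tf.range(1, 101, dtype=tf.float32), (-1, 1))
labels = tf.range(1, 101, dtype=tf.int32)
train_ds = tf.data.Dataset.from_tensor_slices((features, labels)).batch(4).map(tap)

# Consuming the dataset triggers the prints; model.fit(train_ds) would do the same.
seen = [int(v) for _, y in train_ds for v in y.numpy()]
print(len(seen))
```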

Regarding "python - How to print the data going into keras model.fit, specifically when using a petastorm dataset", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/70753331/
