我创建了一个自定义类作为 ML 模型,它工作正常,但我想对输入进行标准化,因为它们的取值范围很广(例如 0、20000、500、10、8)。目前,作为标准化输入的一种方式,我对每个输入应用 lambda x: np.log(x + 1)(+1 是为了在传入 0 时不会出错)。使用标准化层会比我当前的方法更好吗?如果是这样,我该如何实现它?我的模型代码如下:
class FollowModel:
    """Single-hidden-layer sigmoid network built with the TensorFlow 1.x graph API.

    The network computes ``sigmoid(sigmoid(X @ W1) @ W2)`` and is trained with
    Adam on the summed squared error.  The predicted class of a row is the
    argmax over its outputs; ``test_accuracy`` treats class 1 as "follow" and
    class 0 as "skip".
    """

    def __init__(self, input_shape, output_shape, hidden_layers, input_labels, learning_rate=0.001):
        """Build the training graph.

        Args:
            input_shape: ``(rows, features)`` shape of the training inputs; the
                ``X`` placeholder is created with exactly this shape.
            output_shape: ``(rows, outputs)`` shape of the training targets.
            hidden_layers: Number of units in the single hidden layer.
            input_labels: One label per input feature.
            learning_rate: Learning rate passed to the Adam optimizer.

        Raises:
            AssertionError: If ``len(input_labels) != input_shape[1]``.
        """
        # NOTE: this discards whatever was in the process-wide default graph.
        tf.reset_default_graph()
        assert len(input_labels) == input_shape[1], 'Incorrect number of input labels!'

        self.input_labels = input_labels
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.hidden_layers = hidden_layers
        self.learning_rate = learning_rate

        # Placeholders for input and output data.
        self.X = tf.placeholder(shape=input_shape, dtype=tf.float64, name='X')
        self.y = tf.placeholder(shape=output_shape, dtype=tf.float64, name='y')

        # Two weight matrices connecting the three layers; no bias terms.
        self.W1 = tf.Variable(np.random.rand(input_shape[1], hidden_layers), dtype=tf.float64)
        self.W2 = tf.Variable(np.random.rand(hidden_layers, output_shape[1]), dtype=tf.float64)

        # Forward pass.
        self.A1 = tf.sigmoid(tf.matmul(self.X, self.W1))
        self.y_est = tf.sigmoid(tf.matmul(self.A1, self.W2))

        # Summed squared error; ideally driven to 0.
        self.deltas = tf.square(self.y_est - self.y)
        self.loss = tf.reduce_sum(self.deltas)

        # Train operation that minimizes the loss.
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)

        self.model_init = tf.global_variables_initializer()
        self.trained = False

    @staticmethod
    def _sigmoid(z):
        """Return the logistic sigmoid of *z* without overflowing for large |z|."""
        # sigmoid(z) == exp(-log(1 + exp(-z))); logaddexp evaluates the log stably.
        return np.exp(-np.logaddexp(0.0, -z))

    @staticmethod
    def _truncated_percent(part, whole):
        """Return ``part / whole`` as a percentage truncated to 2 decimals, or 0 if *whole* is 0."""
        if not whole:
            return 0
        return int(part * 10000 / whole) / 100

    def _forward(self, inputs):
        """Run the forward pass in NumPy using the stored weight snapshots.

        This mirrors the TensorFlow graph built in ``__init__`` but works for
        any number of rows and does not add nodes to the default graph.
        """
        inputs = np.asarray(inputs, dtype=np.float64)
        hidden = self._sigmoid(np.matmul(inputs, self.weights1))
        return self._sigmoid(np.matmul(hidden, self.weights2))

    def train(self, Xtrain, ytrain, Xtest, ytest, training_steps, batch_size, print_progress=True):
        """Train the network on the full training set.

        Every optimizer step uses all of ``Xtrain``; ``batch_size`` is the
        number of steps per epoch and ``training_steps`` the number of epochs,
        so ``training_steps * batch_size`` steps are run in total.  At the
        start of each epoch the rows are reshuffled, and every fifth epoch the
        test accuracy is appended to ``self.testing_accuracies``.

        Args:
            Xtrain: Training inputs (DataFrame or array) matching ``input_shape``.
            ytrain: Training targets (DataFrame or array) matching ``output_shape``.
            Xtest: Held-out inputs passed to ``test_accuracy``.
            ytest: Held-out targets passed to ``test_accuracy``.
            training_steps: Number of epochs.
            batch_size: Number of optimizer steps per epoch.
            print_progress: Whether to print per-epoch and final summaries.

        Returns:
            The training accuracy (percent) after the final step.

        Raises:
            ValueError: If ``training_steps * batch_size`` is not positive.
        """
        total_steps = training_steps * batch_size
        if total_steps <= 0:
            raise ValueError('training_steps * batch_size must be positive')

        features = np.asarray(Xtrain, dtype=np.float64)
        targets = np.asarray(ytrain, dtype=np.float64)

        # Release the session of an earlier train() call instead of leaking it.
        previous_session = getattr(self, 'sess', None)
        if previous_session is not None:
            previous_session.close()

        self.trained = True
        self.training_steps = training_steps
        self.batch_size = batch_size
        self.sess = tf.Session()
        self.sess.run(self.model_init)
        self.losses = []
        self.accs = []
        self.testing_accuracies = []

        for i in range(total_steps):
            feed = {self.X: features, self.y: targets}
            self.sess.run(self.optimizer, feed_dict=feed)
            # Evaluate everything after the update in a single run call.
            local_loss, y_est_np, self.weights1, self.weights2 = self.sess.run(
                [self.loss, self.y_est, self.W1, self.W2], feed_dict=feed)
            self.losses.append(local_loss)

            acc = 100 * np.mean(y_est_np.argmax(axis=1) == targets.argmax(axis=1))
            self.accs.append(acc)

            if i % batch_size == 0:
                epoch = i // batch_size
                if epoch % 5 == 0:
                    self.testing_accuracies.append(self.test_accuracy(Xtest, ytest, False, True))
                # Shuffle inputs and targets with one shared permutation so the
                # rows stay paired, whatever the number of output columns.
                order = np.random.permutation(len(features))
                features, targets = features[order], targets[order]
                if print_progress:
                    print('Step: %d, Accuracy: %.2f, Loss: %.2f' % (epoch, acc, local_loss))

        if print_progress:
            print("Training complete!\nloss: {}, hidden nodes: {}, steps: {}, epoch size: {}, total steps: {}".format(
                int(self.losses[-1] * 100) / 100, self.hidden_layers, training_steps, batch_size, total_steps))
        self.follow_accuracy = acc
        return acc

    def test_accuracy(self, Xtest, ytest, print_progress=True, return_accuracy=False):
        """Evaluate the trained weights on a held-out set.

        Class 1 is counted as "follow" and class 0 as "skip".  Sets
        ``self.follow_accuracy`` to the percentage of predicted follows that
        were correct.

        Args:
            Xtest: Inputs (DataFrame or array), one row per sample.
            ytest: One-hot targets (DataFrame or array), one row per sample.
            print_progress: Whether to print the follow/skip summary.
            return_accuracy: Whether to return the overall accuracy.

        Returns:
            The overall accuracy in percent (truncated to 2 decimals) when
            ``return_accuracy`` is true, otherwise ``None``.  Also ``None``
            when the model has not been trained.
        """
        if not self.trained:
            print('The model is not trained!')
            return None

        predicted = self._forward(Xtest).argmax(axis=1)
        actual = np.asarray(ytest).argmax(axis=1)

        predicted_follow = predicted == 1
        correctly_followed = int(np.sum(predicted_follow & (actual == 1)))
        incorrectly_followed = int(np.sum(predicted_follow & (actual == 0)))
        missed_follows = int(np.sum((predicted == 0) & (actual == 1)))
        # Everything not counted above is treated as a correct skip.
        correctly_skipped = len(predicted) - correctly_followed - incorrectly_followed - missed_follows

        total_followed = incorrectly_followed + correctly_followed
        total_skipped = correctly_skipped + missed_follows
        total_correct = correctly_followed + correctly_skipped
        total_incorrect = incorrectly_followed + missed_follows

        total_accuracy = self._truncated_percent(total_correct, total_correct + total_incorrect)
        follow_accuracy = self._truncated_percent(correctly_followed, total_followed)
        skip_accuracy = self._truncated_percent(correctly_skipped, total_skipped)

        if print_progress:
            print('Correctly followed {} / {} ({}%), correctly skipped {} / {} ({}%)'.format(
                correctly_followed, total_followed, follow_accuracy, correctly_skipped, total_skipped, skip_accuracy))
        self.follow_accuracy = follow_accuracy
        if return_accuracy:
            return total_accuracy
        return None

    def make_prediction_on_normal_data(self, input_list):
        """Predict the class index for one already-normalized sample.

        Args:
            input_list: One value per entry of ``self.input_labels``.

        Returns:
            The argmax index over the network outputs for this sample.

        Raises:
            AssertionError: If the number of inputs does not match the labels.
        """
        assert len(input_list) == len(self.input_labels), 'Incorrect number of inputs (had {} should have {})'.format(len(input_list), len(self.input_labels))
        outputs = self._forward(np.array([input_list]))
        predicted_value = outputs[0].argmax(axis=0)
        return predicted_value

    def make_prediction_on_abnormal_data(self, input_list):
        """Normalize *input_list* with ``ProcessData.normalize_list`` and predict its class."""
        from ProcessData import normalize_list
        # NOTE(review): the return value is ignored, so this presumably relies on
        # normalize_list mutating input_list in place - confirm against ProcessData.
        normalize_list(input_list)
        return self.make_prediction_on_normal_data(input_list)

    def create_blank_outputs(self):
        """Return a ``(1, 2)`` float64 array of zeros usable as a dummy target row."""
        # np.int was removed in NumPy 1.24; float64 matches the y placeholder dtype.
        return np.zeros(shape=(1, 2), dtype=np.float64)
最佳答案
我不明白你为什么要专门创建一个层来做这件事。预处理输入的常见做法就是像您目前这样处理。对于偏态(skewed)数据,使用对数变换非常常见,但也有其他预处理方案,例如 sklearn 的 MinMaxScaler 和 StandardScaler
https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html
这些只是缩放(scale)数据的另外两种方法的示例。还有一种称为 BatchNorm 的方法,但不建议将其作为网络的第一层,因为输入数据的分布是固定的,在训练期间不会变化。
关于python - 对 Tensorflow 中的输入应用归一化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53530554/