python - 对 Tensorflow 中的输入应用归一化

标签 python tensorflow machine-learning keras artificial-intelligence

我创建了一个自定义类作为 ML 模型，它工作正常，但我想对输入进行归一化，因为它们的取值范围很广（例如 0、20000、500、10、8）。目前，作为归一化输入的一种方式，我对每个输入应用 lambda x: np.log(x + 1)（+1 是为了在传入 0 时不会出错）。使用归一化层会比我当前的方法更好吗？如果是这样，我该如何实现它？我的模型代码如下：

class FollowModel:
    """Two-layer sigmoid network built with the TensorFlow 1.x graph API.

    The network is ``sigmoid(sigmoid(X @ W1) @ W2)`` with no bias terms and is
    trained with Adam on a sum-of-squared-errors loss.  Targets are expected
    to be one-hot rows; in the evaluation code class index 1 is counted as
    "followed" and every other index as "skipped".

    Training runs inside a ``tf.Session``.  After every training step the
    current weights are copied into ``self.weights1`` / ``self.weights2`` as
    NumPy arrays, and evaluation/prediction run the forward pass with NumPy
    on those copies, so they never add nodes to the TensorFlow graph.
    """

    def __init__(self, input_shape, output_shape, hidden_layers, input_labels, learning_rate=0.001):
        """Build the training graph.

        Args:
            input_shape: Shape of the ``X`` placeholder; ``input_shape[1]`` is
                the number of input features.
            output_shape: Shape of the ``y`` placeholder; ``output_shape[1]``
                is the number of output units.
            hidden_layers: Number of units in the single hidden layer (despite
                the name, this is a width, not a layer count).
            input_labels: One label per input feature.
            learning_rate: Learning rate passed to the Adam optimizer.

        Raises:
            AssertionError: If the number of labels differs from
                ``input_shape[1]``.
        """
        tf.reset_default_graph()
        assert len(input_labels) == input_shape[1], 'Incorrect number of input labels!'

        # Placeholders for input and output data
        self.input_labels = input_labels
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.X = tf.placeholder(shape=input_shape, dtype=tf.float64, name='X')
        self.y = tf.placeholder(shape=output_shape, dtype=tf.float64, name='y')
        self.hidden_layers = hidden_layers
        self.learning_rate = learning_rate

        # Variables for the two groups of weights between the three layers
        self.W1 = tf.Variable(np.random.rand(input_shape[1], hidden_layers), dtype=tf.float64)
        self.W2 = tf.Variable(np.random.rand(hidden_layers, output_shape[1]), dtype=tf.float64)

        # Forward pass of the network
        self.A1 = tf.sigmoid(tf.matmul(self.X, self.W1))
        self.y_est = tf.sigmoid(tf.matmul(self.A1, self.W2))

        # Sum of squared errors; the optimizer drives this towards 0
        self.deltas = tf.square(self.y_est - self.y)
        self.loss = tf.reduce_sum(self.deltas)

        # Train operation that minimizes the loss
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)

        self.model_init = tf.global_variables_initializer()
        self.trained = False
        # Created by train(); tracked here so a later train() can close it.
        self.sess = None

    def train(self, Xtrain, ytrain, Xtest, ytest, training_steps, batch_size, print_progress=True):
        """Train on the full training set for ``training_steps * batch_size`` steps.

        Every step feeds the whole of ``Xtrain``/``ytrain``.  Every
        ``batch_size`` steps the rows are reshuffled and progress is
        optionally printed; every fifth such round the test accuracy is
        appended to ``self.testing_accuracies``.

        Args:
            Xtrain: pandas DataFrame of training inputs.
            ytrain: pandas DataFrame of one-hot training targets.
            Xtest: Test inputs, passed through to ``test_accuracy``.
            ytest: Test targets, passed through to ``test_accuracy``.
            training_steps: Number of rounds.
            batch_size: Number of optimizer steps per round.
            print_progress: Whether to print per-round and final summaries.

        Returns:
            Training accuracy (percent) measured after the last step.

        Raises:
            ValueError: If ``training_steps`` or ``batch_size`` is not positive.
        """
        if training_steps <= 0 or batch_size <= 0:
            raise ValueError('training_steps and batch_size must both be positive')
        total_steps = training_steps * batch_size

        # Release the session of a previous train() call instead of leaking it.
        if getattr(self, 'sess', None) is not None:
            self.sess.close()

        # test_accuracy() is called from inside the loop, so the flag has to
        # be set before training starts.
        self.trained = True
        self.training_steps = training_steps
        self.batch_size = batch_size
        self.sess = tf.Session()
        self.sess.run(self.model_init)
        self.losses = []
        self.accs = []
        self.testing_accuracies = []

        n_inputs = Xtrain.shape[1]
        for i in range(total_steps):
            feed = {self.X: Xtrain.values, self.y: ytrain.values}
            self.sess.run(self.optimizer, feed_dict=feed)
            # One fetch after the update: loss, predictions and weight copies.
            local_loss, y_est_np, self.weights1, self.weights2 = self.sess.run(
                [self.loss, self.y_est, self.W1, self.W2], feed_dict=feed)
            self.losses.append(local_loss)

            hits = y_est_np.argmax(axis=1) == ytrain.values.argmax(axis=1)
            acc = 100 * np.count_nonzero(hits) / len(hits)
            self.accs.append(acc)

            if i % batch_size == 0:
                batch_num = i // batch_size
                if batch_num % 5 == 0:
                    self.testing_accuracies.append(self.test_accuracy(Xtest, ytest, False, True))
                # Shuffle inputs and targets together, then split them again
                # by position so any number of target columns is supported.
                shuffled = pd.concat([Xtrain, ytrain], axis=1).sample(frac=1)
                Xtrain = shuffled.iloc[:, :n_inputs]
                ytrain = shuffled.iloc[:, n_inputs:]
                if print_progress:
                    print('Step: %d, Accuracy: %.2f, Loss: %.2f' % (batch_num, acc, local_loss))

        if print_progress:
            print("Training complete!\nloss: {}, hidden nodes: {}, steps: {}, epoch size: {}, total steps: {}".format(
                int(self.losses[-1]*100)/100, self.hidden_layers, training_steps, batch_size, total_steps))
        self.follow_accuracy = acc
        return acc

    @staticmethod
    def _sigmoid(z):
        """Return the logistic sigmoid of ``z`` without overflowing for large ``|z|``."""
        return np.exp(-np.logaddexp(0.0, -z))

    @staticmethod
    def _percentage(numerator, denominator):
        """Return ``numerator / denominator`` as a percent truncated to 2 decimals, or 0 if the denominator is 0."""
        if denominator == 0:
            return 0
        return int(numerator * 10000 / denominator) / 100

    def _forward(self, inputs):
        """Run the network on ``inputs`` with NumPy, using the saved weight copies.

        Raises:
            RuntimeError: If the model has not been trained yet.
        """
        if not getattr(self, 'trained', False):
            raise RuntimeError('The model is not trained!')
        hidden = self._sigmoid(np.matmul(np.asarray(inputs, dtype=np.float64), self.weights1))
        return self._sigmoid(np.matmul(hidden, self.weights2))

    def test_accuracy(self, Xtest, ytest, print_progress=True, return_accuracy=False):
        """Evaluate the trained model and record follow/skip statistics.

        Stores the follow accuracy (correct follows / predicted follows, in
        percent) in ``self.follow_accuracy``.

        Args:
            Xtest: Test inputs (DataFrame or array-like), one row per sample.
            ytest: pandas DataFrame of one-hot test targets.
            print_progress: Whether to print the follow/skip summary.
            return_accuracy: Whether to return the overall accuracy.

        Returns:
            Overall accuracy in percent when ``return_accuracy`` is true,
            otherwise ``None``.  Also ``None`` (after printing a message)
            when the model has not been trained.
        """
        if not self.trained:
            print('The model is not trained!')
            return None

        predicted = self._forward(Xtest).argmax(axis=1)
        actual = ytest.values.argmax(axis=1)

        incorrectly_followed = np.count_nonzero((predicted == 1) & (actual == 0))
        correctly_followed = np.count_nonzero((predicted == 1) & (actual == 1))
        missed_follows = np.count_nonzero((predicted == 0) & (actual == 1))
        # Everything that falls in none of the three cases above counts as a
        # correct skip.
        correctly_skipped = len(actual) - incorrectly_followed - correctly_followed - missed_follows

        total_followed = incorrectly_followed + correctly_followed
        total_skipped = correctly_skipped + missed_follows
        total_correct = correctly_followed + correctly_skipped
        total_incorrect = incorrectly_followed + missed_follows

        total_accuracy = self._percentage(total_correct, total_correct + total_incorrect)
        follow_accuracy = self._percentage(correctly_followed, total_followed)
        skip_accuracy = self._percentage(correctly_skipped, total_skipped)

        if print_progress:
            print('Correctly followed {} / {} ({}%), correctly skipped {} / {} ({}%)'.format(
                correctly_followed, total_followed, follow_accuracy, correctly_skipped, total_skipped, skip_accuracy))

        self.follow_accuracy = follow_accuracy

        if return_accuracy:
            return total_accuracy
        return None

    def make_prediction_on_normal_data(self, input_list):
        """Predict the class index for one already-normalized sample.

        Args:
            input_list: One value per entry of ``self.input_labels``.

        Returns:
            Index of the largest output unit.

        Raises:
            AssertionError: If the number of inputs does not match the labels.
            RuntimeError: If the model has not been trained yet.
        """
        assert len(input_list) == len(self.input_labels), 'Incorrect number of inputs (had {} should have {})'.format(len(input_list), len(self.input_labels))
        y_est_np = self._forward(np.array([input_list]))
        return y_est_np[0].argmax(axis=0)

    def make_prediction_on_abnormal_data(self, input_list):
        """Normalize ``input_list`` with ``ProcessData.normalize_list`` and predict.

        NOTE(review): the return value of ``normalize_list`` is discarded, so
        this presumably relies on it modifying ``input_list`` in place --
        confirm against ProcessData.
        """
        from ProcessData import normalize_list
        normalize_list(input_list)
        return self.make_prediction_on_normal_data(input_list)

    def create_blank_outputs(self):
        """Return a ``(1, 2)`` integer array of zeros.

        Uses the builtin ``int`` as dtype; the ``np.int`` alias used
        previously no longer exists in current NumPy releases.
        """
        return np.zeros(shape=(1, 2), dtype=int)

最佳答案

我不明白你为什么要创建一个层来做到这一点。预处理输入的常见做法与您当前所做的一样。 对于倾斜数据,使用对数运算符非常常见,但还有其他预处理解决方案,例如 sklearn 的 MinMaxScaler 和 StandardScaler

https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html

这些只是缩放数据的另外两种方法的示例。 还有一种称为 BatchNorm 的方法，但不建议将其作为网络的第一层，因为输入数据的分布是固定的，在训练期间不会变化。

关于python - 对 Tensorflow 中的输入应用归一化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53530554/

相关文章:

python - 在 python 函数中定义本地类是个坏主意吗?

python - 从匹配索引开始循环多次遍历列表

python - 如何同时在多个维度上执行reduce_op?

python - Tensorflow 无法分配设备进行操作

python - 为什么模型在归一化后表现不佳?

Python：计算 TP、FP、FN 和 TN

java - 如何学习鼠标移动?

python - 为什么我不能使用 --target 和 --editable 调用 pip?

python - 在 healpy 中对 HEALPix map 应用旋转

python - 使用 RNN 进行 POS 标记