Download the code for this post
In the previous post we saw how to implement the forward pass with miniflow. Now we implement backward propagation: starting from the loss function and applying the chain rule, we derive the gradient of every parameter.
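As a concrete example of what this chain looks like, take a single Linear → Sigmoid → MSE chain, $z = XW + b$, $a = \sigma(z)$, $C = \mathrm{MSE}(y, a)$ (the notation here is mine, chosen to match the code below). The gradient of the weights factors into the local derivatives of each node:

$$\frac{\partial C}{\partial W} = \frac{\partial C}{\partial a}\,\frac{\partial a}{\partial z}\,\frac{\partial z}{\partial W}$$

Each node's `backward()` method below computes exactly one of these local factors and multiplies it into the gradient flowing in from its output nodes.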
First, we need to add the code for the loss function.
Below is the implementation of MSE (Mean Squared Error):
$$C(w, b) = \frac{1}{m} \sum_x \lVert y(x) - a \rVert^2$$

where $m$ is the number of training examples, $y(x)$ is the target output for example $x$, and $a$ is the network's prediction.
```python
class MSE(Node):
    def __init__(self, y, a):
        """
        The mean squared error cost function.
        Should be used as the last node for a network.
        """
        Node.__init__(self, [y, a])

    def forward(self):
        """
        Calculates the mean squared error.
        """
        # NOTE: We reshape these to avoid possible matrix/vector broadcast
        # errors.
        #
        # For example, if we subtract an array of shape (3,) from an array
        # of shape (3,1) we get an array of shape (3,3) as the result when
        # we want an array of shape (3,1) instead.
        #
        # Making both arrays (3,1) ensures the result is (3,1) and does
        # an elementwise subtraction as expected.
        y = self.inbound_nodes[0].value.reshape(-1, 1)
        a = self.inbound_nodes[1].value.reshape(-1, 1)

        # Save m and the difference so they can be reused in backward().
        self.m = self.inbound_nodes[0].value.shape[0]
        self.diff = y - a
        self.value = np.mean(self.diff**2)
```
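The NOTE in `forward` about reshaping is easy to reproduce; a small standalone check (not part of miniflow) shows why both arrays are forced to shape `(m, 1)`:

```python
import numpy as np

y = np.array([1., 2., 3.])            # shape (3,)
a = np.array([[1.], [2.], [3.]])      # shape (3, 1)

# Broadcasting turns this into a (3, 3) matrix of pairwise differences,
# which is not the elementwise difference we want.
print((a - y).shape)                  # (3, 3)

# Reshaping both to (3, 1) gives the expected elementwise result.
print((a - y.reshape(-1, 1)).shape)   # (3, 1)
```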
```python
class Input(Node):
    def backward(self):
        # An Input node has no inputs, so the gradient (derivative)
        # starts at zero.
        # The key, `self`, is a reference to this object.
        self.gradients = {self: 0}
        # Weights and biases may be inputs, so you need to sum
        # the gradient from the output gradients.
        for n in self.outbound_nodes:
            self.gradients[self] += n.gradients[self]
```
```python
class Linear(Node):
    def backward(self):
        """
        Calculates the gradient based on the output values.
        """
        # Initialize a partial for each of the inbound_nodes.
        self.gradients = {n: np.zeros_like(n.value) for n in self.inbound_nodes}
        # Cycle through the outputs. The gradient will change depending
        # on each output, so the gradients are summed over all outputs.
        for n in self.outbound_nodes:
            # Get the partial of the cost with respect to this node.
            grad_cost = n.gradients[self]
            # Set the partial of the loss with respect to this node's inputs.
            self.gradients[self.inbound_nodes[0]] += np.dot(grad_cost, self.inbound_nodes[1].value.T)
            # Set the partial of the loss with respect to this node's weights.
            self.gradients[self.inbound_nodes[1]] += np.dot(self.inbound_nodes[0].value.T, grad_cost)
            # Set the partial of the loss with respect to this node's bias.
            self.gradients[self.inbound_nodes[2]] += np.sum(grad_cost, axis=0, keepdims=False)
```
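In matrix form, with $Z = XW + b$ ($X$ is the layer input, `inbound_nodes[0]`; $W$ the weights, `inbound_nodes[1]`; $b$ the bias, `inbound_nodes[2]`) and $G = \partial C / \partial Z$ the upstream gradient `grad_cost`, the three accumulations above are:

$$\frac{\partial C}{\partial X} = G\,W^{\top}, \qquad \frac{\partial C}{\partial W} = X^{\top} G, \qquad \frac{\partial C}{\partial b} = \sum_{i} G_{i,:}$$

The sum over the batch dimension for the bias is what `np.sum(grad_cost, axis=0)` computes.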
```python
class Sigmoid(Node):
    def backward(self):
        """
        Calculates the gradient using the derivative of the sigmoid function.
        """
        # Initialize the gradients to 0.
        self.gradients = {n: np.zeros_like(n.value) for n in self.inbound_nodes}
        # Sum the partial with respect to the input over all the outputs.
        for n in self.outbound_nodes:
            grad_cost = n.gradients[self]
            sigmoid = self.value
            self.gradients[self.inbound_nodes[0]] += sigmoid * (1 - sigmoid) * grad_cost
```
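The derivative used here follows directly from the definition of the sigmoid; since `self.value` already holds $\sigma(z)$ from the forward pass, no extra computation of $z$ is needed:

$$\sigma(z) = \frac{1}{1 + e^{-z}}, \qquad \frac{\partial \sigma}{\partial z} = \sigma(z)\,\bigl(1 - \sigma(z)\bigr)$$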
```python
class MSE(Node):
    def backward(self):
        """
        Calculates the gradient of the cost.
        """
        self.gradients[self.inbound_nodes[0]] = (2 / self.m) * self.diff
        self.gradients[self.inbound_nodes[1]] = (-2 / self.m) * self.diff
```
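Differentiating $C = \frac{1}{m}\sum_x (y - a)^2$ with respect to each input gives the two lines above; note the sign flip for $a$:

$$\frac{\partial C}{\partial y} = \frac{2}{m}(y - a), \qquad \frac{\partial C}{\partial a} = -\frac{2}{m}(y - a)$$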
```python
def sgd_update(trainables, learning_rate=1e-2):
    """
    Updates the value of each trainable with SGD.

    Arguments:
        `trainables`: A list of `Input` Nodes representing weights/biases.
        `learning_rate`: The learning rate.
    """
    for t in trainables:
        t.value -= learning_rate * t.gradients[t]
```
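The training loop below also calls `forward_and_backward`, which is not listed in this section. A minimal sketch, assuming `graph` is the topologically sorted node list returned by `topological_sort`:

```python
def forward_and_backward(graph):
    """
    Performs a forward pass and a backward pass through a list of
    topologically sorted Nodes.

    Arguments:
        `graph`: The result of calling `topological_sort`.
    """
    # Forward pass: compute every node's value from its inputs.
    for n in graph:
        n.forward()

    # Backward pass: walk the graph in reverse and accumulate gradients.
    for n in graph[::-1]:
        n.backward()
```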
```python
import numpy as np
from sklearn.datasets import load_boston
from sklearn.utils import shuffle, resample

# Load data
data = load_boston()
X_ = data['data']
y_ = data['target']

# Normalize data
X_ = (X_ - np.mean(X_, axis=0)) / np.std(X_, axis=0)

n_features = X_.shape[1]
n_hidden = 10
W1_ = np.random.randn(n_features, n_hidden)
b1_ = np.zeros(n_hidden)
W2_ = np.random.randn(n_hidden, 1)
b2_ = np.zeros(1)

# Neural network
X, y = Input(), Input()
W1, b1 = Input(), Input()
W2, b2 = Input(), Input()

l1 = Linear(X, W1, b1)
s1 = Sigmoid(l1)
l2 = Linear(s1, W2, b2)
cost = MSE(y, l2)

feed_dict = {
    X: X_,
    y: y_,
    W1: W1_,
    b1: b1_,
    W2: W2_,
    b2: b2_
}

epochs = 10
# Total number of examples
m = X_.shape[0]
batch_size = 11
steps_per_epoch = m // batch_size

graph = topological_sort(feed_dict)
trainables = [W1, b1, W2, b2]

print("Total number of examples = {}".format(m))

# Step 4
for i in range(epochs):
    loss = 0
    for j in range(steps_per_epoch):
        # Step 1
        # Randomly sample a batch of examples
        X_batch, y_batch = resample(X_, y_, n_samples=batch_size)

        # Reset value of X and y Inputs
        X.value = X_batch
        y.value = y_batch

        # Step 2
        forward_and_backward(graph)

        # Step 3
        sgd_update(trainables)

        loss += graph[-1].value

    print("Epoch: {}, Loss: {:.3f}".format(i+1, loss/steps_per_epoch))
```
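To convince yourself the backward passes are correct, you can compare the analytic gradients against a finite-difference estimate on one batch. This is a sketch only, not part of the original miniflow code; `numerical_gradient` is a hypothetical helper, and the snippet assumes the `graph`, `X`, `y`, and `W2` objects from the script above, with the cost node last in `graph`:

```python
def numerical_gradient(graph, param, eps=1e-6):
    """Finite-difference estimate of d(cost)/d(param) for an Input node."""
    grad = np.zeros_like(param.value)
    it = np.nditer(param.value, flags=['multi_index'])
    while not it.finished:
        idx = it.multi_index
        old = param.value[idx]

        # Nudge the parameter up and down and re-run the forward pass.
        param.value[idx] = old + eps
        forward_and_backward(graph)
        loss_plus = graph[-1].value

        param.value[idx] = old - eps
        forward_and_backward(graph)
        loss_minus = graph[-1].value

        param.value[idx] = old
        grad[idx] = (loss_plus - loss_minus) / (2 * eps)
        it.iternext()
    return grad

# Run once on the current batch, then compare analytic vs. numeric gradients.
forward_and_backward(graph)
analytic = W2.gradients[W2].copy()
numeric = numerical_gradient(graph, W2)
print(np.max(np.abs(analytic - numeric)))  # should be close to zero
```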