本文代码下载
之前介绍了如何用 miniflow 实现 forward pass,现在我们来实现 backward propagation,也就是从loss function 出发,利用链式法则,推导出每一个参数的导数。
首先我们得添加Loss Function 相关的代码。
下面是 MSE (Mean Square Error)的实现。
$$C(w, b) = \frac{1}{m} \sum_{x} \lVert y(x) - a \rVert^{2}$$
class MSE(Node):
    def __init__(self, y, a):
        """
        The mean squared error cost function.
        Should be used as the last node for a network.

        Arguments:
            `y`: The first inbound node (the `y(x)` term of the cost).
            `a`: The second inbound node (the `a` term of the cost).
        """
        Node.__init__(self, [y, a])

    def forward(self):
        """
        Calculates the mean squared error.

        Besides `self.value`, the number of rows of the first input
        (`self.m`) and the element-wise residual (`self.diff`) are kept on
        the node, because `MSE.backward` reads both attributes.
        """
        # NOTE: We reshape these to avoid possible matrix/vector broadcast
        # errors.
        #
        # For example, if we subtract an array of shape (3,) from an array of
        # shape (3,1) we get an array of shape (3,3) as the result when we
        # want an array of shape (3,1) instead.
        #
        # Making both arrays (3,1) ensures the result is (3,1) and does
        # an elementwise subtraction as expected.
        y = self.inbound_nodes[0].value.reshape(-1, 1)
        a = self.inbound_nodes[1].value.reshape(-1, 1)

        # Stored as attributes (not locals) so the backward pass can use them.
        self.m = self.inbound_nodes[0].value.shape[0]
        self.diff = y - a
        self.value = np.mean(self.diff ** 2)
class Input(Node):
    def backward(self):
        """
        Collects the gradient of this input from every node that consumes it.

        An Input node has no inbound nodes, so its own contribution is zero;
        the stored gradient is the sum of what each outbound node reports
        for this node. The result is kept in `self.gradients` under the key
        `self` (a reference to this object).
        """
        # Weights and biases may be Input nodes too, so the gradient has to
        # be accumulated over all outbound nodes.
        accumulated = 0
        for consumer in self.outbound_nodes:
            accumulated += consumer.gradients[self]
        self.gradients = {self: accumulated}
class Linear(Node):
    def backward(self):
        """
        Propagates the cost gradient back through the linear transform.

        For every outbound node, the gradient it holds for this node is
        turned into contributions for the three inbound nodes (index 0:
        inputs, index 1: weights, index 2: bias), and the contributions are
        summed over all outbound nodes.
        """
        # One zero-filled accumulator per inbound node, shaped like its value.
        self.gradients = {
            node: np.zeros_like(node.value) for node in self.inbound_nodes
        }

        for consumer in self.outbound_nodes:
            # Partial of the cost with respect to this node's output.
            upstream = consumer.gradients[self]

            inputs = self.inbound_nodes[0]
            weights = self.inbound_nodes[1]
            bias = self.inbound_nodes[2]

            # Partial with respect to the inputs: upstream . W^T
            self.gradients[inputs] += np.dot(upstream, weights.value.T)
            # Partial with respect to the weights: X^T . upstream
            self.gradients[weights] += np.dot(inputs.value.T, upstream)
            # Partial with respect to the bias: upstream summed along axis 0.
            self.gradients[bias] += np.sum(upstream, axis=0, keepdims=False)
class Sigmoid(Node):
    def backward(self):
        """
        Propagates the cost gradient back through the sigmoid.

        Uses the identity sigmoid'(x) = sigmoid(x) * (1 - sigmoid(x)), where
        sigmoid(x) is read from `self.value`.
        """
        # Start every inbound gradient at zero.
        self.gradients = {
            node: np.zeros_like(node.value) for node in self.inbound_nodes
        }

        # Accumulate the partial with respect to the input over all outputs.
        for consumer in self.outbound_nodes:
            upstream = consumer.gradients[self]
            local_slope = self.value * (1 - self.value)
            self.gradients[self.inbound_nodes[0]] += local_slope * upstream
class MSE(Node):
    def backward(self):
        """
        Calculates the gradient of the cost with respect to both inputs.

        Reads `self.m` and `self.diff`, which must already be set on the
        node. The first inbound node receives (2 / m) * diff and the second
        receives (-2 / m) * diff.
        """
        first_input = self.inbound_nodes[0]
        second_input = self.inbound_nodes[1]
        count = self.m
        residual = self.diff

        self.gradients[first_input] = (2 / count) * residual
        self.gradients[second_input] = (-2 / count) * residual
def sgd_update(trainables, learning_rate=1e-2):
    """
    Applies one step of stochastic gradient descent to each trainable.

    Arguments:

        `trainables`: A list of `Input` Nodes representing weights/biases.
        `learning_rate`: The learning rate.

    Each node's `value` is decreased by `learning_rate` times the gradient
    the node stores for itself (`node.gradients[node]`).
    """
    for node in trainables:
        step = learning_rate * node.gradients[node]
        node.value -= step
import numpy as np
from sklearn.datasets import load_boston
from sklearn.utils import shuffle, resample

# NOTE: `load_boston` was deprecated in scikit-learn 1.0 and removed in 1.2,
# so this script needs an older scikit-learn release to run as written.

# Load data
data = load_boston()
X_ = data['data']
y_ = data['target']

# Normalize data: zero mean and unit variance per feature column.
X_ = (X_ - np.mean(X_, axis=0)) / np.std(X_, axis=0)

n_features = X_.shape[1]
n_hidden = 10
W1_ = np.random.randn(n_features, n_hidden)
b1_ = np.zeros(n_hidden)
W2_ = np.random.randn(n_hidden, 1)
b2_ = np.zeros(1)

# Neural network: Linear -> Sigmoid -> Linear, scored with MSE.
X, y = Input(), Input()
W1, b1 = Input(), Input()
W2, b2 = Input(), Input()

l1 = Linear(X, W1, b1)
s1 = Sigmoid(l1)
l2 = Linear(s1, W2, b2)
cost = MSE(y, l2)

feed_dict = {
    X: X_,
    y: y_,
    W1: W1_,
    b1: b1_,
    W2: W2_,
    b2: b2_
}

epochs = 10
# Total number of examples
m = X_.shape[0]
batch_size = 11
steps_per_epoch = m // batch_size

graph = topological_sort(feed_dict)
trainables = [W1, b1, W2, b2]

print("Total number of examples = {}".format(m))

# Step 4: repeat steps 1-3 for every batch of every epoch.
for i in range(epochs):
    loss = 0
    for _ in range(steps_per_epoch):
        # Step 1
        # Randomly sample a batch of examples
        X_batch, y_batch = resample(X_, y_, n_samples=batch_size)

        # Reset value of X and y Inputs
        X.value = X_batch
        y.value = y_batch

        # Step 2
        forward_and_backward(graph)

        # Step 3
        sgd_update(trainables)

        # The last node of the sorted graph holds the loss for this batch.
        loss += graph[-1].value

    # Report the average batch loss once per epoch.
    print("Epoch: {}, Loss: {:.3f}".format(i+1, loss/steps_per_epoch))