--

利用深度神经网络来识别交通标志



这是 Udacity 无人驾驶工程师课程第一学期的第 2 个项目。

我们利用基于 LeNet 的神经网络来识别交通标志。
LeNet 的基本使用见于上一篇博客。 LeNet

而我的这个项目的实现代码在: github


1. 数据集

你可以用: German Traffic Sign Dataset

也可以用: 32*32 dataset

前者是一个完整的数据集,后者已经裁剪好成为了 32*32 的大小的,而且是 pickle 文件格式,不需要额外文本文件和 image 文件读取过程。

我们可以这样读取数据集:

  
import pickle

training_file = './train.p'
validation_file= './valid.p'
testing_file = './test.p'

with open(training_file, mode='rb') as f:
    train = pickle.load(f)
with open(validation_file, mode='rb') as f:
    valid = pickle.load(f)
with open(testing_file, mode='rb') as f:
    test = pickle.load(f)
    
X_train, y_train = train['features'], train['labels']
X_valid, y_valid = valid['features'], valid['labels']
X_test, y_test = test['features'], test['labels']

print('X_train shape = ', X_train.shape)
print('y_train shape = ', y_train.shape')
print('X_valid shape = ', X_valid.shape)
print('y_valid shape = ', y_valid.shape)
print('X_test shape = ', X_test.shape)
print('y_test shape = ', y_test.shape)


训练集是:(34799, 32, 32, 3) ,(34799,)
验证集是:(4410, 32, 32, 3),(4410,)
测试集是:(12630, 32, 32, 3),(12630,)



2. 数据快速浏览

import matplotlib.pyplot as plt
i = 234
plt.imshow(X_train[i])
print(y_train[i])
plt.show()
plt.imshow(X_train[i])
print(y_train[i])
plt.show()

print(np.unique(y_train))
print(np.unique(y_valid))
print(np.unique(y_test))

我们可以看到样本基本上长这个样子:



另外有一个描述了 从label 到实际标签对应关系的索引从文件 signnames.csv
  
ClassId SignName
0 Speed limit (20km/h)
1 Speed limit (30km/h)
2 Speed limit (50km/h)
3 Speed limit (60km/h)
4 Speed limit (70km/h)
5 Speed limit (80km/h)
6 End of speed limit (80km/h)
7 Speed limit (100km/h)
8 Speed limit (120km/h)
9 No passing
10  No passing for vehicles over 3.5 metric tons
11  Right-of-way at the next intersection
12  Priority road
13  Yield
14  Stop
15  No vehicles
16  Vehicles over 3.5 metric tons prohibited
17  No entry
18  General caution
19  Dangerous curve to the left
20  Dangerous curve to the right
21  Double curve
22  Bumpy road
23  Slippery road
24  Road narrows on the right
25  Road work
26  Traffic signals
27  Pedestrians
28  Children crossing
29  Bicycles crossing
30  Beware of ice/snow
31  Wild animals crossing
32  End of all speed and passing limits
33  Turn right ahead
34  Turn left ahead
35  Ahead only
36  Go straight or right
37  Go straight or left
38  Keep right
39  Keep left
40  Roundabout mandatory
41  End of no passing
42  End of no passing by vehicles over 3.5 metric tons



3. model 可以参考 Yann LeKun 的这篇文章 作为一个 baseline 的accuracy 可以达到 98.97 %。
而人眼识别的正确率在 98.81 %。 现在我们来搭建自己的神经网络:

import numpy as np

X_train_input = np.mean(X_train, axis = 3)/255
X_train_input = np.expand_dims(X_train_input, axis = 4)
X_valid_input = np.mean(X_valid, axis = 3)/255
X_valid_input = np.expand_dims(X_valid_input, axis = 4)
X_test_input  = np.mean(X_test, axis = 3) / 255
X_test_input  = np.expand_dims(X_test_input, axis = 4)



先对数据进行归一化,使得 从 RGB 转化为灰度图,灰度图的值大小为 RGB 的平均值,再除以 255 归一到 0.0 ~ 1.0
同时保持数据维度为 4, [sample_cnt, width, height, layer]




搭建神经网络
import tensorflow as tf
def buildNet(x, keep_rate):
    mu = 0
    sigma = 0.01
    conv1_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 1, 16), mean = mu, stddev = sigma))
    conv1_b = tf.Variable(tf.zeros(16))
    conv1   = tf.nn.conv2d(x, conv1_W, strides=[1, 1, 1, 1], padding='VALID') + conv1_b
    conv1   = tf.nn.relu(conv1)
    
    conv2_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 16, 32), mean = mu, stddev = sigma))
    conv2_b = tf.Variable(tf.zeros(32))
    conv2   = tf.nn.conv2d(conv1, conv2_W, strides=[1, 1, 1, 1], padding='VALID') + conv2_b
    conv2   = tf.nn.relu(conv2)
    conv2   = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
    
    conv3_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 32, 64), mean = mu, stddev = sigma))
    conv3_b = tf.Variable(tf.zeros(64))
    conv3   = tf.nn.conv2d(conv2, conv3_W, strides=[1, 1, 1, 1], padding='VALID') + conv3_b
    conv3   = tf.nn.relu(conv3)
    conv3   = tf.nn.max_pool(conv3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    conv4_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 64, 64), mean = mu, stddev = sigma))
    conv4_b = tf.Variable(tf.zeros(64))
    conv4   = tf.nn.conv2d(conv3, conv4_W, strides=[1, 1, 1, 1], padding='VALID') + conv4_b
    conv4   = tf.nn.relu(conv4)
    conv4   = tf.nn.max_pool(conv4, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
    
    fc0     = tf.contrib.layers.flatten(conv4)
    fc1_W   = tf.Variable(tf.truncated_normal(shape=(256, 128), mean = mu, stddev = sigma))
    fc1_b   = tf.Variable(tf.zeros(128))
    fc1     = tf.matmul(fc0, fc1_W) + fc1_b   
    fc1     = tf.nn.relu(fc1)
    fc1     = tf.nn.dropout(fc1, keep_rate)
    fc2_W   = tf.Variable(tf.truncated_normal(shape=(128, 64), mean = mu, stddev = sigma))
    fc2_b   = tf.Variable(tf.zeros(64))
    fc2     = tf.matmul(fc1, fc2_W) + fc2_b
    fc2     = tf.nn.relu(fc2)
    fc2     = tf.nn.dropout(fc2, keep_rate)

    fc3_W   = tf.Variable(tf.truncated_normal(shape=(64, n_classes), mean = mu, stddev = sigma))
    fc3_b   = tf.Variable(tf.zeros(n_classes))
    logits  = tf.matmul(fc2, fc3_W) + fc3_b
    
    return logits

x = tf.placeholder(dtype = tf.float32, shape = (None, 32, 32, 1))
y = tf.placeholder(dtype = tf.int32, shape = (None))
one_hot_y = tf.one_hot(y, n_classes)
keep_prob = tf.placeholder(dtype = tf.float32)

这里和LeNet 不同的是,我们的最后的 flatten 输出 n_classes 是 43 而不是 MNIST 数据集里面的 10




接下来是整个训练、预测、测试函数

from sklearn.utils import shuffle

batch_size = 128
learning_rate = 0.001

logits = buildNet(x, keep_prob)
with tf.name_scope('summaries'):
    cross_entropy_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels = one_hot_y, logits=logits))
    optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
    training_operation = optimizer.minimize(cross_entropy_loss)

    correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
    accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    # tf.summary.scalar("loss", cross_entropy_loss)
    # tf.summary.scalar("accuracy", accuracy_operation)

saver = tf.train.Saver()

merged_summary_op = tf.summary.merge_all()
def evaluate(sess, X_data, y_data):
    num_examples = len(X_data)
    total_accuracy = 0.0
    for offset in range(0, num_examples, batch_size):
        batch_x, batch_y = X_data[offset:offset + batch_size], y_data[offset:offset+batch_size]
        # no use of dropout if evaluation the model
        feed = {x: batch_x, y: batch_y, keep_prob: 1.0}
        accuracy = sess.run(accuracy_operation, feed_dict=feed)
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

def train(X_data, y_data, epoch = 2, learning_rate = 0.001):
    assert len(X_data) == len(y_data)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        train_writer = tf.summary.FileWriter(logs_path+ '/train/', graph=tf.get_default_graph())        
        print("Training begin...")
        print()
        num_examples = len(X_data)
        for i in range(epoch):
            print('epoch ' + str(i+1) + '/'+ str(epoch) +' begin...')
            X_data, y_data = shuffle(X_data, y_data)
            epoch_train_accuracy = 0.0
            for offset in range(0, num_examples, batch_size):
                end = offset + batch_size
                batch_x, batch_y = X_data[offset:end], y_data[offset:end]
                feed = { x: batch_x, y: batch_y, keep_prob: 0.75 }
                _, batch_train_accuracy = sess.run(
                    [training_operation, accuracy_operation], feed_dict= feed)
                epoch_train_accuracy += batch_train_accuracy * len(batch_x)
                
            epoch_train_accuracy /= num_examples
            print('train accuracy ={:.3f}'.format(epoch_train_accuracy))
            validation_accuracy = evaluate(sess, X_valid_input, y_valid)
            print("Validation Accuracy = {:.3f}".format(validation_accuracy))
            print()
        saver.save(sess, './lenet')
        train_writer.close()
        print("Model saved")

        
def test(X_data, y_data):
    assert len(X_data) == len(y_data)
    num_examples = len(X_data)
    with tf.Session() as sess:
        saver.restore(sess, tf.train.latest_checkpoint('.'))
        print('restore previously saved model under ./lenet')
        print("testing begin...")
        print()
        test_accuracy = evaluate(sess, X_data, y_data)
        print("testing accuracy = {:.3f}".format(test_accuracy))
        print()
    return test_accuracy



这里我们利用 Saver 来保存模型的状态。



4. 运行这个模型

train(X_train_input, y_train, epoch = 30) 
test_accuracy = test(X_test_input, y_test)
print("Test Accuracy = {:.3f}".format(test_accuracy))







5. 利用新的数据来进行测试
import cv2  
import os
import numpy as np

myimgs_gray = []
myimgs_color = []
dirname = 'mysamples'
for filename in os.listdir(dirname):
    if '.jpg' in filename:
        myimg = cv2.imread(os.path.join(dirname, filename))
        myimg = cv2.resize(myimg, (32, 32))
        myimgs_color.append(myimg)
        myimg = cv2.cvtColor(myimg, cv2.COLOR_BGR2GRAY)
        myimgs_gray.append(myimg)

print('total ', len(myimgs_gray), 'images from web search loaded')

myimgs_input =np.expand_dims(myimgs_gray, axis = 3)




我们再利用已经训练好的模型,来预测我们的样本
argmax_item = tf.argmax(logits, 1), 

def predict(X_data):
    num_examples = len(X_data)
    print('totally '+str(num_examples) + ' to be predicted')
    with tf.Session() as sess:
        saver.restore(sess, tf.train.latest_checkpoint('.'))
        print('restore previously saved model under ./lenet')
        result = sess.run(argmax_item, feed_dict = {x: X_data, keep_prob : 1.0})
        print(result)
    return result
predicts_y = predict(myimgs_input)