Table of Contents
- I. Basic Concepts
- 1. Deep Learning Pipeline
- 2. Perceptron Structure and Artificial Neural Network Structure
- 3. Back Propagation Primer
- II. Linear Model
- 1. Linear Model Computation Flow
- 2. Code Implementation
- 3. Exercise
- III. Gradient Descent
- 1. Gradient Descent Computation Flow
- 2. Code Implementation
- 3. Stochastic Gradient Descent (SGD)
- IV. Back Propagation
- 1. Building the Computation Graph
- 2. Problems Faced
- 3. Back Propagation Flow
- 4. Code Implementation
- V. Linear Regression
- 1. PyTorch Deep Learning Code Flow
- 2. Mini-batch Style Deep Learning
- VI. Logistic Regression Model
- 1. Overview
- 2. Code Implementation
- VII. Handling Multi-dimensional Feature Inputs
- 1. Multiple Dimension Logistic Regression Model
- 2. An Artificial Neural Network Example
- 3. Code Implementation
- VIII. Loading Datasets
- 1. Terminology
- 2. DataLoader Features
- 3. Code Implementation
- IX. Multi-class Classification
- 1. Softmax Layer
- 2. Loss Functions
- (1) NLLLoss
- (2) CrossEntropyLoss
- 3. Code Implementation
- X. Convolutional Neural Networks (CNN, Basics)
- 1. An Example CNN Diagram
- 2. Convolution Input and Output Channels
- (1) Single Input Channel, Single Output Channel
- (2) N Input Channels, Single Output Channel
- (3) N Input Channels, M Output Channels
- 3. Convolutional Layers
- (1) Kernel Shape
- (2) padding
- (3) stride
- 4. Max Pooling Layer
- 5. A Simple CNN Example
- XI. Convolutional Neural Networks (CNN, Advanced)
- 1. GoogLeNet
- (1) Inception Module
- (2) 1x1 convolution
- (3) Code Implementation Using the Inception Module
- 2. Deep Residual Learning
- (1) Vanishing Gradients
- (2) Solving Vanishing Gradients with Residuals
- (3) Code Implementation
- (4) Further Reading
- XII. Recurrent Neural Networks (RNN, Basics)
- 1. What Are RNNs
- 2. Code Implementation
- (1) With RNNCell
- (2) With RNN
- 3. Exercise One
- (1) With RNNCell
- (2) With RNN
- 4. Embedding Layer
- (1) one-hot vs embedding
- (2) The Embedding Weight (Lookup Matrix)
- (3) Example: Looking Up the 2nd Character
- (4) Code Implementation
- XIII. Recurrent Neural Networks (RNN, Advanced)
- 1. The Problem
- 2. Model Design
- 3. Preparing Data
- 4. Bi-directional RNN/LSTM/GRU
- 5. The pack_padded_sequence() Method
- 6. Code Implementation
I. Basic Concepts
1. Deep Learning Pipeline
Input → Simple features → Additional layers of more abstract features → Mapping from features → Output
2. Perceptron Structure and Artificial Neural Network Structure
3. Back Propagation Primer
The computation graph is as follows:
II. Linear Model
1. Linear Model Computation Flow
Formula:

$\hat{y} = x * w + b$
where $\hat{y}$ is the prediction output by the linear model, x is the input, w is the weight, and b is the bias.
Training is the process of finding w and b.
The model can be simplified to $\hat{y} = x * w$.
The problem is as follows:
Processing steps:
① Start from a randomly chosen weight value.
② Compute the loss and the cost function value.
The loss formula:

$loss = (\hat{y} - y)^2 = (x * w - y)^2$
For each candidate w, compute the loss of every sample. For example, the figure below shows the case w = 3.
The cost function (here MSE, mean squared error, one common choice of cost function):
$cost = \frac{1}{N}\sum\limits_{n=1}^{N}(\hat{y}_n - y_n)^2$
2. Code Implementation
import numpy as np
import matplotlib.pyplot as plt

# Prepare the training set
x_data = [1.0, 2.0, 3.0]  # data
y_data = [2.0, 4.0, 6.0]  # label

def forward(x):
    '''Forward pass'''
    return x * w

def loss(x, y):
    y_pred = forward(x)
    return (y_pred - y) * (y_pred - y)

w_list = []
mse_list = []
for w in np.arange(0.0, 4.1, 0.1):
    print("w =", w)
    l_sum = 0
    for x_val, y_val in zip(x_data, y_data):
        y_pred_val = forward(x_val)
        loss_val = loss(x_val, y_val)
        l_sum += loss_val
        print("\t", x_val, y_val, y_pred_val, loss_val)
    print("MSE =", l_sum / 3)
    w_list.append(w)
    mse_list.append(l_sum / 3)

# Plot
plt.plot(w_list, mse_list)
plt.ylabel("Loss")
plt.xlabel("w")
plt.show()
3. Exercise
Change the model to $\hat{y} = x * w + b$.
import numpy as np
import matplotlib.pyplot as plt

# Prepare the training set
x_data = [1.0, 2.0, 3.0]  # data
y_data = [2.0, 4.0, 6.0]  # label

def forward(x):
    '''Forward pass'''
    return x * w + b

def loss(x, y):
    y_pred = forward(x)
    return (y_pred - y) * (y_pred - y)

w_list = []
b_list = []
mse_list = []
for b in np.arange(-2.0, 2.0, 0.1):
    print("b =", b)
    w_rowlist = []
    b_rowlist = []
    mse_rowlist = []
    for w in np.arange(0.0, 4.1, 0.1):
        print("w =", w)
        l_sum = 0
        for x_val, y_val in zip(x_data, y_data):
            y_pred_val = forward(x_val)
            loss_val = loss(x_val, y_val)
            l_sum += loss_val
            print("\t", x_val, y_val, y_pred_val, loss_val)
        print("MSE =", l_sum / 3)
        w_rowlist.append(w)
        b_rowlist.append(b)
        mse_rowlist.append(l_sum / 3)
    w_list.append(w_rowlist)
    b_list.append(b_rowlist)
    mse_list.append(mse_rowlist)

w_list = np.array(w_list)
b_list = np.array(b_list)
mse_list = np.array(mse_list)

# Plot
ax = plt.axes(projection="3d")
ax.plot_surface(w_list, b_list, mse_list, cmap="summer")
plt.show()
III. Gradient Descent
1. Gradient Descent Computation Flow
Gradient descent is a method for finding the optimal model from the training data.
For example, for the linear model $\hat{y} = x * w$, we need to solve the optimization problem:

$w^* = \arg \min\limits_w cost(w)$
$\begin{aligned} \frac{\partial\, cost(w)}{\partial w} &= \frac{\partial}{\partial w} \frac{1}{N} \sum\limits_{n=1}^{N}(x_n \cdot w - y_n)^2 \\ &= \frac{1}{N} \sum\limits_{n=1}^{N} \frac{\partial}{\partial w}(x_n \cdot w - y_n)^2 \\ &= \frac{1}{N} \sum\limits_{n=1}^{N} 2 \cdot (x_n \cdot w - y_n) \frac{\partial (x_n \cdot w - y_n)}{\partial w} \\ &= \frac{1}{N} \sum\limits_{n=1}^{N} 2 \cdot x_n \cdot (x_n \cdot w - y_n) \end{aligned}$
Then update w:

$w = w - \alpha \frac{1}{N} \sum\limits_{n=1}^{N} 2 \cdot x_n \cdot (x_n \cdot w - y_n)$
2. Code Implementation
import matplotlib.pyplot as plt
import numpy as np

# Prepare the training set
x_data = [1.0, 2.0, 3.0]
y_data = [2.0, 4.0, 6.0]

# Initialize the weight
w = 1.0

def forward(x):
    return x * w

def cost(xs, ys):
    cost = 0
    for x, y in zip(xs, ys):
        y_pred = forward(x)
        cost += (y_pred - y) ** 2
    return cost / len(xs)

def gradient(xs, ys):
    grad = 0
    for x, y in zip(xs, ys):
        grad += 2 * x * (x * w - y)
    return grad / len(xs)

cost_list = []
print("Predict (before training)", 4, forward(4))
for epoch in range(100):
    cost_val = cost(x_data, y_data)
    grad_val = gradient(x_data, y_data)
    cost_list.append(cost_val)
    w -= 0.01 * grad_val
    print(f"epoch:{epoch},w={w},loss={cost_val}")
print("Predict (after training)", 4, forward(4))

plt.plot(np.arange(0, 100), cost_list)
plt.xlabel("epoch")
plt.ylabel("cost")
plt.show()
Note: on the training set, cost should keep decreasing as epochs increase. If cost grows instead, the run has failed, possibly because the learning rate is set too large.
On the test set, if cost grows as epochs increase, the model is overfitting.
3. Stochastic Gradient Descent (SGD)
Instead of averaging the loss over all samples, SGD updates using the loss of one sample at a time. This makes it possible to escape saddle points and often yields a better-performing model.
import matplotlib.pyplot as plt
import numpy as np

# Prepare the training set
x_data = [1.0, 2.0, 3.0]
y_data = [2.0, 4.0, 6.0]

# Initialize the weight
w = 1.0

def forward(x):
    return x * w

def loss(x, y):
    y_pred = forward(x)
    return (y_pred - y) ** 2

def gradient(x, y):
    return 2 * x * (x * w - y)

loss_list = []
print("Predict (before training)", 4, forward(4))
for epoch in range(100):
    l = 0
    for x, y in zip(x_data, y_data):
        grad = gradient(x, y)
        w -= 0.01 * grad
        l += loss(x, y)
    loss_list.append(l)
    print(f"epoch={epoch},w={w},loss={l}")
print("Predict (after training)", 4, forward(4))

plt.plot(np.arange(0, 100), loss_list)
plt.xlabel("epoch")
plt.ylabel("loss")
plt.savefig("11.png")
plt.show()
Notes: ① The smaller the mini-batch, the better the final performance, but the longer the training time.
② Nowadays "mini-batch" is often shortened to just "batch".
IV. Back Propagation
For a complex network, computing derivatives by traditional hand methods is impractical, which is why back propagation is needed.
For matrix calculus, see The Matrix Cookbook.
1. Building the Computation Graph
2. Problems Faced
3. Back Propagation Flow
4. Code Implementation
import torch

x_data = [1.0, 2.0, 3.0]
y_data = [2.0, 4.0, 6.0]

w = torch.Tensor([1.0])
# A Tensor mainly holds data and grad
# When Tensors are used in computations, PyTorch builds a computation graph
# requires_grad defaults to False
# To compute gradients for this tensor, set requires_grad to True
w.requires_grad = True

def forward(x):
    return x * w

def loss(x, y):
    y_pred = forward(x)
    return (y_pred - y) ** 2

# item() converts a one-element tensor into a Python scalar
print("predict (before training)", 4, forward(4).item())
for epoch in range(100):
    for x, y in zip(x_data, y_data):
        l = loss(x, y)  # forward pass: compute the loss and build the graph
        l.backward()  # backward pass: compute gradients from the graph, store them in the parameters, and free the graph
        # using the data attribute keeps tensor computations out of the graph
        print(f"\tx:{x},y:{y},grad:{w.grad.data}")
        w.data = w.data - 0.01 * w.grad.data
        w.grad.data.zero_()  # without zeroing, the next gradient accumulates onto this one
    print("progress:", epoch, l.item())
print("predict (after training)", 4, forward(4).item())
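The comment about zeroing gradients can be demonstrated directly: if `w.grad` is not cleared, successive `backward()` calls accumulate into the same tensor. A minimal sketch:

```python
import torch

w = torch.tensor([1.0], requires_grad=True)

# First backward: d(w^2)/dw at w=1 is 2
(w * w).backward()
print(w.grad.item())  # 2.0

# Second backward WITHOUT zeroing: gradients accumulate, 2 + 2 = 4
(w * w).backward()
print(w.grad.item())  # 4.0

# After zeroing, the next backward starts fresh
w.grad.zero_()
(w * w).backward()
print(w.grad.item())  # 2.0
```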
V. Linear Regression
1. PyTorch Deep Learning Code Flow
① Prepare the dataset
② Design the model
③ Construct the loss and the optimizer
④ Training cycle (forward, backward, update)
2. Mini-batch Style Deep Learning
In this style, inputs and outputs must be of Tensor type.
By convention, rows are samples and columns are features: for example, a 3x4 matrix represents 3 samples with 4 features (dimensions) each.
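The rows-as-samples convention can be sanity-checked with a small (hypothetical) 3x4 batch passed through a linear layer:

```python
import torch

# 3 samples (rows), 4 features (columns) - illustrative data only
batch = torch.randn(3, 4)
layer = torch.nn.Linear(4, 2)  # in_features=4, out_features=2

out = layer(batch)
print(out.shape)  # torch.Size([3, 2]): samples preserved, features mapped 4 -> 2
```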
import torch

# Prepare the dataset
# 3 rows, 1 column: three samples, in_features of size 1
x_data = torch.Tensor([[1.0],
                       [2.0],
                       [3.0]])
# 3 rows, 1 column: three samples, out_features of size 1
y_data = torch.Tensor([[2.0],
                       [4.0],
                       [6.0]])

# Design the model
class LinearModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(1, 1)  # holds two tensors: weight and bias

    def forward(self, x):
        y_pred = self.linear(x)  # builds the computation graph
        return y_pred

model = LinearModel()

# Construct the loss and the optimizer
criterion = torch.nn.MSELoss(reduction="sum")
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Training cycle
for epoch in range(100):
    y_pred = model(x_data)  # calls LinearModel's forward method
    loss = criterion(y_pred, y_data)  # builds the computation graph
    print(epoch, loss)
    optimizer.zero_grad()  # zero the gradients
    loss.backward()  # back propagation: compute gradients
    optimizer.step()  # update the parameters

# Print weight and bias
print(f"w={model.linear.weight.item()}, b={model.linear.bias.item()}")

# Predict
x_test = torch.Tensor([[4.0]])
y_test = model(x_test)
print("y_pred=", y_test.data)
VI. Logistic Regression Model
1. Overview
Regression problems: the output is a value in a continuous range.
Classification problems: the output is the probability that the input belongs to each discrete class; we usually take the class with the highest probability.
The Logistic Regression Model, despite its name, is a model for classification problems. We need to map the output as follows:
$\mathbb{R} \to [0, 1]$
This uses the Logistic Function:
$\sigma(x) = \frac{1}{1 + e^{-x}}$
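The logistic function is easy to check numerically; a small NumPy sketch:

```python
import numpy as np

def sigmoid(x):
    # sigma(x) = 1 / (1 + e^{-x})
    return 1.0 / (1.0 + np.exp(-x))

print(sigmoid(0.0))    # 0.5: the curve crosses 0.5 at x = 0
print(sigmoid(10.0))   # close to 1 for large positive x
print(sigmoid(-10.0))  # close to 0 for large negative x
```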
Note: the Logistic Function is the best-known of the sigmoid functions, so "sigmoid function" is often used to refer to it specifically.
Model comparison:
Loss function comparison:
Linear Regression:
$loss = (\hat{y} - y)^2 = (x \cdot w - y)^2$
Binary Classification: cross entropy
$loss = -(y \log \hat{y} + (1 - y) \log(1 - \hat{y}))$
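This binary cross-entropy can be computed by hand and compared against `torch.nn.BCELoss` (a minimal sketch with made-up probabilities):

```python
import torch

y = torch.tensor([1.0, 0.0, 1.0])       # labels
y_pred = torch.tensor([0.9, 0.2, 0.7])  # predicted probabilities (illustrative)

# Manual: loss = -(y*log(y_hat) + (1-y)*log(1-y_hat)), averaged over samples
manual = -(y * torch.log(y_pred) + (1 - y) * torch.log(1 - y_pred)).mean()

criterion = torch.nn.BCELoss()  # reduction="mean" by default
print(torch.allclose(manual, criterion(y_pred, y)))  # True
```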
2. Code Implementation
import torch
import numpy as np
import matplotlib.pyplot as plt

# Prepare the dataset
# 3 rows, 1 column: three samples, in_features of size 1
x_data = torch.Tensor([[1.0],
                       [2.0],
                       [3.0]])
# 3 rows, 1 column: three samples, out_features of size 1
y_data = torch.Tensor([[0],
                       [0],
                       [1]])

# Design the model
class LogisticRegressionModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(1, 1)

    def forward(self, x):
        y_pred = torch.sigmoid(self.linear(x))
        return y_pred

model = LogisticRegressionModel()

# Construct loss and optimizer
criterion = torch.nn.BCELoss(reduction="sum")  # binary cross-entropy loss
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Training cycle
for epoch in range(1000):
    y_pred = model(x_data)
    loss = criterion(y_pred, y_data)
    print(epoch, loss.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Plot
x = np.linspace(0, 10, 200)
x_t = torch.Tensor(x).view((200, 1))  # like numpy's reshape
y_t = model(x_t)
y = y_t.data.numpy()
plt.plot(x, y)
plt.plot([0, 10], [0.5, 0.5], c="r")
plt.xlabel("Hours")
plt.ylabel("Probability of Pass")
plt.grid()
plt.show()
VII. Handling Multi-dimensional Feature Inputs
1. Multiple Dimension Logistic Regression Model
Note: the figure above assumes each sample has 8 features; $x_n^{(i)}$ denotes the n-th feature of the i-th sample.
2. An Artificial Neural Network Example
3. Code Implementation
import numpy as np
import torch

# np.loadtxt can read .csv and .csv.gz files; delimiter specifies the separator
xy = np.loadtxt("diabetes.csv", delimiter=",", dtype=np.float32)
# torch.from_numpy converts a NumPy array into a PyTorch tensor
x_data = torch.from_numpy(xy[:, :-1])
y_data = torch.from_numpy(xy[:, [-1]])
# [-1] keeps the 2-D shape; without the brackets you get a 1-D array.
# Equivalent to torch.from_numpy(xy[:, -1:])

class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = torch.nn.Linear(8, 6)
        self.linear2 = torch.nn.Linear(6, 4)
        self.linear3 = torch.nn.Linear(4, 1)
        self.sigmoid = torch.nn.Sigmoid()  # activation function

    def forward(self, x):
        x = self.sigmoid(self.linear1(x))
        x = self.sigmoid(self.linear2(x))
        x = self.sigmoid(self.linear3(x))
        return x

model = Model()
criterion = torch.nn.BCELoss(reduction="mean")  # reduction="mean" averages the loss
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

for epoch in range(100):
    # forward
    y_pred = model(x_data)  # no mini-batch training here
    loss = criterion(y_pred, y_data)
    print(epoch, loss.item())
    # backward
    optimizer.zero_grad()
    loss.backward()
    # update
    optimizer.step()
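The difference between `xy[:, -1]` and `xy[:, [-1]]` noted in the comments above can be seen with a tiny array (illustrative data, not the diabetes file):

```python
import numpy as np

xy = np.array([[1.0, 2.0, 0.0],
               [3.0, 4.0, 1.0]])

print(xy[:, -1].shape)    # (2,)   - plain indexing gives a 1-D vector
print(xy[:, [-1]].shape)  # (2, 1) - list indexing keeps the 2-D column shape
print(xy[:, -1:].shape)   # (2, 1) - equivalent slicing form
```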
VIII. Loading Datasets
1. Terminology
2. DataLoader Features
3. Code Implementation
import numpy as np
import torch
from torch.utils.data import Dataset
# Dataset is an abstract class; custom dataset classes inherit from it
from torch.utils.data import DataLoader
# DataLoader is a class that helps us load data

class DiabetesDataset(Dataset):
    def __init__(self, filepath):
        xy = np.loadtxt(filepath, delimiter=",", dtype=np.float32)
        self.len = xy.shape[0]
        self.x_data = torch.from_numpy(xy[:, :-1])
        self.y_data = torch.from_numpy(xy[:, [-1]])

    # dataset[index] calls this method
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # len(dataset) calls this method
    # It must return an integer, otherwise calling it raises an error
    def __len__(self):
        return self.len

dataset = DiabetesDataset("diabetes.csv.gz")
train_loader = DataLoader(dataset=dataset, batch_size=32,
                          shuffle=True, num_workers=2)
# shuffle=True reshuffles the samples each epoch before assembling the mini-batches
# num_workers is the number of subprocesses used for data loading

class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = torch.nn.Linear(8, 6)
        self.linear2 = torch.nn.Linear(6, 4)
        self.linear3 = torch.nn.Linear(4, 1)
        self.sigmoid = torch.nn.Sigmoid()  # activation function

    def forward(self, x):
        x = self.sigmoid(self.linear1(x))
        x = self.sigmoid(self.linear2(x))
        x = self.sigmoid(self.linear3(x))
        return x

model = Model()
criterion = torch.nn.BCELoss(reduction="mean")
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

if __name__ == '__main__':
    '''
    On Windows, in multi-process mode (num_workers != 0), the code below
    must be wrapped rather than written at top level, e.g. under this if
    statement, otherwise it raises an error.
    '''
    for epoch in range(100):
        for i, data in enumerate(train_loader, 0):
            # 1. prepare data
            inputs, labels = data
            # 2. forward
            y_pred = model(inputs)
            loss = criterion(y_pred, labels)
            print(f"epoch:{epoch}, i:{i}, loss:{loss.item()}")
            # 3. backward
            optimizer.zero_grad()
            loss.backward()
            # 4. update
            optimizer.step()
You can also use the dataset subclasses provided by torchvision.datasets directly:
import torch
from torchvision import transforms, datasets
from torch.utils.data import DataLoader

train_dataset = datasets.MNIST(root="./dataset/minst", train=True,
                               transform=transforms.ToTensor(),
                               download=True)
test_dataset = datasets.MNIST(root="./dataset/minst", train=False,
                              transform=transforms.ToTensor(),
                              download=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=32,
                          shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32,
                         shuffle=False)
# subsequent code ...
IX. Multi-class Classification
1. Softmax Layer
Suppose $Z^l \in \mathbb{R}^K$ is the output of the last linear layer. The Softmax function is:

$P(y=i) = \frac{e^{Z_i}}{\sum_{j=0}^{K-1} e^{Z_j}}, \quad i \in \{0, \ldots, K-1\}$
2. Loss Functions
(1) NLLLoss
$Loss(\widehat{Y}, Y) = -Y \log \widehat{Y}$
import numpy as np
y = np.array([1, 0, 0])
z = np.array([0.2, 0.1, -0.1])
y_pred = np.exp(z) / np.exp(z).sum()
loss = (-y * np.log(y_pred)).sum()
print(loss) # 0.9729189131256584
(2) CrossEntropyLoss
import torch
# The target must be a LongTensor
y = torch.LongTensor([0])  # class index 0
z = torch.Tensor([[0.2, 0.1, -0.1]])
criterion = torch.nn.CrossEntropyLoss()
loss = criterion(z, y)
print(loss) # tensor(0.9729)
import torch
criterion = torch.nn.CrossEntropyLoss()
Y = torch.LongTensor([2, 0, 1])
Y_pred1 = torch.Tensor([[0.1, 0.2, 0.9],
[1.1, 0.1, 0.2],
[0.2, 2.1, 0.1]])
Y_pred2 = torch.Tensor([[0.8, 0.2, 0.3],
[0.2, 0.3, 0.5],
[0.2, 0.2, 0.5]])
l1 = criterion(Y_pred1, Y)
l2 = criterion(Y_pred2, Y)
print("batch loss1=", l1.data) # batch loss1= tensor(0.4966)
print("batch loss2=", l2.data) # batch loss2= tensor(1.2389)
Note: CrossEntropyLoss <==> LogSoftmax + NLLLoss
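This equivalence can be verified numerically (a minimal sketch, reusing the logits from the example above):

```python
import torch

z = torch.tensor([[0.2, 0.1, -0.1]])  # raw logits
y = torch.tensor([0])                 # target class index

ce = torch.nn.CrossEntropyLoss()(z, y)

# The same computation in two explicit steps: LogSoftmax followed by NLLLoss
log_probs = torch.nn.LogSoftmax(dim=1)(z)
nll = torch.nn.NLLLoss()(log_probs, y)

print(torch.allclose(ce, nll))  # True
print(round(ce.item(), 4))      # 0.9729, matching the earlier example
```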
3. Code Implementation
Model design:
Loss computation:
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

batch_size = 64
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert the PIL Image to a Tensor
    transforms.Normalize((0.1307,), (0.3081,))
])  # empirical mean/std; grayscale images have one channel, so each tuple holds a single value

train_dataset = datasets.MNIST(root="./dataset/minst", train=True,
                               download=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True,
                          batch_size=batch_size)
test_dataset = datasets.MNIST(root="./dataset/minst", train=False,
                              download=True, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False,
                         batch_size=batch_size)

class Net(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.l1 = torch.nn.Linear(784, 512)
        self.l2 = torch.nn.Linear(512, 256)
        self.l3 = torch.nn.Linear(256, 128)
        self.l4 = torch.nn.Linear(128, 64)
        self.l5 = torch.nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 784)
        # -1 lets that dimension be inferred: torch.Size([64, 1, 28, 28]) becomes torch.Size([64, 784])
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = F.relu(self.l3(x))
        x = F.relu(self.l4(x))
        return self.l5(x)

model = Net()
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# momentum adds inertia to the updates

def train(epoch):
    running_loss = 0.0
    for batch_idx, data in enumerate(train_loader, 0):
        inputs, target = data
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if batch_idx % 300 == 299:
            print("epoch:%d,batch_idx:%5d,loss:%.3f" % (epoch+1, batch_idx+1, running_loss/300))
            running_loss = 0.0

def test():
    correct = 0
    total = 0
    with torch.no_grad():  # disable gradient computation
        for data in test_loader:
            images, labels = data
            # images torch.Size([64, 1, 28, 28])
            # labels torch.Size([64])
            outputs = model(images)  # torch.Size([64, 10])
            _, predicted = torch.max(outputs, dim=1)
            # dim=1 takes the max of each row; torch.max returns (values, indices)
            total += len(labels)
            correct += (predicted == labels).sum().item()
    print("Accuracy on test set :%d %%" % (100 * correct / total))

if __name__ == '__main__':
    for epoch in range(10):
        train(epoch)
        test()
Notes: ① transforms.ToTensor() converts a PIL Image into a tensor. That is, a PIL Image in $\mathbb{Z}^{28 \times 28}$ with $pixel \in \{0, \ldots, 255\}$
is converted into a PyTorch Tensor in $\mathbb{R}^{1 \times 28 \times 28}$ with $pixel \in [0, 1]$.
② transforms.Normalize(mean, std) standardizes the data:

$Pixel_{norm} = \frac{Pixel_{origin} - mean}{std}$
X. Convolutional Neural Networks (CNN, Basics)
1. An Example CNN Diagram
Note: a torch.nn.Linear() layer is also called a fully connected layer, because every node in one layer has a connection weight to every node in the next layer.
2. Convolution Input and Output Channels
(1) Single Input Channel, Single Output Channel
(2) N Input Channels, Single Output Channel
(3) N Input Channels, M Output Channels
3. Convolutional Layers
(1) Kernel Shape
import torch

in_channels, out_channels = 5, 10
width, height = 100, 100
kernel_size = 3
batch_size = 1

input = torch.randn(batch_size, in_channels, width, height)
# a 4-D tensor of random values drawn from a normal distribution with mean 0 and variance 1
conv_layer = torch.nn.Conv2d(in_channels, out_channels,
                             kernel_size=kernel_size)
output = conv_layer(input)
print(input.shape)   # torch.Size([1, 5, 100, 100])
print(output.shape)  # torch.Size([1, 10, 98, 98])
print(conv_layer.weight.shape)  # torch.Size([10, 5, 3, 3])
(2) padding
import torch
input = [3,4,6,5,7,
2,4,6,8,2,
1,6,7,8,4,
9,7,4,6,2,
3,7,5,4,1]
input = torch.Tensor(input).view(1,1,5,5)
conv_layer = torch.nn.Conv2d(1,1,kernel_size=3,padding=1,bias=False)
kernel = torch.Tensor([1,2,3,4,5,6,7,8,9]).view(1,1,3,3)
conv_layer.weight.data = kernel.data
output = conv_layer(input)
print(output.data)
# tensor([[[[ 91., 168., 224., 215., 127.],
# [114., 211., 295., 262., 149.],
# [192., 259., 282., 214., 122.],
# [194., 251., 253., 169., 86.],
# [ 96., 112., 110., 68., 31.]]]])
(3) stride
import torch
input = [3,4,6,5,7,
2,4,6,8,2,
1,6,7,8,4,
9,7,4,6,2,
3,7,5,4,1]
input = torch.Tensor(input).view(1,1,5,5)
conv_layer = torch.nn.Conv2d(1,1,kernel_size=3,stride=2,bias=False)
kernel = torch.Tensor([1,2,3,4,5,6,7,8,9]).view(1,1,3,3)
conv_layer.weight.data = kernel.data
output = conv_layer(input)
print(output.data)
# tensor([[[[211., 262.],
# [251., 169.]]]])
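The output sizes in the padding and stride examples follow the standard formula H_out = floor((H_in + 2*padding - kernel) / stride) + 1, which can be checked against Conv2d directly (a sketch):

```python
import torch

def conv_out(size, kernel, padding=0, stride=1):
    # H_out = floor((H_in + 2*padding - kernel) / stride) + 1
    return (size + 2 * padding - kernel) // stride + 1

x = torch.randn(1, 1, 5, 5)

# kernel=3 shrinks 5 -> 3; padding=1 keeps 5x5; stride=2 gives 2x2
for padding, stride in [(0, 1), (1, 1), (0, 2)]:
    conv = torch.nn.Conv2d(1, 1, kernel_size=3, padding=padding, stride=stride)
    expected = conv_out(5, 3, padding, stride)
    print(conv(x).shape[-1] == expected)  # True for each setting
```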
4. Max Pooling Layer
import torch
input = [3,4,6,5,
2,4,6,8,
1,6,7,8,
9,7,4,6,]
input = torch.Tensor(input).view(1,1,4,4)
maxpooling_layer = torch.nn.MaxPool2d(kernel_size=2)  # default stride equals the kernel size
output = maxpooling_layer(input)
print(output.data)
'''
tensor([[[[4., 8.],
[9., 8.]]]])
'''
5. A Simple CNN Example
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

batch_size = 64
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert the PIL Image to a Tensor
    transforms.Normalize((0.1307,), (0.3081,))
])  # empirical mean/std; grayscale images have one channel, so each tuple holds a single value

train_dataset = datasets.MNIST(root="./dataset/minst", train=True,
                               download=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True,
                          batch_size=batch_size)
test_dataset = datasets.MNIST(root="./dataset/minst", train=False,
                              download=True, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False,
                         batch_size=batch_size)

class Net(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = torch.nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = torch.nn.Conv2d(10, 20, kernel_size=5)
        self.pooling = torch.nn.MaxPool2d(2)  # has no weights, so the object can be reused
        self.fc = torch.nn.Linear(320, 10)

    def forward(self, x):
        # Flatten data from (n, 1, 28, 28) to (n, 320) after the conv/pool layers
        batch_size = x.size(0)
        x = self.pooling(F.relu(self.conv1(x)))
        x = self.pooling(F.relu(self.conv2(x)))
        x = x.view(batch_size, -1)  # flatten
        x = self.fc(x)
        return x

model = Net()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)  # move the model to the chosen device
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# momentum adds inertia to the updates

def train(epoch):
    running_loss = 0.0
    for batch_idx, data in enumerate(train_loader, 0):
        inputs, target = data
        inputs, target = inputs.to(device), target.to(device)
        # move the inputs and labels to the GPU
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if batch_idx % 300 == 299:
            print("epoch:%d,batch_idx:%5d,loss:%.3f" % (epoch+1, batch_idx+1, running_loss/300))
            running_loss = 0.0

def test():
    correct = 0
    total = 0
    with torch.no_grad():  # disable gradient computation
        for data in test_loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            # images torch.Size([64, 1, 28, 28])
            # labels torch.Size([64])
            outputs = model(images)  # torch.Size([64, 10])
            _, predicted = torch.max(outputs, dim=1)
            # dim=1 takes the max of each row; torch.max returns (values, indices)
            total += len(labels)
            correct += (predicted == labels).sum().item()
    print("Accuracy on test set :%d %%" % (100 * correct / total))

if __name__ == '__main__':
    for epoch in range(10):
        train(epoch)
        test()
XI. Convolutional Neural Networks (CNN, Advanced)
1. GoogLeNet
(1) Inception Module
(2) 1x1 convolution
A 1x1 convolution can drastically reduce the computation cost, as shown below:
(3) Code Implementation Using the Inception Module
Note: concatenation requires all tensors to have the same mini-batch size, W, and H; they are joined along the channel dimension.
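The channel-wise concatenation can be sketched with dummy branch outputs (shapes chosen to match the InceptionA module below: 16 + 24 + 24 + 24 = 88 channels):

```python
import torch

n, h, w = 1, 12, 12
# four dummy branch outputs sharing N, H, W but with different channel counts
branches = [torch.randn(n, c, h, w) for c in (16, 24, 24, 24)]

# dim=1 stacks the channel dimensions: 16 + 24 + 24 + 24 = 88
out = torch.cat(branches, dim=1)
print(out.shape)  # torch.Size([1, 88, 12, 12])
```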
import torch
from torch import nn
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

batch_size = 64
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert the PIL Image to a Tensor
    transforms.Normalize((0.1307,), (0.3081,))
])  # empirical mean/std; grayscale images have one channel, so each tuple holds a single value

train_dataset = datasets.MNIST(root="./dataset/minst", train=True,
                               download=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True,
                          batch_size=batch_size)
test_dataset = datasets.MNIST(root="./dataset/minst", train=False,
                              download=True, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False,
                         batch_size=batch_size)

class InceptionA(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.branch1x1 = nn.Conv2d(in_channels, 16, kernel_size=1)

        self.branch5x5_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch5x5_2 = nn.Conv2d(16, 24, kernel_size=5, padding=2)

        self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch3x3_2 = nn.Conv2d(16, 24, kernel_size=3, padding=1)
        self.branch3x3_3 = nn.Conv2d(24, 24, kernel_size=3, padding=1)

        self.branch_pool = nn.Conv2d(in_channels, 24, kernel_size=1)

    def forward(self, x):
        branch1x1 = self.branch1x1(x)

        branch5x5 = self.branch5x5_1(x)
        branch5x5 = self.branch5x5_2(branch5x5)

        branch3x3 = self.branch3x3_1(x)
        branch3x3 = self.branch3x3_2(branch3x3)
        branch3x3 = self.branch3x3_3(branch3x3)

        branch_pool = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1)
        branch_pool = self.branch_pool(branch_pool)

        outputs = [branch1x1, branch5x5, branch3x3, branch_pool]
        return torch.cat(outputs, dim=1)
        # tensors have shape (N, C, H, W); dim=1 concatenates along the channel axis

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(88, 20, kernel_size=5)
        self.incep1 = InceptionA(in_channels=10)
        self.incep2 = InceptionA(in_channels=20)
        self.mp = nn.MaxPool2d(2)
        self.fc = nn.Linear(1408, 10)

    def forward(self, x):
        in_size = x.size(0)  # mini-batch size
        x = F.relu(self.mp(self.conv1(x)))  # torch.Size([64, 10, 12, 12])
        x = self.incep1(x)                  # torch.Size([64, 88, 12, 12])
        x = F.relu(self.mp(self.conv2(x)))  # torch.Size([64, 20, 4, 4])
        x = self.incep2(x)                  # torch.Size([64, 88, 4, 4])
        x = x.view(in_size, -1)             # torch.Size([64, 1408])
        x = self.fc(x)
        return x

model = Net()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)  # move the model to the chosen device
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# momentum adds inertia to the updates

def train(epoch):
    running_loss = 0.0
    for batch_idx, data in enumerate(train_loader, 0):
        inputs, target = data
        inputs, target = inputs.to(device), target.to(device)
        # move the inputs and labels to the GPU
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if batch_idx % 300 == 299:
            print("epoch:%d,batch_idx:%5d,loss:%.3f" % (epoch+1, batch_idx+1, running_loss/300))
            running_loss = 0.0

def test():
    correct = 0
    total = 0
    with torch.no_grad():  # disable gradient computation
        for data in test_loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            # images torch.Size([64, 1, 28, 28])
            # labels torch.Size([64])
            outputs = model(images)  # torch.Size([64, 10])
            _, predicted = torch.max(outputs, dim=1)
            # dim=1 takes the max of each row; torch.max returns (values, indices)
            total += len(labels)
            correct += (predicted == labels).sum().item()
    print("Accuracy on test set :%d %%" % (100 * correct / total))

if __name__ == '__main__':
    for epoch in range(10):
        train(epoch)
        test()
2. Deep Residual Learning
(1) Vanishing Gradients
A plain (serial) network is not necessarily better the deeper it gets. The problem in the figure above may come from overfitting or from vanishing gradients.
Vanishing gradients: if each layer's weight gradients are less than 1, the gradients in the layers closest to the input become nearly zero, so those layers cannot be trained effectively during iterative training.
(2) Solving Vanishing Gradients with Residuals
The vanishing gradient problem can be addressed with a Residual Network.
Note: in a residual net, the two inputs to the final ReLU, F(x) and x, must have exactly the same tensor dimensions.
(3) Code Implementation
import torch
from torch import nn
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

batch_size = 64
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert the PIL Image to a Tensor
    transforms.Normalize((0.1307,), (0.3081,))
])  # empirical mean/std; grayscale images have one channel, so each tuple holds a single value

train_dataset = datasets.MNIST(root="./dataset/minst", train=True,
                               download=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True,
                          batch_size=batch_size)
test_dataset = datasets.MNIST(root="./dataset/minst", train=False,
                              download=True, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False,
                         batch_size=batch_size)

class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.channels = channels
        self.conv1 = nn.Conv2d(channels, channels,
                               kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(channels, channels,
                               kernel_size=3, padding=1)

    def forward(self, x):
        y = F.relu(self.conv1(x))
        y = self.conv2(y)
        return F.relu(x + y)

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=5)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5)
        self.mp = nn.MaxPool2d(2)
        self.rblock1 = ResidualBlock(16)
        self.rblock2 = ResidualBlock(32)
        self.fc = nn.Linear(512, 10)

    def forward(self, x):
        in_size = x.size(0)
        x = self.mp(F.relu(self.conv1(x)))
        x = self.rblock1(x)
        x = self.mp(F.relu(self.conv2(x)))
        x = self.rblock2(x)
        x = x.view(in_size, -1)
        x = self.fc(x)
        return x

model = Net()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)  # move the model to the chosen device
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# momentum adds inertia to the updates

def train(epoch):
    running_loss = 0.0
    for batch_idx, data in enumerate(train_loader, 0):
        inputs, target = data
        inputs, target = inputs.to(device), target.to(device)
        # move the inputs and labels to the GPU
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if batch_idx % 300 == 299:
            print("epoch:%d,batch_idx:%5d,loss:%.3f" % (epoch+1, batch_idx+1, running_loss/300))
            running_loss = 0.0

def test():
    correct = 0
    total = 0
    with torch.no_grad():  # disable gradient computation
        for data in test_loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            # images torch.Size([64, 1, 28, 28])
            # labels torch.Size([64])
            outputs = model(images)  # torch.Size([64, 10])
            _, predicted = torch.max(outputs, dim=1)
            # dim=1 takes the max of each row; torch.max returns (values, indices)
            total += len(labels)
            correct += (predicted == labels).sum().item()
    print("Accuracy on test set :%d %%" % (100 * correct / total))

if __name__ == '__main__':
    for epoch in range(10):
        train(epoch)
        test()
(4) Further Reading
He K, Zhang X, Ren S, et al. Identity Mappings in Deep Residual Networks[C].
Huang G, Liu Z, van der Maaten L, et al. Densely Connected Convolutional Networks[J]. 2016: 2261-2269.
XII. Recurrent Neural Networks (RNN, Basics)
1. What Are RNNs
A fully connected network has very dense connections, which is why it is called a DNN.
A DNN has too many connection weights, so the RNN was invented: it borrows the idea of weight sharing to reduce the number of weights. RNNs are designed for data with sequential patterns, where the value to be predicted depends strongly on the preceding values and their order, e.g. stock prices or weather.
Notes: ① An RNN Cell is essentially a linear layer, except that its weights are shared across time steps.
② $x_t$ is the input at time t, $t \in \{1, 2, 3, \ldots\}$.
③ $h_t$, i.e. "hidden", is the hidden state.
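The note that an RNN cell is essentially a linear layer can be made concrete: `torch.nn.RNNCell` computes h_t = tanh(x_t·W_ih^T + b_ih + h_{t-1}·W_hh^T + b_hh), which we can reproduce by hand from the cell's own parameters (a sketch):

```python
import torch

torch.manual_seed(0)
cell = torch.nn.RNNCell(input_size=4, hidden_size=2)

x = torch.randn(1, 4)       # one sample with 4 input features
h_prev = torch.zeros(1, 2)  # initial hidden state h_0

h = cell(x, h_prev)

# Manual computation using the cell's own weights: two linear maps plus tanh
manual = torch.tanh(x @ cell.weight_ih.T + cell.bias_ih
                    + h_prev @ cell.weight_hh.T + cell.bias_hh)
print(torch.allclose(h, manual, atol=1e-6))  # True
```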
2. Code Implementation
(1) With RNNCell
import torch

batch_size = 1
seq_len = 3
input_size = 4
hidden_size = 2

cell = torch.nn.RNNCell(input_size=input_size, hidden_size=hidden_size)

# (seq, batch, features)
dataset = torch.randn(seq_len, batch_size, input_size)
# elements sampled from the standard normal distribution (mean=0, stddev=1)
hidden = torch.zeros(batch_size, hidden_size)  # h_0

for idx, input in enumerate(dataset):
    print("=" * 20, idx, "=" * 20)
    print("Input size:", input.shape)
    hidden = cell(input, hidden)
    print("output size:", hidden.shape)
    print(hidden)
(2) With RNN
import torch

batch_size = 1
seq_len = 3
input_size = 4
hidden_size = 2
num_layers = 1

cell = torch.nn.RNN(input_size=input_size, hidden_size=hidden_size,
                    num_layers=num_layers)

# (seqLen, batchSize, inputSize)
inputs = torch.randn(seq_len, batch_size, input_size)
hidden = torch.zeros(num_layers, batch_size, hidden_size)  # h_0

# output shape: (seqLen, batchSize, hiddenSize)
out, hidden = cell(inputs, hidden)
print("output size:", out.shape)     # output size: torch.Size([3, 1, 2])
print("output:", out)
print("hidden size:", hidden.shape)  # hidden size: torch.Size([1, 1, 2])
print("hidden:", hidden)

The batch_first parameter: if True, the input and output tensors have shape (batchSize, seqLen, input_size).

import torch

batch_size = 1
seq_len = 3
input_size = 4
hidden_size = 2
num_layers = 1

cell = torch.nn.RNN(input_size=input_size, hidden_size=hidden_size,
                    num_layers=num_layers, batch_first=True)
inputs = torch.randn(batch_size, seq_len, input_size)
hidden = torch.zeros(num_layers, batch_size, hidden_size)  # h_0
out, hidden = cell(inputs, hidden)
print("output size:", out.shape)     # output size: torch.Size([1, 3, 2])
print("output:", out)
print("hidden size:", hidden.shape)  # hidden size: torch.Size([1, 1, 2])
print("hidden:", hidden)
3. Exercise One
(1) With RNNCell
import torch

############## Prepare Data ###############
input_size = 4
hidden_size = 4
batch_size = 1

idx2char = ["e", "h", "l", "o"]
x_data = [1, 0, 2, 2, 3]  # hello
y_data = [3, 1, 2, 3, 2]  # ohlol
one_hot_lookup = [[1, 0, 0, 0],
                  [0, 1, 0, 0],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]]
# convert to one-hot vectors
x_one_hot = [one_hot_lookup[x] for x in x_data]

# torch.Size([5, 1, 4]), i.e. (seqLen, batchSize, inputSize)
inputs = torch.Tensor(x_one_hot).view(-1, batch_size, input_size)
# torch.Size([5, 1]), i.e. (seqLen, 1)
labels = torch.LongTensor(y_data).view(-1, 1)

############## Design Model ###############
class Model(torch.nn.Module):
    def __init__(self, input_size, hidden_size, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.rnncell = torch.nn.RNNCell(input_size=self.input_size,
                                        hidden_size=self.hidden_size)

    # shape of input:  (batchSize, inputSize)
    # shape of hidden: (batchSize, hiddenSize)
    def forward(self, input, hidden):
        hidden = self.rnncell(input, hidden)
        return hidden

    def init_hidden(self):  # h_0
        return torch.zeros(self.batch_size, self.hidden_size)

net = Model(input_size, hidden_size, batch_size)

########### Loss and Optimizer ############
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.1)

############# Training Cycle ##############
for epoch in range(15):
    loss = 0
    optimizer.zero_grad()
    hidden = net.init_hidden()
    print("predicted string:", end="")
    for input, label in zip(inputs, labels):
        # shape of inputs: (seqLen, batchSize, inputSize)
        # shape of input:  (batchSize, inputSize)
        # shape of labels: (seqLen, 1)
        # shape of label:  (1)
        hidden = net(input, hidden)
        loss += criterion(hidden, label)
        # don't call item() here: we need the computation graph so the summed loss can be minimized
        _, idx = hidden.max(dim=1)
        print(idx2char[idx.item()], end="")
    loss.backward()
    optimizer.step()
    print(",Epoch [%d/15] loss=%.4f" % (epoch+1, loss.item()))
(2)Implementation with RNN
import torch
############## Prepare Data ###############
input_size = 4
hidden_size = 4
batch_size = 1
seq_len = 5
idx2char = ["e", "h", "l", "o"]
x_data = [1, 0, 2, 2, 3]  # hello
y_data = [3, 1, 2, 3, 2]  # ohlol
one_hot_lookup = [[1, 0, 0, 0],
                  [0, 1, 0, 0],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]]
# convert to one-hot vectors
x_one_hot = [one_hot_lookup[x] for x in x_data]
# shape of inputs: (seqLen, batchSize, inputSize)
inputs = torch.Tensor(x_one_hot).view(seq_len, batch_size, input_size)
# shape of labels: (seqLen × batchSize)
labels = torch.LongTensor(y_data)  # tensor([3, 1, 2, 3, 2])
############## Design Model ###############
class Model(torch.nn.Module):
    def __init__(self, input_size, hidden_size, batch_size, num_layers=1):
        super().__init__()
        self.num_layers = num_layers    # 1
        self.batch_size = batch_size    # 1
        self.input_size = input_size    # 4
        self.hidden_size = hidden_size  # 4
        self.rnn = torch.nn.RNN(input_size=self.input_size,
                                hidden_size=self.hidden_size,
                                num_layers=num_layers)

    def forward(self, input):
        # shape of hidden: (numLayers, batchSize, hiddenSize)
        hidden = torch.zeros(self.num_layers, self.batch_size,
                             self.hidden_size)
        out, _ = self.rnn(input, hidden)
        # reshape out to: (seqLen × batchSize, hiddenSize)
        return out.view(-1, self.hidden_size)

net = Model(input_size, hidden_size, batch_size)
########### Loss and Optimizer ############
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.1)
############# Training Cycle ##############
for epoch in range(15):
    optimizer.zero_grad()
    outputs = net(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    _, idx = outputs.max(dim=1)
    idx = idx.data.numpy()
    print("Predicted:", "".join([idx2char[x] for x in idx]), end="")
    print(", Epoch [%d/15] loss=%.3f" % (epoch + 1, loss.item()))
4、Embedding Layer
(1)one-hot vs embedding
(2)The embedding weights as a lookup matrix
(3)Example: looking up character index 2
(4)Code implementation
class torch.nn.Embedding() has two main parameters:
num_embeddings (int): inputSize
embedding_dim (int): embeddingSize
shape:
Input: a LongTensor of arbitrary shape containing the indices to look up
Output: (*, embedding_dim), where * is the shape of the input
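As a minimal sketch of these shapes (the vocabulary size 4 and embedding size 3 here are made-up illustration values):

```python
import torch

# hypothetical sizes: a vocabulary of 4 characters, embedding dimension 3
emb = torch.nn.Embedding(num_embeddings=4, embedding_dim=3)

# input is a LongTensor of indices; here shape (1, 5)
idx = torch.LongTensor([[1, 0, 2, 2, 3]])
out = emb(idx)  # output shape is the input shape with embedding_dim appended
print(out.shape)  # torch.Size([1, 5, 3])
```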
class torch.nn.Linear() takes the same parameters as described earlier.
shape:
Input: (N, *, in_features), where * means any number of additional dimensions
Output: (N, *, out_features); all dimensions except the last have the same shape as the input.
class torch.nn.CrossEntropyLoss() is typically used with its parameters left at their defaults.
shape:
Input: (N, C), where C is the number of classes; Target: (N), each element a class index in [0, C-1].
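A minimal sketch of the CrossEntropyLoss shapes (the logits here are random, for illustration only):

```python
import torch

criterion = torch.nn.CrossEntropyLoss()
logits = torch.randn(5, 4)                  # Input: (N, C) = 5 samples, 4 classes
target = torch.LongTensor([3, 1, 2, 3, 2])  # Target: (N,), one class index each
loss = criterion(logits, target)            # a scalar (0-dim) tensor
```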
import torch
num_class = 4
input_size = 4
hidden_size = 8
embedding_size = 10
num_layers = 2
batch_size = 1
seq_len = 5
idx2char = ['e', 'h', 'l', 'o']
x_data = [[1, 0, 2, 2, 3]]  # (batch, seq_len)
y_data = [3, 1, 2, 3, 2]    # (batch * seq_len)
inputs = torch.LongTensor(x_data)
labels = torch.LongTensor(y_data)

class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.emb = torch.nn.Embedding(input_size, embedding_size)
        self.rnn = torch.nn.RNN(input_size=embedding_size,
                                hidden_size=hidden_size,
                                num_layers=num_layers,
                                batch_first=True)
        self.fc = torch.nn.Linear(hidden_size, num_class)

    def forward(self, x):
        hidden = torch.zeros(num_layers, x.size(0), hidden_size)
        x = self.emb(x)
        '''emb(x)
        Input:  a LongTensor of shape (batchSize, seqLen)
        Output: (batchSize, seqLen, embeddingSize)
        '''
        x, _ = self.rnn(x, hidden)
        '''rnn()
        Input:  (batchSize, seqLen, embeddingSize)
        Output: (batchSize, seqLen, hiddenSize)
        '''
        x = self.fc(x)
        '''fc()
        Input:  (batchSize, seqLen, hiddenSize)
        Output: (batchSize, seqLen, numClass)
        '''
        # reshape the result to (batchSize × seqLen, numClass) for CrossEntropyLoss
        return x.view(-1, num_class)

net = Model()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.05)
for epoch in range(15):
    optimizer.zero_grad()
    outputs = net(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    _, idx = outputs.max(dim=1)
    idx = idx.data.numpy()
    print("Predicted:", "".join([idx2char[x] for x in idx]), end="")
    print(", Epoch [%d/15] loss = %.3f" % (epoch + 1, loss.item()))
十三、Recurrent Neural Networks RNN (Advanced)
1、The Problem
2、Model Design
3、Preparing Data
Convert each name to a tensor
In addition, a list of the sequence lengths must be returned together with that tensor.
Processing the country data
Convert the country data into a dictionary (self.country_dict in the NameDataset class below), as in the table below, with the country as key and its index as value.
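A minimal sketch of this conversion, mirroring the name2list/padding logic used in the full code below (the sample names here are made up):

```python
import torch

names = ["Adsit", "Ajdrna"]  # sample names, for illustration only
seqs = [[ord(c) for c in name] for name in names]  # ASCII code lists
lengths = torch.LongTensor([len(s) for s in seqs])
# zero-pad up to the longest name: shape (batchSize, maxSeqLen)
padded = torch.zeros(len(seqs), lengths.max().item()).long()
for i, s in enumerate(seqs):
    padded[i, :len(s)] = torch.LongTensor(s)
print(padded.shape)      # torch.Size([2, 6])
print(lengths.tolist())  # [5, 6]
```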
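The dictionary can be built by sorting the unique country names and enumerating them (the country values below are made-up samples):

```python
# toy sketch of building the country dictionary
countries = ["Czech", "Arabic", "Czech", "English"]
country_list = sorted(set(countries))  # deduplicated, sorted country names
country_dict = {name: idx for idx, name in enumerate(country_list)}
print(country_dict)  # {'Arabic': 0, 'Czech': 1, 'English': 2}
```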
4、双向循环神经网络 Bi-direction RNN/LSTM/GRU
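A bidirectional layer runs the recurrence over the sequence in both directions and concatenates the two results. A minimal shape check (all sizes here are hypothetical):

```python
import torch

seq_len, batch_size, input_size, hidden_size, n_layers = 5, 3, 7, 6, 2
gru = torch.nn.GRU(input_size, hidden_size, n_layers, bidirectional=True)
x = torch.randn(seq_len, batch_size, input_size)
h0 = torch.zeros(n_layers * 2, batch_size, hidden_size)  # 2 directions
output, hidden = gru(x, h0)
print(output.shape)  # (seq_len, batch_size, hidden_size * 2)
print(hidden.shape)  # (n_layers * 2, batch_size, hidden_size)
# the final hidden states of the two directions can be concatenated:
hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)  # (batch_size, hidden_size * 2)
```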
5、The pack_padded_sequence() Method
PyTorch provides the pack_padded_sequence() method, which strips all the zero-padding vectors from a batch of sequences and returns a PackedSequence object. A PackedSequence object can be passed directly as input to RNN/LSTM/GRU layers.
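A minimal demonstration of the packing (sequence lengths and feature size here are made-up numbers):

```python
import torch
from torch.nn.utils.rnn import pack_padded_sequence

# two padded sequences with true lengths 3 and 2, feature size 4
embedding = torch.randn(3, 2, 4)    # (seqLen, batchSize, featureSize)
embedding[2, 1] = 0                 # the second sequence is padded at step 3
lengths = torch.LongTensor([3, 2])  # must be sorted in descending order
packed = pack_padded_sequence(embedding, lengths)
print(packed.data.shape)   # torch.Size([5, 4]): only the 3 + 2 real steps remain
print(packed.batch_sizes)  # tensor([2, 2, 1]): active sequences per time step
```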
6、Code Implementation
import csv
import gzip
import math, time
import torch
import matplotlib.pyplot as plt
import numpy as np
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils.data import Dataset, DataLoader
############### Parameter ###############
HIDDEN_SIZE = 100
BATCH_SIZE = 256
N_LAYER = 2  # number of GRU layers
N_EPOCHS = 3
N_CHARS = 128
USE_GPU = True
############## Timer Function ###########
def time_since(since):
    s = time.time() - since
    m = math.floor(s / 60)
    s -= m * 60
    return "%dm %ds" % (m, s)

############# Preparing Data ############
def name2list(name):
    '''
    Convert the input name from a string to a list of ASCII codes.
    '''
    arr = [ord(c) for c in name]
    return arr, len(arr)

def create_tensor(tensor):
    if USE_GPU:
        device = torch.device("cuda:0")
        tensor = tensor.to(device)
    return tensor
def make_tensors(names, countries):
    sequences_and_lengths = [name2list(name) for name in names]
    name_sequences = [sl[0] for sl in sequences_and_lengths]
    seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths])
    countries = countries.long()
    # make tensor of names, zero-padded, BatchSize x SeqLen
    seq_tensor = torch.zeros(len(name_sequences), seq_lengths.max()).long()
    # .max() returns the largest value in the tensor as a single-element tensor
    # .long() casts an existing tensor to LongTensor
    for idx, (seq, seq_len) in enumerate(zip(name_sequences, seq_lengths), 0):
        seq_tensor[idx, :seq_len] = torch.LongTensor(seq)
    # sort by length so that pack_padded_sequence can be used
    seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True)
    # .sort() returns (values: tensor, indices: tensor), where indices gives
    # the position of each value in the original tensor
    seq_tensor = seq_tensor[perm_idx]  # BatchSize x SeqLen
    countries = countries[perm_idx]
    return create_tensor(seq_tensor), create_tensor(seq_lengths), create_tensor(countries)
class NameDataset(Dataset):
    def __init__(self, is_train_set=True):
        filename = "dataset/names_train.csv.gz" if is_train_set else "dataset/names_test.csv.gz"
        with gzip.open(filename, "rt") as f:
            # "rt" mode: read the file contents as text strings
            reader = csv.reader(f)
            rows = list(reader)  # [['Adsit', 'Czech'], ['Ajdrna', 'Czech'], ...]
        self.names = [row[0] for row in rows]
        self.len = len(self.names)
        self.countries = [row[1] for row in rows]
        self.country_list = sorted(set(self.countries))  # sorted list of unique countries
        self.country_dict = self.getCountryDict()
        self.country_num = len(self.country_list)

    def __getitem__(self, index):
        return self.names[index], self.country_dict[self.countries[index]]
        # returns (name: str, country: int)

    def __len__(self):
        return self.len

    def getCountryDict(self):
        country_dict = dict()
        for idx, country_name in enumerate(self.country_list, 0):
            country_dict[country_name] = idx
        return country_dict

    def idx2country(self, index):
        return self.country_list[index]

    def getCountriesNum(self):
        return self.country_num

trainset = NameDataset(is_train_set=True)
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
testset = NameDataset(is_train_set=False)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)
N_COUNTRY = trainset.getCountriesNum()  # number of output classes of the model
############## Model Design #############
class RNNClassifier(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirectional=True):
        super().__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.n_directions = 2 if bidirectional else 1  # 2 selects a bidirectional RNN/LSTM/GRU
        self.embedding = torch.nn.Embedding(input_size, hidden_size)
        '''Embedding Layer
        input:  (seqLen, batchSize)
        output: (seqLen, batchSize, hiddenSize)
        '''
        self.gru = torch.nn.GRU(hidden_size, hidden_size, n_layers,
                                bidirectional=bidirectional)
        '''GRU Layer
        inputs:
            input  (seqLen, batchSize, hiddenSize)
            hidden (nLayers * nDirections, batchSize, hiddenSize)
        outputs:
            output (seqLen, batchSize, hiddenSize * nDirections)
            hidden (nLayers * nDirections, batchSize, hiddenSize)
        '''
        self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size)

    def _init_hidden(self, batch_size):
        hidden = torch.zeros(self.n_layers * self.n_directions,
                             batch_size, self.hidden_size)
        return create_tensor(hidden)

    def forward(self, input, seq_lengths):
        # input shape: B x S -> S x B
        input = input.t()
        batch_size = input.size(1)
        hidden = self._init_hidden(batch_size)
        # h_0: (nLayers * nDirections, batchSize, hiddenSize)
        embedding = self.embedding(input)
        # embedding shape: (seqLen, batchSize, hiddenSize)
        # to speed up the computation, strip out the padding and repack the data
        seq_lengths = seq_lengths.cpu()
        # pack_padded_sequence requires the lengths to be a 1D CPU int64 tensor
        gru_input = pack_padded_sequence(embedding, seq_lengths)
        # first argument shape: (seqLen, batchSize, hiddenSize)
        # second argument: a tensor with the sequence length of each batch element
        # returns a PackedSequence object
        output, hidden = self.gru(gru_input, hidden)
        # output is a PackedSequence object
        # hidden shape: (nLayers * nDirections, batchSize, hiddenSize)
        if self.n_directions == 2:
            hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)
        else:
            hidden_cat = hidden[-1]
        fc_output = self.fc(hidden_cat)
        return fc_output
########### One Epoch Training ##########
def trainModel():
    total_loss = 0
    for i, (names, countries) in enumerate(trainloader, 1):
        inputs, seq_lengths, target = make_tensors(names, countries)
        output = classifier(inputs, seq_lengths)
        loss = criterion(output, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        if i % 10 == 0:
            print(f"[{time_since(start)} Epoch {epoch}]", end="")
            print(f"[{i * len(inputs)}/{len(trainset)}]", end="")
            print(f" loss={total_loss / (i * len(inputs))}")
    return total_loss
################# Testing ###############
def testModel():
    correct = 0
    total = len(testset)
    print("evaluating trained model ...")
    with torch.no_grad():
        for i, (names, countries) in enumerate(testloader, 1):
            inputs, seq_lengths, target = make_tensors(names, countries)
            output = classifier(inputs, seq_lengths)
            pred = output.max(dim=1, keepdim=True)[1]
            # .max() with dim=1 takes the row-wise maximum and returns
            # (values: tensor, indices: tensor)
            # keepdim=True keeps the original number of dimensions
            correct += pred.eq(target.view_as(pred)).sum().item()
            # target.view_as(pred) reshapes target to the same shape as pred
            # .eq() compares pred and target elementwise, returning a boolean tensor
        percent = "%.2f" % (100 * correct / total)
        print(f"Test set: Accuracy {correct}/{total} {percent}%")
    return correct / total
############### main cycle ##############
if __name__ == '__main__':
    classifier = RNNClassifier(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)
    if USE_GPU:
        device = torch.device("cuda:0")
        classifier.to(device)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)
    start = time.time()
    print("Training for %d epochs..." % N_EPOCHS)
    acc_list = []
    for epoch in range(1, N_EPOCHS + 1):
        # Train cycle
        trainModel()
        acc = testModel()
        acc_list.append(acc)
    ############## draw a plot ##############
    epoch = np.arange(1, len(acc_list) + 1, 1)
    acc_list = np.array(acc_list)
    plt.plot(epoch, acc_list)
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.grid()
    plt.show()