1. The SGD gradient descent formula
SGD updates each weight against its gradient: w_new = w_old - learning_rate * gradient.
When the gradient is greater than 0, subtracting a positive number makes the weight smaller, so the search moves left toward the point where the gradient approaches 0.
When the gradient is less than 0, subtracting a negative number makes the weight larger, so the search moves right toward the point where the gradient approaches 0; along the way the gradient rises from negative toward 0.
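A minimal sketch of this behavior in Python (the toy loss f(w) = w^2 and its gradient 2w are illustrative assumptions, not part of the original):

# Toy SGD step on f(w) = w^2, whose gradient is 2w (illustrative values).
def sgd_step(w, grad, lr=0.1):
    return w - lr * grad  # move against the sign of the gradient

w = 2.0                    # gradient 2*w = 4 > 0, so the update moves w left
print(sgd_step(w, 2 * w))  # 2.0 - 0.1 * 4 = 1.6
w = -2.0                   # gradient 2*w = -4 < 0, so the update moves w right
print(sgd_step(w, 2 * w))  # -2.0 - 0.1 * (-4) = -1.6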
2. How the Adam optimizer is implemented
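For reference, the update rule the diy_adam function below implements (the standard Adam equations; the symbols match the variables mt, vt, mth, vth, alpha, and eps in the code):

$m_t = \beta_1 m_{t-1} + (1-\beta_1)\,g_t$
$v_t = \beta_2 v_{t-1} + (1-\beta_2)\,g_t^2$
$\hat{m}_t = m_t/(1-\beta_1^t),\quad \hat{v}_t = v_t/(1-\beta_2^t)$
$w_t = w_{t-1} - \alpha\,\hat{m}_t/(\sqrt{\hat{v}_t}+\epsilon)$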
#coding:utf8
import torch
import torch.nn as nn
import numpy as np
import copy

"""
A network written with pytorch.
Gradient computation and backpropagation are implemented by hand,
with an activation function added.
"""

class TorchModel(nn.Module):
    def __init__(self, hidden_size):
        super(TorchModel, self).__init__()
        self.layer = nn.Linear(hidden_size, hidden_size, bias=False)  # w: hidden_size x hidden_size; wx+b -> wx
        self.activation = torch.sigmoid
        self.loss = nn.functional.mse_loss  # mean squared error loss

    # With a ground-truth label, return the loss; without one, return the prediction
    def forward(self, x, y=None):
        y_pred = self.layer(x)
        y_pred = self.activation(y_pred)
        if y is not None:
            return self.loss(y_pred, y)
        else:
            return y_pred


# Custom model that takes a weight matrix as its argument
class DiyModel:
    def __init__(self, weight):
        self.weight = weight

    def forward(self, x, y=None):
        x = np.dot(x, self.weight.T)
        y_pred = self.diy_sigmoid(x)
        if y is not None:
            return self.diy_mse_loss(y_pred, y)
        else:
            return y_pred

    # sigmoid
    def diy_sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    # Hand-written mean squared error loss
    def diy_mse_loss(self, y_pred, y_true):
        return np.sum(np.square(y_pred - y_true)) / len(y_pred)

    # Hand-written gradient computation
    def calculate_grad(self, y_pred, y_true, x):
        # Forward pass:
        # wx = np.dot(self.weight, x)
        # sigmoid_wx = self.diy_sigmoid(wx)
        # loss = self.diy_mse_loss(sigmoid_wx, y_true)
        # Backward pass:
        # derivative of the MSE (y_pred - y_true)^2 / n is 2 * (y_pred - y_true) / n; a vector of length 2
        grad_mse = 2 / len(x) * (y_pred - y_true)
        # derivative of the sigmoid y = 1/(1+e^(-x)) is y * (1 - y); a vector of length 2
        grad_sigmoid = y_pred * (1 - y_pred)
        # The wx matrix product (see the expansion in the ppt): wx = [w11*x0 + w21*x1, w12*x0 + w22*x1]
        # Chain rule: multiply the partial derivatives together
        grad_w11 = grad_mse[0] * grad_sigmoid[0] * x[0]
        grad_w12 = grad_mse[1] * grad_sigmoid[1] * x[0]
        grad_w21 = grad_mse[0] * grad_sigmoid[0] * x[1]
        grad_w22 = grad_mse[1] * grad_sigmoid[1] * x[1]
        grad = np.array([[grad_w11, grad_w12],
                         [grad_w21, grad_w22]])
        # pytorch stores the weight transposed, so transpose the output as well
        return grad.T


# SGD weight update
def diy_sgd(grad, weight, learning_rate):
    return weight - learning_rate * grad


# Adam weight update
def diy_adam(grad, weight):
    # These should be arguments of the function; they are hard-coded here,
    # as a single step, to keep the code below uncluttered
    alpha = 1e-3   # learning rate
    beta1 = 0.9    # hyperparameter
    beta2 = 0.999  # hyperparameter
    eps = 1e-8     # hyperparameter
    t = 0          # initialization
    mt = 0         # initialization
    vt = 0         # initialization
    # Start computing
    t = t + 1
    gt = grad
    mt = beta1 * mt + (1 - beta1) * gt       # first-moment (momentum) estimate
    vt = beta2 * vt + (1 - beta2) * gt ** 2  # second-moment estimate
    mth = mt / (1 - beta1 ** t)              # bias correction
    vth = vt / (1 - beta2 ** t)              # bias correction
    weight = weight - (alpha * mth / (np.sqrt(vth) + eps))
    return weight
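
# Note (not in the original): torch.optim.Adam's defaults (lr=1e-3, betas=(0.9, 0.999),
# eps=1e-8) match the constants hard-coded above, so the torch and diy updates below
# should agree on this first step, up to floating-point differences.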

x = np.array([-0.5, 0.1])  # input
y = np.array([0.1, 0.2])   # expected output

# torch experiment
torch_model = TorchModel(2)
torch_model_w = torch_model.state_dict()["layer.weight"]
print(torch_model_w, "initial weights")
numpy_model_w = copy.deepcopy(torch_model_w.numpy())

# numpy array -> torch tensor; unsqueeze adds a batch-size dimension
torch_x = torch.from_numpy(x).float().unsqueeze(0)
torch_y = torch.from_numpy(y).float().unsqueeze(0)

# torch forward pass, producing the loss
torch_loss = torch_model(torch_x, torch_y)
print("torch model loss:", torch_loss)

# Hand-written loss computation
diy_model = DiyModel(numpy_model_w)
diy_loss = diy_model.forward(x, y)
print("diy model loss:", diy_loss)

# Set up the optimizer
learning_rate = 0.1
# optimizer = torch.optim.SGD(torch_model.parameters(), lr=learning_rate)
optimizer = torch.optim.Adam(torch_model.parameters())
# optimizer.zero_grad()

# pytorch backpropagation
torch_loss.backward()
print(torch_model.layer.weight.grad, "torch gradient")  # inspect the layer's weight gradient

# Hand-written backpropagation
grad = diy_model.calculate_grad(diy_model.forward(x), y, x)
print(grad, "diy gradient")

# torch weight update
optimizer.step()
# Inspect the updated weights
update_torch_model_w = torch_model.state_dict()["layer.weight"]
print(update_torch_model_w, "torch updated weights")

# Hand-written weight update
# diy_update_w = diy_sgd(grad, numpy_model_w, learning_rate)
diy_update_w = diy_adam(grad, numpy_model_w)
print(diy_update_w, "diy updated weights")
3. RNN
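The recurrence the code below implements (a standard Elman RNN with the bias terms disabled, matching nn.RNN(..., bias=False)), with $h_0 = 0$:

$h_t = \tanh(W_{ih}\,x_t + W_{hh}\,h_{t-1})$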
#coding:utf8
import torch
import torch.nn as nn
import numpy as np

"""
Hand-written simple neural network:
implement an RNN with pytorch,
implement the same RNN by hand,
and compare the two.
"""

class TorchRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(TorchRNN, self).__init__()
        self.layer = nn.RNN(input_size, hidden_size, bias=False, batch_first=True)

    def forward(self, x):
        return self.layer(x)


# Custom RNN model
class DiyModel:
    def __init__(self, w_ih, w_hh, hidden_size):
        self.w_ih = w_ih
        self.w_hh = w_hh
        self.hidden_size = hidden_size

    def forward(self, x):
        ht = np.zeros((self.hidden_size))
        output = []
        for xt in x:
            ux = np.dot(self.w_ih, xt)  # input-to-hidden projection
            wh = np.dot(self.w_hh, ht)  # hidden-to-hidden projection
            ht_next = np.tanh(ux + wh)  # h_t = tanh(W_ih·x_t + W_hh·h_{t-1})
            output.append(ht_next)
            ht = ht_next
        return np.array(output), ht


x = np.array([[1, 2, 3],
              [3, 4, 5],
              [5, 6, 7]])  # network input

# torch experiment
hidden_size = 4
torch_model = TorchRNN(3, hidden_size)
# print(torch_model.state_dict())
w_ih = torch_model.state_dict()["layer.weight_ih_l0"]
w_hh = torch_model.state_dict()["layer.weight_hh_l0"]
print(w_ih, w_ih.shape)
print(w_hh, w_hh.shape)

torch_x = torch.FloatTensor([x])
output, h = torch_model.forward(torch_x)
print(h)
print(output.detach().numpy(), "torch model output")
print(h.detach().numpy(), "torch model hidden state")
print("---------------")
diy_model = DiyModel(w_ih, w_hh, hidden_size)
output, h = diy_model.forward(x)
print(output, "diy model output")
print(h, "diy model hidden state")
4. CNN
#coding:utf8
import torch
import torch.nn as nn
import numpy as np

"""
Hand-written simple neural network:
implement a CNN with pytorch,
implement the same CNN by hand,
and compare the two.
"""

# A 2-d convolution
class TorchCNN(nn.Module):
    def __init__(self, in_channel, out_channel, kernel):
        super(TorchCNN, self).__init__()
        self.layer = nn.Conv2d(in_channel, out_channel, kernel, bias=False)

    def forward(self, x):
        return self.layer(x)


# Custom CNN model
class DiyModel:
    def __init__(self, input_height, input_width, weights, kernel_size):
        self.height = input_height
        self.width = input_width
        self.weights = weights
        self.kernel_size = kernel_size

    def forward(self, x):
        output = []
        for kernel_weight in self.weights:
            kernel_weight = kernel_weight.squeeze().numpy()  # shape: 2x2
            kernel_output = np.zeros((self.height - self.kernel_size + 1, self.width - self.kernel_size + 1))
            for i in range(self.height - self.kernel_size + 1):
                for j in range(self.width - self.kernel_size + 1):
                    window = x[i:i+self.kernel_size, j:j+self.kernel_size]
                    kernel_output[i, j] = np.sum(kernel_weight * window)  # elementwise product; np.dot(a, b) != a * b
            output.append(kernel_output)
        return np.array(output)
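
# Note (not in the original): with stride 1 and no padding, the output spatial size is
# (H - K + 1) x (W - K + 1); for the 4x4 input and 2x2 kernel below that is 3x3,
# one feature map per output channel.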

x = np.array([[0.1, 0.2, 0.3, 0.4],
              [-3, -4, -5, -6],
              [5.1, 6.2, 7.3, 8.4],
              [-0.7, -0.8, -0.9, -1]])  # network input

# torch experiment
in_channel = 1
out_channel = 3
kernel_size = 2
torch_model = TorchCNN(in_channel, out_channel, kernel_size)
print(torch_model.state_dict())
torch_w = torch_model.state_dict()["layer.weight"]
# print(torch_w.numpy().shape)
torch_x = torch.FloatTensor([[x]])
output = torch_model.forward(torch_x)
output = output.detach().numpy()
print(output, output.shape, "torch model output\n")
print("---------------")
diy_model = DiyModel(x.shape[0], x.shape[1], torch_w, kernel_size)
output = diy_model.forward(x)
print(output, "diy model output")
5. A simple NLP task
#coding:utf8
import torch
import torch.nn as nn
import numpy as np
import random
import json
import matplotlib.pyplot as plt

"""
A network written with pytorch
to solve a simple nlp task:
decide whether certain specific characters appear in a piece of text.
"""

class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab):
        super(TorchModel, self).__init__()
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)  # embedding layer
        self.pool = nn.AvgPool1d(sentence_length)  # pooling layer
        self.classify = nn.Linear(vector_dim, 1)   # linear layer
        self.activation = torch.sigmoid            # sigmoid squashing function
        self.loss = nn.functional.mse_loss         # mean squared error loss

    # With a ground-truth label, return the loss; without one, return the prediction
    def forward(self, x, y=None):
        x = self.embedding(x)        # (batch_size, sen_len) -> (batch_size, sen_len, vector_dim)
        x = x.transpose(1, 2)        # (batch_size, sen_len, vector_dim) -> (batch_size, vector_dim, sen_len)
        x = self.pool(x)             # (batch_size, vector_dim, sen_len) -> (batch_size, vector_dim, 1)
        x = x.squeeze()              # (batch_size, vector_dim, 1) -> (batch_size, vector_dim)
        x = self.classify(x)         # (batch_size, vector_dim) -> (batch_size, 1)  e.g. 3x5 · 5x1 -> 3x1
        y_pred = self.activation(x)  # (batch_size, 1) -> (batch_size, 1)
        if y is not None:
            return self.loss(y_pred, y)  # loss between prediction and ground truth
        else:
            return y_pred                # return the prediction


# The character set is a small arbitrary pick and could be extended.
# Assign each character an index:
# {"a":1, "b":2, "c":3...}
# abc -> [1,2,3]
def build_vocab():
    chars = "你我他defghijklmnopqrstuvwxyz"  # character set
    vocab = {"pad": 0}
    for index, char in enumerate(chars):
        vocab[char] = index + 1  # one index per character
    vocab['unk'] = len(vocab)    # 27
    return vocab


# Generate one random sample:
# pick sentence_length characters from the full set;
# if any of "你我他" appears it is a positive sample,
# otherwise it is a negative sample.
def build_sample(vocab, sentence_length):
    # Randomly pick sentence_length characters from the vocabulary, possibly with repeats
    x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
    # The sample is positive when any of the designated characters appears
    if set("你我他") & set(x):
        y = 1
    # It is negative when none of the designated characters appears
    else:
        y = 0
    x = [vocab.get(word, vocab['unk']) for word in x]  # map characters to indices for the embedding
    return x, y
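
# A hypothetical example (not in the original): with sentence_length=6, the string
# "wz你dfg" would map to x = [23, 26, 1, 4, 6, 7] with y = 1, since "你" appears.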

# Build a dataset:
# pass in the number of samples required; generates exactly that many
def build_dataset(sample_length, vocab, sentence_length):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(vocab, sentence_length)
        dataset_x.append(x)
        dataset_y.append([y])
    return torch.LongTensor(dataset_x), torch.FloatTensor(dataset_y)


# Build the model
def build_model(vocab, char_dim, sentence_length):
    model = TorchModel(char_dim, sentence_length, vocab)
    return model


# Test code:
# measures the model's accuracy after each epoch
def evaluate(model, vocab, sample_length):
    model.eval()
    x, y = build_dataset(200, vocab, sample_length)  # build 200 evaluation samples
    print("The evaluation set contains %d positive and %d negative samples" % (sum(y), 200 - sum(y)))
    correct, wrong = 0, 0
    with torch.no_grad():
        y_pred = model(x)  # model predictions
        for y_p, y_t in zip(y_pred, y):  # compare against the ground-truth labels
            if float(y_p) < 0.5 and int(y_t) == 0:
                correct += 1  # negative sample classified correctly
            elif float(y_p) >= 0.5 and int(y_t) == 1:
                correct += 1  # positive sample classified correctly
            else:
                wrong += 1
    print("Correct predictions: %d, accuracy: %f" % (correct, correct / (correct + wrong)))
    return correct / (correct + wrong)


def main():
    # Configuration
    epoch_num = 10         # number of training epochs
    batch_size = 20        # samples per batch
    train_sample = 500     # total samples per epoch
    char_dim = 20          # embedding dimension per character
    sentence_length = 6    # text length per sample
    learning_rate = 0.005  # learning rate
    # Build the vocabulary
    vocab = build_vocab()
    # Build the model
    model = build_model(vocab, char_dim, sentence_length)
    # Choose the optimizer
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
    log = []
    # Training loop
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, vocab, sentence_length)  # build one batch of training samples
            optim.zero_grad()   # zero the gradients
            loss = model(x, y)  # compute the loss
            loss.backward()     # compute the gradients
            optim.step()        # update the weights
            watch_loss.append(loss.item())
        print("=========\nEpoch %d mean loss: %f" % (epoch + 1, np.mean(watch_loss)))
        acc = evaluate(model, vocab, sentence_length)  # evaluate this epoch's model
        log.append([acc, np.mean(watch_loss)])
    # Plot
    plt.plot(range(len(log)), [l[0] for l in log], label="acc")   # accuracy curve
    plt.plot(range(len(log)), [l[1] for l in log], label="loss")  # loss curve
    plt.legend()
    plt.show()
    # Save the model
    torch.save(model.state_dict(), "model.pth")
    # Save the vocabulary
    writer = open("vocab.json", "w", encoding="utf8")
    writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
    writer.close()
    return


# Use the trained model to make predictions
def predict(model_path, vocab_path, input_strings):
    char_dim = 20        # embedding dimension per character
    sentence_length = 6  # text length per sample
    vocab = json.load(open(vocab_path, "r", encoding="utf8"))  # load the vocabulary
    model = build_model(vocab, char_dim, sentence_length)      # build the model
    model.load_state_dict(torch.load(model_path))              # load the trained weights
    x = []
    for input_string in input_strings:
        x.append([vocab[char] for char in input_string])  # serialize the input
    model.eval()           # evaluation mode
    with torch.no_grad():  # no gradient computation
        result = model.forward(torch.LongTensor(x))  # model predictions
    for i, input_string in enumerate(input_strings):
        print("Input: %s, predicted class: %d, probability: %f" % (input_string, round(float(result[i])), result[i]))  # print results


if __name__ == "__main__":
    main()
    test_strings = ["fnvfee", "wz你dfg", "rqwdeg", "n我kwww"]
    predict("model.pth", "vocab.json", test_strings)