Zero2Hero : 3 - Tanh, Gradients, BatchNorm
- Continuing from the previous post, the MLP model is further modified by adding BatchNorm and an activation function.
- A look inside a deep network: the activations, the gradients flowing backward, and the pitfalls of random initialization.
- What BatchNorm does.
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt # for making figures
from matplotlib.font_manager import FontProperties
font = FontProperties(fname='../chinese_pop.ttf', size=10)
Load the dataset
The data is a corpus of Chinese personal names.
words = open('../Chinese_Names_Corpus.txt', 'r').read().splitlines()
# the corpus contains over a million names; filter to three-character names with the surname 王 for this experiment
names = [name for name in words if name[0] == '王' and len(name) == 3]
len(names)
52127
# build the char-to-index and index-to-char mappings; vocabulary size is 1651 (1650 characters plus the '.' padding token):
chars = sorted(list(set(''.join(names))))
char2i = {s:i+1 for i,s in enumerate(chars)}
char2i['.'] = 0 # padding character
i2char = {i:s for s,i in char2i.items()}
len(chars)
1650
Build the training data
block_size = 2
def build_dataset(names):
X, Y = [], []
for w in names:
context = [0] * block_size
for ch in w + '.':
ix = char2i[ch]
X.append(context)
Y.append(ix)
context = context[1:] + [ix] # crop and append
X = torch.tensor(X)
Y = torch.tensor(Y)
print(X.shape, Y.shape)
return X, Y
Split the data:
import random
random.seed(42)
random.shuffle(names)
n1 = int(0.8*len(names))
Xtr, Ytr = build_dataset(names[:n1])
Xte, Yte = build_dataset(names[n1:])
torch.Size([166804, 2]) torch.Size([166804])
torch.Size([41704, 2]) torch.Size([41704])
The MLP model
- Model structure: input layer → embedding layer → hidden layer → BatchNorm layer → activation (tanh) → output layer (sketched below).
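A compact sketch of this pipeline (forward_sketch is just an illustrative name; it uses the parameter names defined in the next cell, and the training loop further below does exactly this on minibatches):
def forward_sketch(Xb):
    emb = C[Xb]                                   # embedding lookup: (N, block_size, n_embd)
    embcat = emb.view(emb.shape[0], -1)           # flatten to (N, block_size * n_embd)
    hpreact = embcat @ W1 + b1                    # hidden-layer pre-activation
    hpreact = bngain * (hpreact - hpreact.mean(0, keepdim=True)) / hpreact.std(0, keepdim=True) + bnbias  # BatchNorm
    h = torch.tanh(hpreact)                       # non-linearity
    logits = h @ W2 + b2                          # output layer
    return logits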
Initialize the model parameters:
vocab_size = len(char2i)
n_embd = 2 # embedding vector dimension
n_hidden = 200 # number of neurons in the hidden layer
g = torch.Generator().manual_seed(2147483647)
C = torch.randn((vocab_size, n_embd), generator=g)
W1 = torch.randn((n_embd * block_size, n_hidden), generator=g) #* (5/3)/((n_embd * block_size)**0.5) #* 0.2
b1 = torch.randn(n_hidden, generator=g) #* 0.01
W2 = torch.randn((n_hidden, vocab_size), generator=g) #* 0.01
b2 = torch.randn(vocab_size, generator=g) #* 0
# BatchNorm parameters
bngain = torch.ones((1, n_hidden))
bnbias = torch.zeros((1, n_hidden))
bnmean_running = torch.zeros((1, n_hidden))
bnstd_running = torch.ones((1, n_hidden))
parameters = [C, W1, W2, b2, bngain, bnbias] # note: b1 is left out (so it is not trained); with BatchNorm the bias is cancelled by the mean subtraction anyway
print(sum(p.nelement() for p in parameters)) # number of parameters in total
for p in parameters:
p.requires_grad = True
336353
Train the model:
# same optimization as last time
max_steps = 20000
batch_size = 32
lossi = []
for i in range(max_steps):
# random batch data
ix = torch.randint(0, Xtr.shape[0], (batch_size,), generator=g)
Xb, Yb = Xtr[ix], Ytr[ix]
# forward pass
emb = C[Xb] # embed the characters into vectors
embcat = emb.view(emb.shape[0], -1) # concatenate the vectors
# Linear layer
hpreact = embcat @ W1 + b1 # hidden layer pre-activation
# BatchNorm layer
bnmeani = hpreact.mean(0, keepdim=True) # (1, n_hidden)
bnstdi = hpreact.std(0, keepdim=True) # (1, n_hidden)
hpreact = bngain * (hpreact - bnmeani) / bnstdi + bnbias
with torch.no_grad():
bnmean_running = 0.999 * bnmean_running + 0.001 * bnmeani
bnstd_running = 0.999 * bnstd_running + 0.001 * bnstdi
# -------------------------------------------------------------
# Non-linearity
h = torch.tanh(hpreact)
# output layer
logits = h @ W2 + b2
loss = F.cross_entropy(logits, Yb) # loss function
# backward pass
for p in parameters:
p.grad = None
loss.backward()
# update
lr = 0.1 if i < 10000 else 0.01
for p in parameters:
p.data += -lr * p.grad
lossi.append(loss.log10().item())
Training/test loss:
with torch.no_grad():
# pass the training set through
emb = C[Xtr]
embcat = emb.view(emb.shape[0], -1)
hpreact = embcat @ W1 + b1
# measure the mean/std over the entire training set
bnmean = hpreact.mean(0, keepdim=True)
bnstd = hpreact.std(0, keepdim=True)
@torch.no_grad() # this decorator disables gradient tracking
def split_loss(split):
x,y = {'train': (Xtr, Ytr),
'test': (Xte, Yte),}[split]
emb = C[x] # (N, block_size, n_embd)
embcat = emb.view(emb.shape[0], -1) # concat into (N, block_size * n_embd)
hpreact = embcat @ W1 + b1
#hpreact = bngain * (hpreact - hpreact.mean(0, keepdim=True)) / hpreact.std(0, keepdim=True) + bnbias
hpreact = bngain * (hpreact - bnmean_running) / bnstd_running + bnbias
h = torch.tanh(hpreact) # (N, n_hidden)
logits = h @ W2 + b2 # (N, vocab_size)
loss = F.cross_entropy(logits, y)
print(split, loss.item())
split_loss('train')
split_loss('test')
train 3.2291476726531982
test 3.237765312194824
Randomly initialize the parameters and scale them down:
# scale the randomly initialized parameters down to smaller values
g = torch.Generator().manual_seed(2147483647)
C = torch.randn((vocab_size, n_embd), generator=g)
W1 = torch.randn((n_embd * block_size, n_hidden), generator=g) * (5/3)/((n_embd * block_size)**0.5) #* 0.2
b1 = torch.randn(n_hidden, generator=g) * 0.01
W2 = torch.randn((n_hidden, vocab_size), generator=g) * 0.01
b2 = torch.randn(vocab_size, generator=g) * 0.01
# BatchNorm parameters
bngain = torch.ones((1, n_hidden))
bnbias = torch.zeros((1, n_hidden))
bnmean_running = torch.zeros((1, n_hidden))
bnstd_running = torch.ones((1, n_hidden))
parameters = [C, W1, W2, b2, bngain, bnbias]
print(sum(p.nelement() for p in parameters)) # number of parameters in total
for p in parameters:
p.requires_grad = True
336353
Train the model:
# same optimization as last time
max_steps = 20000
batch_size = 32
scaled_lossi = []
for i in range(max_steps):
# random batch data
ix = torch.randint(0, Xtr.shape[0], (batch_size,), generator=g)
Xb, Yb = Xtr[ix], Ytr[ix]
# forward pass
emb = C[Xb] # embed the characters into vectors
embcat = emb.view(emb.shape[0], -1) # concatenate the vectors
# Linear layer
hpreact = embcat @ W1 + b1 # hidden layer pre-activation
# BatchNorm layer
bnmeani = hpreact.mean(0, keepdim=True) # (1, n_hidden)
bnstdi = hpreact.std(0, keepdim=True) # (1, n_hidden)
hpreact = bngain * (hpreact - bnmeani) / bnstdi + bnbias
with torch.no_grad():
bnmean_running = 0.999 * bnmean_running + 0.001 * bnmeani
bnstd_running = 0.999 * bnstd_running + 0.001 * bnstdi
# -------------------------------------------------------------
# Non-linearity
h = torch.tanh(hpreact)
# output layer
logits = h @ W2 + b2
loss = F.cross_entropy(logits, Yb) # loss function
# backward pass
for p in parameters:
p.grad = None
loss.backward()
# update
lr = 0.1 if i < 10000 else 0.01
for p in parameters:
p.data += -lr * p.grad
scaled_lossi.append(loss.log10().item())
Training/test loss:
with torch.no_grad():
# pass the training set through
emb = C[Xtr]
embcat = emb.view(emb.shape[0], -1)
hpreact = embcat @ W1 + b1
# measure the mean/std over the entire training set
bnmean = hpreact.mean(0, keepdim=True)
bnstd = hpreact.std(0, keepdim=True)
@torch.no_grad() # this decorator disables gradient tracking
def split_loss(split):
x,y = {'train': (Xtr, Ytr),
'test': (Xte, Yte),}[split]
emb = C[x] # (N, block_size, n_embd)
embcat = emb.view(emb.shape[0], -1) # concat into (N, block_size * n_embd)
hpreact = embcat @ W1 + b1
#hpreact = bngain * (hpreact - hpreact.mean(0, keepdim=True)) / hpreact.std(0, keepdim=True) + bnbias
hpreact = bngain * (hpreact - bnmean_running) / bnstd_running + bnbias
h = torch.tanh(hpreact) # (N, n_hidden)
logits = h @ W2 + b2 # (N, vocab_size)
loss = F.cross_entropy(logits, y)
print(split, loss.item())
split_loss('train')
split_loss('test')
train 3.085115909576416
test 3.104541540145874
plt.figure(figsize=(10, 5))
plt.plot(lossi, label='Unscaled parameters')
plt.plot(scaled_lossi, alpha=0.5, label='Scaled parameters')
plt.legend()
Scaling down the randomly initialized weights significantly lowers the model's initial loss, as the quick check below shows.
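A rough check of this claim (a sketch; initial_loss is a hypothetical helper that re-creates fresh, untrained parameters with the shapes used above, scales only the output layer, and skips BatchNorm to isolate the effect of the weight scale):
def initial_loss(scale_out):
    g = torch.Generator().manual_seed(2147483647)
    C = torch.randn((vocab_size, n_embd), generator=g)
    W1 = torch.randn((n_embd * block_size, n_hidden), generator=g)
    b1 = torch.randn(n_hidden, generator=g)
    W2 = torch.randn((n_hidden, vocab_size), generator=g) * scale_out
    b2 = torch.randn(vocab_size, generator=g) * scale_out
    emb = C[Xtr[:1000]]
    h = torch.tanh(emb.view(emb.shape[0], -1) @ W1 + b1)   # BatchNorm deliberately omitted
    logits = h @ W2 + b2
    return F.cross_entropy(logits, Ytr[:1000]).item()
print(initial_loss(1.0), initial_loss(0.01))  # unscaled vs scaled output layer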
Log-loss (cross-entropy) comparison
- base
  - test: 3.3062
- add BatchNorm
  - train: 3.2291
  - test: 3.2377
- add BatchNorm and scaled parameters
  - train: 3.0851
  - test: 3.1045
Why normalize, and why scale down the weights?
Start with the output that directly determines the loss: the logits.
# suppose these are the logits produced by the output layer
logits = torch.rand((1, 10))*10
logits
tensor([[0.6693, 1.1769, 4.6489, 6.4311, 8.7869, 5.6321, 0.4762, 7.6668, 5.5291,
4.9612]])
loss = F.cross_entropy(logits, torch.tensor([1]))
loss
tensor(8.0425)
# loss after scaling the logits down by 100x
loss = F.cross_entropy(logits*0.01, torch.tensor([1]))
loss
tensor(2.3372)
At initialization the model knows nothing, so logits with a large magnitude only make it confidently wrong and inflate the loss; ideally the initial logits should be close to zero, giving a roughly uniform distribution over the vocabulary. Since logits = h @ W2 + b2, scaling down W2 and b2 shrinks the logits and significantly reduces the model's initial loss.
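For reference, a minimal check (assuming the vocabulary size of 1651 built above): with all-zero logits the prediction is uniform over the 1651 classes, so the loss equals -log(1/1651):
uniform_logits = torch.zeros((1, 1651))
loss = F.cross_entropy(uniform_logits, torch.tensor([1]))
print(loss)  # ~7.41, i.e. log(1651); roughly where a well-initialized model should start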
In this example the factor (5/3)/((n_embd * block_size)**0.5) ≈ 0.83 applied to W1 is likewise a scaling-down of the randomly initialized weights (5/3 is the recommended initialization gain for tanh, divided by the square root of the fan-in).
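A quick check of that factor (a sketch; torch.nn.init.calculate_gain returns the same 5/3 gain for tanh):
gain = torch.nn.init.calculate_gain('tanh')   # 5/3
print(gain / (n_embd * block_size)**0.5)      # ≈ 0.83 with n_embd=2, block_size=2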
Next, look at hpreact, the pre-activation of the hidden layer: hpreact = embcat @ W1 + b1:
# suppose this is the pre-activation of a hidden layer with 20 neurons
hpreact = torch.randn((32, 20))*10
hpreact[0]
tensor([ 5.4474, 0.8826, -9.8720, 12.3268, -19.7285, 2.5135, -9.5221,
7.9822, -11.6153, -10.5080, -10.6796, 3.6791, -0.7050, 14.4790,
7.3994, -18.2474, 11.5146, 0.6579, -6.6393, -6.7630])
# hidden-layer output after the tanh activation
h = torch.tanh(hpreact)
h[0]
tensor([ 1.0000, 0.7077, -1.0000, 1.0000, -1.0000, 0.9870, -1.0000, 1.0000,
-1.0000, -1.0000, -1.0000, 0.9987, -0.6076, 1.0000, 1.0000, -1.0000,
1.0000, 0.5770, -1.0000, -1.0000])
# fraction of activations with absolute value >= 0.99
torch.sum(torch.abs(h) >= 0.99)/(20*32)
tensor(0.7875)
After the tanh activation, roughly 78% of the outputs have an absolute value close to 1, which is alarming. For reference, here is the tanh implementation (with its backward pass) from the micrograd-style Value class:
def tanh(self):
x = self.data
t = (math.exp(2*x) - 1)/(math.exp(2*x) + 1)
out = Value(t, (self, ), 'tanh')
def _backward():
self.grad += (1 - t**2) * out.grad
out._backward = _backward
return out
In the backward pass the local gradient is (1 - t**2) * out.grad, where t is the output of tanh. If most of the values in t are close to -1 or 1, then (1 - t^2) ≈ 0 for those units, so almost no gradient flows back through them: most of the neurons in that layer are barely updated and the layer cannot train properly.
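A small autograd check of this effect (a sketch): the further the input to tanh is from zero, the closer the output is to ±1 and the smaller the gradient that flows back:
x = torch.tensor([0.1, 1.0, 3.0, 10.0], requires_grad=True)
t = torch.tanh(x)
t.sum().backward()
print(t.data)   # ≈ [0.0997, 0.7616, 0.9951, 1.0000]
print(x.grad)   # ≈ [0.9901, 0.4200, 0.0099, 0.0000], i.e. 1 - t**2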
How do we fix this?
- Normalize hpreact before the activation.
# apply (approximately) the effect of scaling W1 and b1 directly to hpreact instead
# hidden-layer pre-activation (before tanh)
hpreact = torch.randn((32, 20))*10
hpreact[0]
tensor([ -1.6678, -5.1004, 4.6603, -6.7397, 11.6537, -12.1372, 12.5041,
-6.4717, -8.0874, 12.1796, -2.7098, -13.1736, 9.8013, -2.1097,
4.5570, -10.4803, -4.0452, 11.1274, 11.3966, 3.9012])
# normalize hpreact before the activation
hpreact = (hpreact - hpreact.mean(axis=0, keepdim=True))/hpreact.std(axis=0, keepdim=True)
hpreact[0]
tensor([-0.0923, -0.7857, 0.4576, -0.5444, 1.2959, -1.0164, 1.3767, -0.5830,
-0.4439, 1.0640, -0.0931, -1.0887, 0.9777, -0.2024, 0.4199, -1.4186,
-0.1238, 1.2435, 1.3699, 0.3593])
# hidden-layer output after the tanh activation
h = torch.tanh(hpreact)
h[0]
tensor([-0.0920, -0.6560, 0.4281, -0.4963, 0.8607, -0.7684, 0.8802, -0.5248,
-0.4169, 0.7872, -0.0929, -0.7964, 0.7521, -0.1997, 0.3968, -0.8893,
-0.1231, 0.8465, 0.8787, 0.3446])
# fraction of activations with absolute value >= 0.99
torch.sum(torch.abs(h) >= 0.99)/(20*32)
tensor(0.0063)
After batch normalization, most of the neurons receive gradients and can be updated.
DNN model
# fully connected layer
class Linear:
def __init__(self, fan_in, fan_out, bias=True):
self.weight = torch.randn((fan_in, fan_out), generator=g) / fan_in**0.5
self.bias = torch.zeros(fan_out) if bias else None
def __call__(self, x):
self.out = x @ self.weight
if self.bias is not None:
self.out += self.bias
return self.out
def parameters(self):
return [self.weight] + ([] if self.bias is None else [self.bias])
# batch normalization layer
class BatchNorm1d:
def __init__(self, dim, eps=1e-5, momentum=0.1):
self.eps = eps
self.momentum = momentum
self.training = True
# parameters (trained with backprop)
self.gamma = torch.ones(dim)
self.beta = torch.zeros(dim)
# buffers (trained with a running 'momentum update')
self.running_mean = torch.zeros(dim)
self.running_var = torch.ones(dim)
def __call__(self, x):
# calculate the forward pass
if self.training:
xmean = x.mean(0, keepdim=True) # batch mean
xvar = x.var(0, keepdim=True) # batch variance
else:
xmean = self.running_mean
xvar = self.running_var
xhat = (x - xmean) / torch.sqrt(xvar + self.eps) # normalize to unit variance
self.out = self.gamma * xhat + self.beta
# update the buffers
if self.training:
with torch.no_grad():
self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * xmean
self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar
return self.out
def parameters(self):
return [self.gamma, self.beta]
class Tanh:
def __call__(self, x):
self.out = torch.tanh(x)
return self.out
def parameters(self):
return []
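A quick usage sketch of these layers (a hypothetical example, not part of the model below): in training mode BatchNorm1d normalizes with the batch statistics and updates its running buffers; after setting training = False it uses the running estimates instead.
g = torch.Generator().manual_seed(42)       # temporary generator for this sketch; re-seeded in the next cell
lin, bn, act = Linear(4, 8), BatchNorm1d(8), Tanh()
x = torch.randn((32, 4), generator=g)
out_train = act(bn(lin(x)))                 # uses batch mean/var, updates the running buffers
print(bn.running_mean)                      # has moved away from zero after one call
bn.training = False
out_eval = act(bn(lin(x)))                  # uses running_mean/running_var, buffers untouched
print(out_train.shape, out_eval.shape)      # torch.Size([32, 8]) torch.Size([32, 8])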
Initialize the model parameters:
n_embd = 2
n_hidden = 100
vocab_size = len(char2i)
g = torch.Generator().manual_seed(2147483647) # for reproducibility
C = torch.randn((vocab_size, n_embd), generator=g)
layers = [
Linear(n_embd * block_size, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
Linear( n_hidden, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
Linear( n_hidden, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
Linear( n_hidden, vocab_size, bias=False)]
with torch.no_grad():
# last layer: make less confident
#layers[-1].gamma *= 0.1
#layers[-1].weight *= 0.1
# all other layers: apply gain
for layer in layers[:-1]:
if isinstance(layer, Linear):
layer.weight *= 0.01 #5/3
parameters = [C] + [p for layer in layers for p in layer.parameters()]
print(sum(p.nelement() for p in parameters)) # number of parameters in total
for p in parameters:
p.requires_grad = True
189402
Train the DNN model:
# same optimization as last time
max_steps = 20000
batch_size = 32
lossi = []
ud = []
for i in range(max_steps):
# minibatch data
ix = torch.randint(0, Xtr.shape[0], (batch_size,), generator=g)
Xb, Yb = Xtr[ix], Ytr[ix] # batch X,Y
# forward pass
emb = C[Xb] # embed the characters into vectors
x = emb.view(emb.shape[0], -1) # concatenate the vectors
for layer in layers:
x = layer(x)
loss = F.cross_entropy(x, Yb) # loss function
# backward pass
for layer in layers:
        layer.out.retain_grad() # AFTER_DEBUG: would take out retain_grad; kept so the visualizations below can read layer.out.grad
for p in parameters:
p.grad = None
loss.backward()
# update
lr = 0.1 if i < 15000 else 0.01 # step learning rate decay
for p in parameters:
p.data += -lr * p.grad
lossi.append(loss.log10().item())
with torch.no_grad():
ud.append([((lr*p.grad).std() / p.data.std()).log10().item() for p in parameters])
#if i >= 1000:
# break # AFTER_DEBUG: would take out obviously to run full optimization
Visualize activations, gradients, and parameters:
# visualize activation histograms
plt.figure(figsize=(10, 3)) # width and height of the plot
legends = []
for i, layer in enumerate(layers[:-1]): # note: exclude the output layer
if isinstance(layer, Tanh):
t = layer.out
print('layer %d (%10s): mean %+.2f, std %.2f, saturated: %.2f%%' % (i, layer.__class__.__name__, t.mean(), t.std(), (t.abs() > 0.97).float().mean()*100))
hy, hx = torch.histogram(t, density=True)
plt.plot(hx[:-1].detach(), hy.detach())
legends.append(f'layer {i} ({layer.__class__.__name__})')
plt.legend(legends)
plt.title('activation distribution')
layer 2 ( Tanh): mean -0.01, std 0.66, saturated: 1.62%
layer 5 ( Tanh): mean +0.00, std 0.68, saturated: 1.28%
layer 8 ( Tanh): mean -0.02, std 0.70, saturated: 0.44%
# visualize gradient histograms
plt.figure(figsize=(10, 3)) # width and height of the plot
legends = []
for i, layer in enumerate(layers[:-1]): # note: exclude the output layer
if isinstance(layer, Tanh):
t = layer.out.grad
print('layer %d (%10s): mean %+f, std %e' % (i, layer.__class__.__name__, t.mean(), t.std()))
hy, hx = torch.histogram(t, density=True)
plt.plot(hx[:-1].detach(), hy.detach())
legends.append(f'layer {i} ({layer.__class__.__name__})')
plt.legend(legends)
plt.title('gradient distribution')
layer 2 ( Tanh): mean +0.000000, std 1.148749e-03
layer 5 ( Tanh): mean -0.000000, std 1.178951e-03
layer 8 ( Tanh): mean -0.000058, std 2.413830e-03
# visualize weight-gradient histograms
plt.figure(figsize=(10, 3)) # width and height of the plot
legends = []
for i,p in enumerate(parameters):
t = p.grad
if p.ndim == 2:
print('weight %10s | mean %+f | std %e | grad:data ratio %e' % (tuple(p.shape), t.mean(), t.std(), t.std() / p.std()))
hy, hx = torch.histogram(t, density=True)
plt.plot(hx[:-1].detach(), hy.detach())
legends.append(f'{i} {tuple(p.shape)}')
plt.legend(legends)
plt.title('weights gradient distribution')
weight (1651, 2) | mean -0.000000 | std 5.618064e-04 | grad:data ratio 5.536448e-04
weight (4, 100) | mean -0.000148 | std 5.627263e-03 | grad:data ratio 1.135445e-02
weight (100, 100) | mean -0.000013 | std 7.010635e-04 | grad:data ratio 2.180403e-03
weight (100, 100) | mean -0.000004 | std 1.754580e-03 | grad:data ratio 6.728885e-03
weight (100, 1651) | mean +0.000000 | std 2.069748e-03 | grad:data ratio 1.988948e-02
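The training loop above also recorded an update-to-data ratio in ud (log10 of lr * grad.std() / data.std() for each parameter); a sketch of how it could be plotted, with -3 (updates roughly 1/1000 of the parameter scale) as the usual rule-of-thumb guide line:
plt.figure(figsize=(10, 3))
legends = []
for i, p in enumerate(parameters):
    if p.ndim == 2:
        plt.plot([ud[j][i] for j in range(len(ud))])
        legends.append(f'param {i} {tuple(p.shape)}')
plt.plot([0, len(ud)], [-3, -3], 'k')  # guide line at log10(1e-3)
plt.legend(legends)
plt.title('update / data ratio (log10)')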
Test
@torch.no_grad() # this decorator disables gradient tracking
def split_loss(split):
x,y = {'train': (Xtr, Ytr),'test': (Xte, Yte),}[split]
emb = C[x] # (N, block_size, n_embd)
x = emb.view(emb.shape[0], -1) # concat into (N, block_size * n_embd)
for layer in layers:
x = layer(x)
loss = F.cross_entropy(x, y)
print(split, loss.item())
# put layers into eval mode
for layer in layers:
layer.training = False
split_loss('train')
split_loss('test')
train 3.086639881134033
test 3.101759433746338
# sample from the model
g = torch.Generator().manual_seed(2147483647 + 10)
for _ in range(10):
out = []
context = [0] * block_size # initialize with all ...
while True:
# forward pass the neural net
emb = C[torch.tensor([context])] # (1,block_size,n_embd)
x = emb.view(emb.shape[0], -1) # concatenate the vectors
for layer in layers:
x = layer(x)
logits = x
probs = F.softmax(logits, dim=1)
# sample from the distribution
ix = torch.multinomial(probs, num_samples=1, generator=g).item()
# shift the context window and track the samples
context = context[1:] + [ix]
out.append(ix)
# if we sample the special '.' token, break
if ix == 0:
break
print(''.join(i2char[i] for i in out)) # decode and print the generated word
王才新.
王继东.
王忠营.
王志存.
王胜滨.
王其旗.
王章章.
王铁江.
王三生.
王柏健.