反向传播算法:loss.backward()的实现细节
- 向前传播:输入数据得到预测结果。
- 向后传播:计算梯度加更新参数。
- 反向传播:计算梯度
计算图
计算图 = 有向无环图 + 基本运算
- 节点:变量节点 & 计算节点
- 有向边:表示计算关系
- 基本运算:比如四则运算、sigmid等
import torch
import torch.nn as nn
from graphviz import Digraph
定义类表示计算图
初始化函数参数: value表示值,prevs表示直接的前序节点, op表示计算符号(针对计算节点),label表示变量名。
定义类的输出字符串。
定义 ‘+’ 与 ‘*’ 触发函数(包括计算当前节点局部梯度)。
# 计算图
class ScalarTmp:
def __init__(self, value, prevs=[], op=None, label=''):
# value表示值,prevs表示直接的前序节点, op表示计算符号(针对计算节点),label表示变量名
self.value = value
self.prevs = prevs
self.op = op
self.label = label
self.grad = 0.0 # 全局梯度
self.grad_wrt = {} # 局部梯度
# 定义类的输出字符串
def __repr__(self):
return f'{self.value} | {self.op} | {self.label}'
def __add__(self, other):
# self + other触发函数
value = self.value + other.value
prevs = [self, other]
output = ScalarTmp(value, prevs, op='+')
output.grad_wrt[self] = 1
output.grad_wrt[other] = 1
return output
def __mul__(self, other):
# self * other触发函数
value = self.value * other.value
prevs = [self, other]
output = ScalarTmp(value, prevs, op='*')
output.grad_wrt[self] = other.value
output.grad_wrt[other] = self.value
return output
画图
def _trace(root):
# 遍历所有点和边
nodes, edges = set(), set() # set集合存储唯一的元素,并且没有特定的顺序
def _build(v):
if v not in nodes:
nodes.add(v)
for prev in v.prevs:
edges.add((prev, v))
_build(prev)
_build(root)
return nodes, edges
def draw_graph(root, direction='forward'):
nodes, edges = _trace(root)
rankdir = 'BT' if direction == 'forward' else 'TB'
graph = Digraph(format='svg', graph_attr={'rankdir': rankdir})
# 可缩放矢量图形(SVG),这是一种用于网页的矢量图形格式。
# rankdir 属性定义了图的布局方向
# 'TB': 从上到下布局。
# 'LR': 从左到右布局。
# 'BT': 从下到上布局。
# 'RL': 从右到左布局。
# 画点
for node in nodes:
label = node.label if node.op is None else node.op
node_attr = f'{{grad={node.grad:.2f} | value={node.value:.2f} | {label}}}'
uid = str(id(node))
graph.node(name=uid, label=node_attr, shape='record')
# 画边
for edge in edges:
id1 = str(id(edge[0]))
id2 = str(id(edge[1]))
graph.edge(id1, id2)
return graph
利用深度优先搜索排序
ordered 用于存储排序后的节点序列,visited 是一个集合,用于记录已经访问过的节点,以避免重复访问。
# 拓扑排序
def _top_order(root):
# 利用深度优先搜索排序
# ordered 用于存储排序后的节点序列,visited 是一个集合,用于记录已经访问过的节点,以避免重复访问。
ordered, visited = [], set()
def _add_prevs(node):
if node not in visited:
visited.add(node)
for prev in node.prevs:
_add_prevs(prev)
ordered.append(node) # 将当前节点添加到 ordered 列表的末尾
_add_prevs(root)
return ordered
计算梯度
链式法则:v.grad += node.grad * node.grad_wrt[v]
# 计算梯度
def backward(root):
# 定义顶点的梯度为1
root.grad = 1.0
ordered = _top_order(root)
for node in reversed(ordered): # reversed反转
for v in node.prevs:
v.grad += node.grad * node.grad_wrt[v]
return root
绘制简单计算图
a = ScalarTmp(value=1.0, label='a')
b = ScalarTmp(value=2.0, label='b')
c = ScalarTmp(value=4.0, label='c')
d = a + b
e = a * c
f = d * e
backward(f)
draw_graph(f)
完善计算图代码保存到utils.py文件中
源码如下:
from graphviz import Digraph
import math
class Scalar:
def __init__(self, value, prevs=[], op=None, label='', requires_grad=True):
# 节点的值
self.value = value
# 节点的标识(label)和对应的运算(op),用于作图
self.label = label
self.op = op
# 节点的前节点,即当前节点是运算的结果,而前节点是参与运算的量
self.prevs = prevs
# 是否需要计算该节点偏导数,即∂loss/∂self(loss表示最后的模型损失)
self.requires_grad = requires_grad
# 该节点偏导数,即∂loss/∂self
self.grad = 0.0
# 如果该节点的prevs非空,存储所有的∂self/∂prev
self.grad_wrt = dict()
# 作图需要,实际上对计算没有作用
self.back_prop = dict()
def __repr__(self):
return f'Scalar(value={self.value:.2f}, grad={self.grad:.2f})'
def __add__(self, other):
'''
定义加法,self + other将触发该函数
'''
if not isinstance(other, Scalar):
other = Scalar(other, requires_grad=False)
# output = self + other
output = Scalar(self.value + other.value, [self, other], '+')
output.requires_grad = self.requires_grad or other.requires_grad
# 计算偏导数 ∂output/∂self = 1
output.grad_wrt[self] = 1
# 计算偏导数 ∂output/∂other = 1
output.grad_wrt[other] = 1
return output
def __sub__(self, other):
'''
定义减法,self - other将触发该函数
'''
if not isinstance(other, Scalar):
other = Scalar(other, requires_grad=False)
# output = self - other
output = Scalar(self.value - other.value, [self, other], '-')
output.requires_grad = self.requires_grad or other.requires_grad
# 计算偏导数 ∂output/∂self = 1
output.grad_wrt[self] = 1
# 计算偏导数 ∂output/∂other = -1
output.grad_wrt[other] = -1
return output
def __mul__(self, other):
'''
定义乘法,self * other将触发该函数
'''
if not isinstance(other, Scalar):
other = Scalar(other, requires_grad=False)
# output = self * other
output = Scalar(self.value * other.value, [self, other], '*')
output.requires_grad = self.requires_grad or other.requires_grad
# 计算偏导数 ∂output/∂self = other
output.grad_wrt[self] = other.value
# 计算偏导数 ∂output/∂other = self
output.grad_wrt[other] = self.value
return output
def __pow__(self, other):
'''
定义乘方,self**other将触发该函数
'''
assert isinstance(other, (int, float))
# output = self ** other
output = Scalar(self.value ** other, [self], f'^{other}')
output.requires_grad = self.requires_grad
# 计算偏导数 ∂output/∂self = other * self**(other-1)
output.grad_wrt[self] = other * self.value ** (other - 1)
return output
def sigmoid(self):
'''
定义sigmoid
'''
s = 1 / (1 + math.exp(-1 * self.value))
output = Scalar(s, [self], 'sigmoid')
output.requires_grad = self.requires_grad
# 计算偏导数 ∂output/∂self = output * (1 - output)
output.grad_wrt[self] = s * (1 - s)
return output
def __rsub__(self, other):
'''
定义右减法,other - self将触发该函数
'''
if not isinstance(other, Scalar):
other = Scalar(other, requires_grad=False)
output = Scalar(other.value - self.value, [self, other], '-')
output.requires_grad = self.requires_grad or other.requires_grad
# 计算偏导数 ∂output/∂self = -1
output.grad_wrt[self] = -1
# 计算偏导数 ∂output/∂other = 1
output.grad_wrt[other] = 1
return output
def __radd__(self, other):
'''
定义右加法,other + self将触发该函数
'''
return self.__add__(other)
def __rmul__(self, other):
'''
定义右乘法,other * self将触发该函数
'''
return self * other
def backward(self, fn=None):
'''
由当前节点出发,求解以当前节点为顶点的计算图中每个节点的偏导数,i.e. ∂self/∂node
参数
----
fn :画图函数,如果该变量不等于None,则会返回向后传播每一步的计算的记录
返回
----
re :向后传播每一步的计算的记录
'''
def _topological_order():
'''
利用深度优先算法,返回计算图的拓扑排序(topological sorting)
'''
def _add_prevs(node):
if node not in visited:
visited.add(node)
for prev in node.prevs:
_add_prevs(prev)
ordered.append(node)
ordered, visited = [], set()
_add_prevs(self)
return ordered
def _compute_grad_of_prevs(node):
'''
由node节点出发,向后传播
'''
# 作图需要,实际上对计算没有作用
node.back_prop = dict()
# 得到当前节点在计算图中的梯度。由于一个节点可以在多个计算图中出现,
# 使用cg_grad记录当前计算图的梯度
dnode = cg_grad[node]
# 使用node.grad记录节点的累积梯度
node.grad += dnode
for prev in node.prevs:
# 由于node节点的偏导数已经计算完成,可以向后扩散(反向传播)
# 需要注意的是,向后扩散到上游节点是累加关系
grad_spread = dnode * node.grad_wrt[prev]
cg_grad[prev] = cg_grad.get(prev, 0.0) + grad_spread
node.back_prop[prev] = node.back_prop.get(prev, 0.0) + grad_spread
# 当前节点的偏导数等于1,因为∂self/∂self = 1。这是反向传播算法的起点
cg_grad = {self: 1}
# 为了计算每个节点的偏导数,需要使用拓扑排序的倒序来遍历计算图
ordered = reversed(_topological_order())
re = []
for node in ordered:
_compute_grad_of_prevs(node)
# 作图需要,实际上对计算没有作用
if fn is not None:
re.append(fn(self, 'backward'))
return re
def _get_node_attr(node, direction='forward'):
'''
节点的属性
'''
node_type = _get_node_type(node)
# 设置字体
res = {'fontname': 'Menlo'}
def _forward_attr():
if node_type == 'param':
node_text = f'{{ grad=None | value={node.value:.2f} | {node.label}}}'
res.update(
dict(label=node_text, shape='record', fontsize='10', fillcolor='lightgreen', style='filled, bold'))
return res
elif node_type == 'computation':
node_text = f'{{ grad=None | value={node.value:.2f} | {node.op}}}'
res.update(
dict(label=node_text, shape='record', fontsize='10', fillcolor='gray94', style='filled, rounded'))
return res
elif node_type == 'input':
if node.label == '':
node_text = f'input={node.value:.2f}'
else:
node_text = f'{node.label}={node.value:.2f}'
res.update(dict(label=node_text, shape='oval', fontsize='10'))
return res
def _backward_attr():
attr = _forward_attr()
attr['label'] = attr['label'].replace('grad=None', f'grad={node.grad:.2f}')
if not node.requires_grad:
attr['style'] = 'dashed'
# 为了作图美观
# 如果向后扩散(反向传播)的梯度等于0,或者扩散给不需要梯度的节点,那么该节点用虚线表示
grad_back = [v if k.requires_grad else 0 for (k, v) in node.back_prop.items()]
if len(grad_back) > 0 and sum(grad_back) == 0:
attr['style'] = 'dashed'
return attr
if direction == 'forward':
return _forward_attr()
else:
return _backward_attr()
def _get_node_type(node):
'''
决定节点的类型,计算节点、参数以及输入数据
'''
if node.op is not None:
return 'computation'
if node.requires_grad:
return 'param'
return 'input'
def _trace(root):
'''
遍历图中的所有点和边
'''
nodes, edges = set(), set()
def _build(v):
if v not in nodes:
nodes.add(v)
for prev in v.prevs:
edges.add((prev, v))
_build(prev)
_build(root)
return nodes, edges
def _draw_node(graph, node, direction='forward'):
'''
画节点
'''
node_attr = _get_node_attr(node, direction)
uid = str(id(node)) + direction
graph.node(name=uid, **node_attr)
def _draw_edge(graph, n1, n2, direction='forward'):
'''
画边
'''
uid1 = str(id(n1)) + direction
uid2 = str(id(n2)) + direction
def _draw_back_edge():
if n1.requires_grad and n2.requires_grad:
grad = n2.back_prop.get(n1, None)
if grad is None:
graph.edge(uid2, uid1, arrowhead='none', color='deepskyblue')
elif grad == 0:
graph.edge(uid2, uid1, style='dashed', label=f'{grad:.2f}', color='deepskyblue', fontname='Menlo')
else:
graph.edge(uid2, uid1, label=f'{grad:.2f}', color='deepskyblue', fontname='Menlo')
else:
graph.edge(uid2, uid1, style='dashed', arrowhead='none', color='deepskyblue')
if direction == 'forward':
graph.edge(uid1, uid2)
elif direction == 'backward':
_draw_back_edge()
else:
_draw_back_edge()
graph.edge(uid1, uid2)
def draw_graph(root, direction='forward'):
'''
图形化展示由root为顶点的计算图
参数
----
root :Scalar,计算图的顶点
direction :str,向前传播(forward)或者反向传播(backward)
返回
----
re :Digraph,计算图
'''
nodes, edges = _trace(root)
rankdir = 'BT' if direction == 'forward' else 'TB'
graph = Digraph(format='svg', graph_attr={'rankdir': rankdir})
for item in nodes:
_draw_node(graph, item, direction)
for n1, n2 in edges:
_draw_edge(graph, n1, n2, direction)
return graph
引用utils文件绘制计算图
from utils import Scalar, draw_graph
from IPython.display import clear_output
import time
a = Scalar(value=1.0, label='a')
b = Scalar(value=2.0, label='b')
c = Scalar(value=4.0, label='c')
d = a + b
e = a * c
f = d * e
backward_process = f.backward(draw_graph)
draw_graph(f, 'backward')
检查每一步梯度计算
for i in backward_process:
display(i)
time.sleep(3)
clear_output(wait=True)
梯度累积
在linear_model.py文件中定义线性模型Linear和计算均方误差函数mse
from utils import Scalar
def mse(errors):
'''
计算均方误差
'''
n = len(errors)
wrt = {}
value = 0.0
requires_grad = False
for item in errors:
value += item.value ** 2 / n
wrt[item] = 2 / n * item.value
requires_grad = requires_grad or item.requires_grad
output = Scalar(value, errors, 'mse')
output.requires_grad=requires_grad
output.grad_wrt = wrt
return output
class Linear:
def __init__(self):
'''
定义线性回归模型的参数:a, b
'''
self.a = Scalar(0.0, label='a')
self.b = Scalar(0.0, label='b')
def forward(self, x):
'''
根据当前的参数估计值,得到模型的预测结果
'''
return self.a * x + self.b
def error(self, x, y):
'''
当前数据的模型误差
'''
return y - self.forward(x)
def string(self):
'''
输出当前模型的结果
'''
return f'y = {self.a.value:.2f} * x + {self.b.value:.2f}'
引用utils.py和linear.py文件搭建线性模型并讨论拟合情况
from utils import Scalar, draw_graph
from linear_model import Linear, mse
# 生成数据
import torch
torch.manual_seed(1024) # 随机种子
x = torch.linspace(100, 300, 200)
x = (x - torch.mean(x)) / torch.std(x) # 将数据的分布转换为均值为0,标准差为1的标准正态分布
epsilon = torch.randn(x.shape)
y = 10 * x + 5 + epsilon
model = Linear()
batch_size = 20
learning_rate = 0.1
for t in range(20):
ix = (t * batch_size) % len(x)
xx = x[ix : ix + batch_size]
yy = y[ix : ix + batch_size]
loss = mse([model.error(_x, _y) for _x, _y in zip(xx, yy)])
loss.backward()
model.a -= learning_rate * model.a.grad
model.b -= learning_rate * model.b.grad
model.a.grad = 0.0
model.b.grad = 0.0
print(model.string())
参数估计全流程
计算图膨胀
绘制两个数据点时的计算图
# 计算图膨胀
model = Linear()
# 定义两组数据
x1 = Scalar(1.5, label='x1', requires_grad=False)
y1 = Scalar(1.0, label='y1', requires_grad=False)
x2 = Scalar(2.0, label='x2', requires_grad=False)
y2 = Scalar(4.0, label='y2', requires_grad=False)
loss = mse([model.error(x1, y1), model.error(x2, y2)])
loss.backward()
draw_graph(loss, 'backward')
如图所示当有n个数据点时计算图会包含n个重复的部分(比较胖),这样的现象叫计算图膨胀,后果是内存使用量增大。
梯度累积
由于每个数据点在反向传播的时候,不依赖于其他数据点的反向传播,启发我们可以用多次累加的方式达到同样的效果,避免计算图膨胀。
- 第一次传播
因为模型的损失是平均数,当使用n个点时,需要将n个数据点的损失相加再除以n,这里就是乘以1/2进行系数调整。
# 第一次传播
model = Linear()
loss = 0.5 * mse([model.error(x1, y1)])
loss.backward()
draw_graph(loss, 'backward')
结果如下图所示,与上图右半部分是一样的。
- 第二次传播
第一次传播结束后不能立即进行梯度清零,需要等第二次传播才能得到真正想要的梯度,然后再更新参数。因此在现有基础上再进行一次传播。
# 第二次传播(梯度积累)
loss = 0.5 * mse([model.error(x2, y2)])
loss.backward()
draw_graph(loss, 'backward')
结果如下图所示,这里的梯度是在第一次传播的基础上累加得到的。
全流程
- 新增梯度积累次数,即梯度积累gradient_accu_iter次后再去更新模型参数
- 定义小批量数据量micro_size = batch_size / gradient_accu_iter,取数据时使用micro_size
- 循环次数增加,因为gradient_accu_iter次才更新一次,所以循环次数为20 * gradient_accu_iter
- 权重调整,损失乘以1 / gradient_accu_iter
model = Linear()
batch_size = 20
learning_rate = 0.1
# 梯度积累次数
gradient_accu_iter = 4
# 小批量数据量
micro_size = int(batch_size / gradient_accu_iter)
for t in range(20 * gradient_accu_iter):
ix = (t * batch_size) % len(x)
xx = x[ix : ix + micro_size]
yy = y[ix : ix + micro_size]
loss = mse([model.error(_x, _y) for _x, _y in zip(xx, yy)])
# 权重调整
loss *= 1 / gradient_accu_iter
loss.backward()
if (t + 1) % gradient_accu_iter == 0:
model.a -= learning_rate * model.a.grad
model.b -= learning_rate * model.b.grad
model.a.grad = 0.0
model.b.grad = 0.0
print(model.string())