Whereas DQN outputs the Q-value of each action, a Policy Gradient network outputs the probability of taking each action. Actions are selected according to these probabilities, and the network is continuously corrected during training so that the output probabilities better match the optimal action-selection policy. For a detailed explanation of the Policy Gradient method, see
https://blog.csdn.net/ygp12345/article/details/109009311
Applied to inverted-pendulum (CartPole) control, this can be implemented by building a forward network and a learning strategy.
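For reference, the update implemented by the code below corresponds to the standard REINFORCE form of the policy gradient (a general statement of the method, not taken from the linked article). With the discounted return G_t, the parameters theta of the policy pi_theta are moved along the gradient (in LaTeX notation):

\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right],
\qquad G_t = \sum_{k=t}^{T} \gamma^{\,k-t} r_k

In practice this is done by minimizing the loss L(\theta) = -\sum_t \log \pi_\theta(a_t \mid s_t)\, G_t, which is exactly what the learn function defined later computes.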
1 Load modules
Load the required modules. The code is as follows
import gym
import numpy as np
import math
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from matplotlib import animation
The animation module is used to generate a GIF animation of the CartPole control.
2 Define the forward network
The code is as follows
# prediction model
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 10)
        self.fc2 = nn.Linear(10, 2)
        self.fc1.weight.data.normal_(0, 0.1)
        self.fc2.weight.data.normal_(0, 0.1)

    def forward(self, state):
        x = self.fc1(state)
        x = nn.functional.relu(x)
        x = self.fc2(x)
        # softmax over the action dimension gives the probability of each action
        output = nn.functional.softmax(x, dim=1)
        return output
Two fully connected layers are used here, with a ReLU activation in between; a softmax function outputs the probabilities of taking each of the two actions (0 and 1).
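As a quick sanity check, a minimal sketch (assuming the imports and the Net class above have been run) can push a random 4-dimensional CartPole state through the network and confirm that it returns a probability distribution over the two actions:

# minimal usage sketch of Net (assumes the imports and class definition above)
net = Net()
state = torch.randn(1, 4)           # a batch containing one 4-dimensional state
probs = net(state)                  # shape (1, 2)
print(probs, probs.sum().item())    # the two probabilities sum to 1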
3 Define the Policy Gradient strategy
The code is as follows
# define Policy Gradient
class PolicyGradient(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = Net()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.01)
        self.history_log_probs = []
        self.history_rewards = []
        self.gamma = 0.99

    def choose_action(self, state):
        # sample an action from the probability distribution output by the network
        state = torch.FloatTensor(state).unsqueeze(0)
        probs = self.net(state)
        ctgr = torch.distributions.Categorical(probs)
        action = ctgr.sample()
        self.history_log_probs.append(ctgr.log_prob(action))
        return action.item()

    def choose_best_action(self, state):
        # greedily pick the most probable action (used at test time)
        state = torch.FloatTensor(state).unsqueeze(0)
        probs = self.net(state)
        action = int(torch.argmax(probs))
        return action

    def get_reward(self, state):
        # custom shaping reward: high when the cart is centered and the pole is upright
        pos, vel, ang, avel = state
        pos1 = 2.0
        ang1 = math.pi / 6
        r1 = 5 - 10 * abs(pos / pos1)
        r2 = 5 - 10 * abs(ang / ang1)
        r1 = max(r1, -5)
        r2 = max(r2, -5)
        return r1 + r2

    def gg(self, state):
        # custom termination: cart too far from center or pole tilted too far
        pos, vel, ang, avel = state
        bad = abs(pos) > 2.0 or abs(ang) > math.pi/4
        return bad

    def store_transition(self, reward):
        self.history_rewards.append(reward)

    def learn(self):
        # backward calculate discounted returns
        R = 0
        rewards = []
        for r in self.history_rewards[::-1]:
            R = r + self.gamma*R
            rewards.insert(0, R)
        rewards = torch.tensor(rewards)
        # normalize returns (small epsilon avoids division by zero for very short episodes)
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-9)
        loss = 0
        for i in range(len(rewards)):
            loss += -self.history_log_probs[i] * rewards[i]
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.history_log_probs.clear()
        self.history_rewards.clear()
# define some functions
def print_red(string):
    # print a string in red using ANSI escape codes
    print('\033[0;31m', end='')
    print(string, end='')
    print('\033[0m')

def save_gif(frames, filename):
    # save a list of rendered frames to a gif file
    figure = plt.imshow(frames[0])
    plt.axis('off')
    # callback function
    def animate(i):
        figure.set_data(frames[i])
    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)
    anim.save(filename, writer='pillow', fps=30)
This includes the forward network used for action decisions, along with the action-selection function (choose_action), the reward-recording function (store_transition), the learning function (learn), and so on. In the Policy Gradient method, the decision is driven by the probabilities output by the network: actions with a higher probability are more likely to be executed. The rewards collected during each run are recorded for the subsequent learning step. Learning differentiates the sum of the products of the log-probabilities log(prob) and the discounted rewards and performs gradient descent, so that actions that lead to high rewards become more likely to be taken and actions that lead to low rewards become less likely.
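To make the learn step concrete, the snippet below (an illustration only, using a made-up reward list) reproduces the backward discounted-return calculation and normalization used above:

# illustration of the discounted-return calculation used in learn()
import torch

gamma = 0.99
history_rewards = [1.0, 1.0, 1.0, -10.0]   # made-up rewards for one short, failed episode

R = 0.0
returns = []
for r in history_rewards[::-1]:            # walk backwards through the episode
    R = r + gamma * R
    returns.insert(0, R)                   # returns = [-6.73, -7.81, -8.90, -10.00]
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / returns.std()
print(returns)   # roughly [1.16, 0.39, -0.38, -1.17]: the step just before the failure
                 # gets the most negative normalized return, so its action is suppressed hardest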
4 Simulation training
Simulation training is carried out with a CartPole environment
# create cartpole model
env = gym.make('CartPole-v1', render_mode='human')
# reset state of env
state, _ = env.reset()
# create Policy Gradient model
model = PolicyGradient()
# step of learning
learn_step = 0
# flag of train ok
train_ok = False
episode = 0
# play and train
while not train_ok:
    state, _ = env.reset()
    play_step = 0
    total_rewards = 0
    episode += 1
    print(f'\nEpisode {episode} ...')
    while True:
        env.render()
        action = model.choose_action(state)
        state, reward, done, _, info = env.step(action)
        pos, vel, a, a_vel = state  # position, velocity, angle, angular velocity
        # replace the environment reward with the custom shaping reward
        reward = model.get_reward(state)
        if model.gg(state):
            reward += -10
        model.store_transition(reward)
        total_rewards += reward
        play_step += 1
        if play_step % 1000 == 0 or model.gg(state):
            model.learn()
            learn_step += 1
            print(f'play step {play_step} rewards {total_rewards:.2f} learn {learn_step}')
        if model.gg(state):
            break
        if play_step >= 20000:
            train_ok = True
            break
# train ok, save model
save_file = 'policy_gradient.ptl'
torch.save(model, save_file)
print_red(f'\nmodel trained ok, saved to {save_file}')
# close env
env.close()
Inside the loop, the program keeps controlling the inverted pendulum according to the network's decisions; every time control fails, the next attempt at control and learning begins. Training is complete once the number of control steps exceeds a fixed threshold (20000), which indicates that the ability to control the pendulum stably has been reached, and the control model is then saved. Note that the reward and the termination condition of the CartPole environment are replaced here by the custom functions defined above (get_reward and gg).
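The custom shaping reward can be inspected on its own. The sketch below is for illustration only (it creates a fresh, untrained PolicyGradient instance just to call get_reward, which does not depend on the network weights):

# a quick look at the custom reward shaping (illustrative states only)
m = PolicyGradient()
print(m.get_reward([0.0, 0.0, 0.0, 0.0]))          # centered and upright: 10.0
print(m.get_reward([1.0, 0.0, 0.0, 0.0]))          # half-way to the position limit: 5.0
print(m.get_reward([1.9, 0.0, math.pi/5, 0.0]))    # near both limits: -9.5 (the angle term is clipped at -5)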
5 Verification
Load the saved model and use it to control a new environment
# create game model
env = gym.make('CartPole-v1', render_mode='rgb_array')
# load trained model
model = torch.load('policy_gradient.ptl')
# frames to store game play
frames = []
state, _ = env.reset()
# play a period of time
for i in range(400):
    frames.append(env.render())
    action = model.choose_best_action(state)
    state, reward, done, _, info = env.step(action)
    if model.gg(state):
        break
# save frames to gif file
save_gif(frames, 'cart_pole_policy_gradient.gif')
env.close()
As shown above, the choose_best_action function is used here to select the action. The difference from choose_action is that choose_action samples an action according to the probabilities, so the higher-probability action is more likely to be chosen and the lower-probability action less likely, whereas choose_best_action always picks the action with the highest probability, i.e. the best choice in the current state. The frames of the CartPole response during control are recorded and written to a GIF file.
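The difference between the two selection strategies can be illustrated with a fixed probability pair (illustrative numbers only, not an actual network output):

# sampling vs. greedy selection on a fixed distribution (illustrative only)
import torch

probs = torch.tensor([[0.7, 0.3]])
ctgr = torch.distributions.Categorical(probs)
samples = [ctgr.sample().item() for _ in range(1000)]
print(sum(a == 0 for a in samples) / 1000)   # roughly 0.7: like choose_action, action 0 is sampled ~70% of the time
print(int(torch.argmax(probs)))              # always 0: the greedy choice made by choose_best_action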
The final result is shown below