环境描述
环境是倒立摆(Inverted Pendulum),该环境下有一个处于随机位置的倒立摆。环境的状态包括倒立摆角度的正弦值,余弦值,角速度;动作为对倒立摆施加的力矩(action = Box(-2.0, 2.0, (1,), float32)
)。每一步都会根据当前倒立摆的状态的好坏给予智能体不同的奖励,该环境的奖励函数为,倒立摆向上保持直立不动时奖励为 0,倒立摆在其他位置时奖励为负数。环境本身没有终止状态,所以训练的时候需要设置终止条件(笔者在本文设置了260
)。
一、构建智能体
构建智能体:
policy
是和之前一样的。探索和利用, 就是利用的时候基于nn模型的预测
主要核心:
- QNet:
- 就是一个多层的NN
- update就是用MSELoss进行梯度下降
DQN
:支持DQN和doubleDQN
- update:
- 经验回放池R中的数据足够, 从R中采样N个数据
{ (si, ai, ri, si+1) }+i=1,...,N
- 对于DQN: 对每个数据计算 y i = r i + γ Q w − ( s ′ , a r g m a x ( Q w − ( s ′ , a ′ ) ) y_i = r_i + \gamma Q_{w^-}(s',arg max(Q_{w^-}(s', a')) yi=ri+γQw−(s′,argmax(Qw−(s′,a′)) ,动作的选取依靠目标网络( Q w − Q_{w-} Qw−)
- 对于doubleDQN: 对每个数据计算
y
i
=
r
i
+
γ
Q
w
−
(
s
′
,
a
r
g
m
a
x
(
Q
w
(
s
′
,
a
′
)
)
y_i = r_i + \gamma Q_{w^-}(s',arg max(Q_{w}(s', a'))
yi=ri+γQw−(s′,argmax(Qw(s′,a′)) ,动作的选取依靠训练网络(
Q
w
Q_w
Qw)
-
f
m
a
x
f_{max}
fmax就是用
Q
N
e
t
(
s
i
+
1
)
QNet(s_{i+1})
QNet(si+1)计算出每个action对应的值,取最大值的index, 然后根据这个index去
T
a
g
e
t
Q
N
e
t
(
s
i
+
1
)
TagetQNet(s_{i+1})
TagetQNet(si+1)中取最大值
下面的代码可以看出差异
-
f
m
a
x
f_{max}
fmax就是用
Q
N
e
t
(
s
i
+
1
)
QNet(s_{i+1})
QNet(si+1)计算出每个action对应的值,取最大值的index, 然后根据这个index去
T
a
g
e
t
Q
N
e
t
(
s
i
+
1
)
TagetQNet(s_{i+1})
TagetQNet(si+1)中取最大值
- 经验回放池R中的数据足够, 从R中采样N个数据
- update:
# 下个状态的最大Q值
if self.dqn_type == 'DoubleDQN': # DQN与Double DQN的区别
max_action = self.q(next_states).max(1)[1].view(-1, 1)
max_next_q_values = n_actions_q.gather(1, max_action)
else: # DQN的情况
max_next_q_values = n_actions_q.max(1)[0].view(-1, 1)
q_targets = reward + self.gamma * max_next_q_values * (1 - done)
- 最小化目标函数
L
=
1
N
∑
(
y
i
−
Q
N
e
t
(
s
i
,
a
i
)
)
2
L=\frac{1}{N}\sum (yi - QNet(s_i, a_i))^2
L=N1∑(yi−QNet(si,ai))2
- 用
y
i
y_i
yi 和
q(states).gather(1, action)
计算损失并更新参数
- 用
y
i
y_i
yi 和
代码实现
class QNet(nn.Module):
def __init__(self, state_dim: int, hidden_layers_dim: typ.List, action_dim: int):
super(QNet, self).__init__()
self.features = nn.ModuleList()
for idx, h in enumerate(hidden_layers_dim):
self.features.append(
nn.ModuleDict({
'linear': nn.Linear(state_dim if not idx else hidden_layers_dim[idx-1], h),
'linear_active': nn.ReLU(inplace=True)
})
)
self.header = nn.Linear(hidden_layers_dim[-1], action_dim)
def forward(self, x):
for layer in self.features:
x = layer['linear_active'](layer['linear'](x))
return self.header(x)
def model_compelet(self, learning_rate):
self.cost_func = nn.MSELoss()
self.opt = torch.optim.Adam(self.parameters(), lr=learning_rate)
def update(self, pred, target):
self.opt.zero_grad()
loss = self.cost_func(pred, target)
loss.backward()
self.opt.step()
class DQN:
def __init__(self,
state_dim: int,
hidden_layers_dim,
action_dim: int,
learning_rate: float,
gamma: float,
epsilon: float=0.05,
traget_update_freq: int=1,
device: typ.AnyStr='cpu',
dqn_type: typ.AnyStr='DQN'
):
self.action_dim = action_dim
# QNet & targetQNet
self.q = QNet(state_dim, hidden_layers_dim, action_dim)
self.target_q = copy.deepcopy(self.q)
self.q.to(device)
self.q.model_compelet(learning_rate)
self.target_q.to(device)
# iteration params
self.learning_rate = learning_rate
self.gamma = gamma
self.epsilon = epsilon
# target update freq
self.traget_update_freq = traget_update_freq
self.count = 0
self.device = device
# dqn类型
self.dqn_type = dqn_type
def policy(self, state):
if np.random.random() < self.epsilon:
return np.random.randint(self.action_dim)
action = self.target_q(torch.FloatTensor(state))
return np.argmax(action.detach().numpy())
def update(self, samples: deque):
"""
Q<s, a, t> = R<s, a, t> + gamma * Q<s+1, a_max, t+1>
"""
self.count += 1
state, action, reward, next_state, done = zip(*samples)
states = torch.FloatTensor(state).to(self.device)
action = torch.tensor(action).view(-1, 1).to(self.device)
reward = torch.tensor(reward).view(-1, 1).to(self.device)
next_states = torch.FloatTensor(next_state)
done = torch.FloatTensor(done).view(-1, 1).to(self.device)
actions_q = self.q(states)
n_actions_q = self.target_q(next_states)
q_values = actions_q.gather(1, action)
# 下个状态的最大Q值
if self.dqn_type == 'DoubleDQN': # DQN与Double DQN的区别
max_action = self.q(next_states).max(1)[1].view(-1, 1)
max_next_q_values = n_actions_q.gather(1, max_action)
else: # DQN的情况
max_next_q_values = n_actions_q.max(1)[0].view(-1, 1)
q_targets = reward + self.gamma * max_next_q_values * (1 - done)
# MSELoss update
self.q.update(q_values.float(), q_targets.float())
if self.count % self.traget_update_freq == 0:
self.target_q.load_state_dict(
self.q.state_dict()
)
二、智能体训练
2.1 注意点
在训练的时候:
- 所有的参数都在
Config
中进行配置,便于调参
class Config:
num_episode = 300
state_dim = None
hidden_layers_dim = [10, 10]
action_dim = 20
learning_rate = 2e-3
gamma = 0.95
epsilon = 0.01
traget_update_freq = 3
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
buffer_size = 2048
minimal_size = 1024
batch_size = 128
render = False
save_path = r'D:\TMP\model.ckpt'
dqn_type = 'DoubleDQN'
# 回合停止控制
max_episode_rewards = 260
max_episode_steps = 260
def __init__(self, env):
self.state_dim = env.observation_space.shape[0]
try:
self.action_dim = env.action_space.n
except Exception as e:
pass
print(f'device = {self.device} | env={str(env)}')
- 由于这次环境的action空间是连续的,我们需要有一个函数进行action的离散和连续的转换
def Pendulum_dis_to_con(discrete_action, env, action_dim): # 离散动作转回连续的函数
action_lowbound = env.action_space.low[0] # 连续动作的最小值
action_upbound = env.action_space.high[0] # 连续动作的最大值
action_range = action_upbound - action_lowbound
return action_lowbound + (discrete_action / (action_dim - 1)) * action_range
2.2 训练
需要注意的是笔者的gym版本是0.26.2
def train_dqn(env, cfg, action_contiguous=False):
buffer = replayBuffer(cfg.buffer_size)
dqn = DQN(
state_dim=cfg.state_dim,
hidden_layers_dim=cfg.hidden_layers_dim,
action_dim=cfg.action_dim,
learning_rate=cfg.learning_rate,
gamma=cfg.gamma,
epsilon=cfg.epsilon,
traget_update_freq=cfg.traget_update_freq,
device=cfg.device,
dqn_type=cfg.dqn_type
)
tq_bar = tqdm(range(cfg.num_episode))
rewards_list = []
now_reward = 0
bf_reward = -np.inf
for i in tq_bar:
tq_bar.set_description(f'Episode [ {i+1} / {cfg.num_episode} ]')
s, _ = env.reset()
done = False
episode_rewards = 0
steps = 0
while not done:
a = dqn.policy(s)
# [Any, float, bool, bool, dict]
if action_contiguous:
c_a = Pendulum_dis_to_con(a, env, cfg.action_dim)
n_s, r, done, _, _ = env.step([c_a])
else:
n_s, r, done, _, _ = env.step(a)
buffer.add(s, a, r, n_s, done)
s = n_s
episode_rewards += r
steps += 1
# buffer update
if len(buffer) > cfg.minimal_size:
samples = buffer.sample(cfg.batch_size)
dqn.update(samples)
if (episode_rewards >= cfg.max_episode_rewards) or (steps >= cfg.max_episode_steps):
break
rewards_list.append(episode_rewards)
now_reward = np.mean(rewards_list[-10:])
if bf_reward < now_reward:
torch.save(dqn.target_q.state_dict(), cfg.save_path)
bf_reward = max(bf_reward, now_reward)
tq_bar.set_postfix({'lastMeanRewards': f'{now_reward:.2f}', 'BEST': f'{bf_reward:.2f}'})
env.close()
return dqn
if __name__ == '__main__':
print('=='*35)
print('Training Pendulum-v1')
p_env = gym.make('Pendulum-v1')
p_cfg = Config(p_env)
p_dqn = train_dqn(p_env, p_cfg, True)
三、训练出的智能体观测
最后将训练的最好的网络拿出来进行观察
p_dqn.target_q.load_state_dict(torch.load(p_cfg.save_path))
play(gym.make('Pendulum-v1', render_mode="human"), p_dqn, p_cfg, episode_count=2, action_contiguous=True)
从下图中我们可以看出,本次的训练成功还是可以的。
完整脚本查看笔者github: Doubledqn_lr.py 记得点
Star
哦
笔者后续会更深入的学习强化学习并对gym
各个环境逐一进行训练