This is basically a consolidation of the useful material I came across.
References:
https://blog.csdn.net/weixin_48878618/article/details/133590646
https://blog.csdn.net/weixin_42769131/article/details/104783188
https://blog.csdn.net/Scc_hy/article/details/128297350
I. Custom gym environment
1. Define the environment class
"""
http://incompleteideas.net/sutton/MountainCar/MountainCar1.cp
permalink: https://perma.cc/6Z2N-PFWC
"""
import math
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding
class GridEnv(gym.Env):
metadata = {
'render.modes': ['human', 'rgb_array'],
'video.frames_per_second': 30
}
def __init__(self, goal_velocity=0):
self.min_position = -1.2
self.max_position = 0.6
self.max_speed = 0.07
self.goal_position = 0.5
self.goal_velocity = goal_velocity
self.force = 0.001
self.gravity = 0.0025
self.low = np.array([self.min_position, -self.max_speed], dtype=np.float32)
self.high = np.array([self.max_position, self.max_speed], dtype=np.float32)
self.viewer = None
self.action_space = spaces.Discrete(3)
self.observation_space = spaces.Box(self.low, self.high, dtype=np.float32)
self.seed()
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, action):
assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
position, velocity = self.state
velocity += (action - 1) * self.force + math.cos(3 * position) * (-self.gravity)
velocity = np.clip(velocity, -self.max_speed, self.max_speed)
position += velocity
position = np.clip(position, self.min_position, self.max_position)
if (position == self.min_position and velocity < 0): velocity = 0
done = bool(position >= self.goal_position and velocity >= self.goal_velocity)
reward = -1.0
self.state = (position, velocity)
return np.array(self.state), reward, done, {}
def reset(self):
self.state = np.array([self.np_random.uniform(low=-0.6, high=-0.4), 0])
return np.array(self.state)
def _height(self, xs):
return np.sin(3 * xs) * .45 + .55
def render(self, mode='human'):
screen_width = 600
screen_height = 400
world_width = self.max_position - self.min_position
scale = screen_width / world_width
carwidth = 40
carheight = 20
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.Viewer(screen_width, screen_height)
xs = np.linspace(self.min_position, self.max_position, 100)
ys = self._height(xs)
xys = list(zip((xs - self.min_position) * scale, ys * scale))
self.track = rendering.make_polyline(xys)
self.track.set_linewidth(4)
self.viewer.add_geom(self.track)
clearance = 10
l, r, t, b = -carwidth / 2, carwidth / 2, carheight, 0
car = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
car.add_attr(rendering.Transform(translation=(0, clearance)))
self.cartrans = rendering.Transform()
car.add_attr(self.cartrans)
self.viewer.add_geom(car)
frontwheel = rendering.make_circle(carheight / 2.5)
frontwheel.set_color(.5, .5, .5)
frontwheel.add_attr(rendering.Transform(translation=(carwidth / 4, clearance)))
frontwheel.add_attr(self.cartrans)
self.viewer.add_geom(frontwheel)
backwheel = rendering.make_circle(carheight / 2.5)
backwheel.add_attr(rendering.Transform(translation=(-carwidth / 4, clearance)))
backwheel.add_attr(self.cartrans)
backwheel.set_color(.5, .5, .5)
self.viewer.add_geom(backwheel)
flagx = (self.goal_position - self.min_position) * scale
flagy1 = self._height(self.goal_position) * scale
flagy2 = flagy1 + 50
flagpole = rendering.Line((flagx, flagy1), (flagx, flagy2))
self.viewer.add_geom(flagpole)
flag = rendering.FilledPolygon([(flagx, flagy2), (flagx, flagy2 - 10), (flagx + 25, flagy2 - 5)])
flag.set_color(.8, .8, 0)
self.viewer.add_geom(flag)
pos = self.state[0]
self.cartrans.set_translation((pos - self.min_position) * scale, self._height(pos) * scale)
self.cartrans.set_rotation(math.cos(3 * pos))
return self.viewer.render(return_rgb_array=mode == 'rgb_array')
def get_keys_to_action(self):
return {(): 1, (276,): 0, (275,): 2, (275, 276): 1} # control with left and right arrow keys
def close(self):
if self.viewer:
self.viewer.close()
self.viewer = None
2. Modify the __init__.py of the package that holds the environment class
Save the class above as grid_mdp.py under /home/xxx/gym/gym/envs/classic_control/, then add the following to /home/xxx/gym/gym/envs/classic_control/__init__.py:
from gym.envs.classic_control import grid_mdp
from gym.envs.classic_control.grid_mdp import GridEnv
3. Register the environment with gym
File: /home/xxx/gym/gym/envs/__init__.py
register(
id= 'GridWorld-v0',
entry_point='gym.envs.classic_control:GridEnv',
max_episode_steps=200, reward_threshold=100.0,
)
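With recent gym versions you do not have to patch the installed package at all: gym.envs.registration.register can be called from your own script before gym.make. A minimal sketch, assuming the class above is saved as grid_mdp.py inside a hypothetical my_envs package on your PYTHONPATH:
import gym
from gym.envs.registration import register

# register the custom environment from user code instead of editing gym's source
# (the module path my_envs.grid_mdp and the id GridWorldLocal-v0 are placeholders)
register(
    id='GridWorldLocal-v0',
    entry_point='my_envs.grid_mdp:GridEnv',
    max_episode_steps=200,
)

env = gym.make('GridWorldLocal-v0')
print(env.observation_space, env.action_space)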
4. Test code (section I follows the older 4-value step API, matching the classic_control rendering module used above)
import gym
import time
env = gym.make('GridWorld-v0')
for ep in range(10):
obs = env.reset()
done = False
rewards = 0
while not done:
action = env.action_space.sample()
observation, reward, done, info = env.step(action)
env.render()
rewards += reward
print(rewards)
II. Custom mujoco environments in gym
1. Calling the built-in gym mujoco environments
- gym ships examples that use mujoco environments: they load a mujoco model and wrap the relevant APIs in the gym style.
import gym
env = gym.make('Ant-v4', render_mode='human') # create an Ant-v4 environment instance
observation, info = env.reset() # reset the environment and get the initial observation
for _ in range(1000): # run 1000 steps
env.render() # render the environment
action = env.action_space.sample() # sample a random action from the action space
print(action)
next_observation, reward, terminated, truncated, info = env.step(action) # step the env and get the next observation, reward, etc.
if terminated or truncated:
break
env.close() # close the environment
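For reference, the spaces of the built-in environment can be inspected directly; a quick sketch (the exact shapes depend on the installed gym/mujoco versions):
import gym

env = gym.make('Ant-v4')
# Ant-v4 takes an 8-dimensional torque action in [-1, 1] and, with the default
# settings, returns a 27-dimensional observation (see obs_shape in the source below)
print(env.action_space)
print(env.observation_space)
print(env.action_space.sample())
env.close()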
2. The mujoco example bundled with gym
- It essentially builds on the
from gym.envs.mujoco import MujocoEnv
module. The Ant environment source below shows how it is used; a registration sketch follows the listing.
import numpy as np
from gym import utils
from gym.envs.mujoco import MujocoEnv
from gym.spaces import Box
DEFAULT_CAMERA_CONFIG = {
"distance": 4.0,
}
class AntEnv(MujocoEnv, utils.EzPickle):
metadata = {
"render_modes": [
"human",
"rgb_array",
"depth_array",
],
"render_fps": 20,
}
def __init__(
self,
xml_file="ant.xml",
ctrl_cost_weight=0.5,
use_contact_forces=False,
contact_cost_weight=5e-4,
healthy_reward=1.0,
terminate_when_unhealthy=True,
healthy_z_range=(0.2, 1.0),
contact_force_range=(-1.0, 1.0),
reset_noise_scale=0.1,
exclude_current_positions_from_observation=True,
**kwargs
):
utils.EzPickle.__init__(
self,
xml_file,
ctrl_cost_weight,
use_contact_forces,
contact_cost_weight,
healthy_reward,
terminate_when_unhealthy,
healthy_z_range,
contact_force_range,
reset_noise_scale,
exclude_current_positions_from_observation,
**kwargs
)
self._ctrl_cost_weight = ctrl_cost_weight
self._contact_cost_weight = contact_cost_weight
self._healthy_reward = healthy_reward
self._terminate_when_unhealthy = terminate_when_unhealthy
self._healthy_z_range = healthy_z_range
self._contact_force_range = contact_force_range
self._reset_noise_scale = reset_noise_scale
self._use_contact_forces = use_contact_forces
self._exclude_current_positions_from_observation = (
exclude_current_positions_from_observation
)
obs_shape = 27
if not exclude_current_positions_from_observation:
obs_shape += 2
if use_contact_forces:
obs_shape += 84
observation_space = Box(
low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float64
)
MujocoEnv.__init__(
self, xml_file, 5, observation_space=observation_space, **kwargs
)
@property
def healthy_reward(self):
return (
float(self.is_healthy or self._terminate_when_unhealthy)
* self._healthy_reward
)
def control_cost(self, action):
control_cost = self._ctrl_cost_weight * np.sum(np.square(action))
return control_cost
@property
def contact_forces(self):
raw_contact_forces = self.data.cfrc_ext
min_value, max_value = self._contact_force_range
contact_forces = np.clip(raw_contact_forces, min_value, max_value)
return contact_forces
@property
def contact_cost(self):
contact_cost = self._contact_cost_weight * np.sum(
np.square(self.contact_forces)
)
return contact_cost
@property
def is_healthy(self):
state = self.state_vector()
min_z, max_z = self._healthy_z_range
is_healthy = np.isfinite(state).all() and min_z <= state[2] <= max_z
return is_healthy
@property
def terminated(self):
terminated = not self.is_healthy if self._terminate_when_unhealthy else False
return terminated
def step(self, action):
xy_position_before = self.get_body_com("torso")[:2].copy()
self.do_simulation(action, self.frame_skip)
xy_position_after = self.get_body_com("torso")[:2].copy()
xy_velocity = (xy_position_after - xy_position_before) / self.dt
x_velocity, y_velocity = xy_velocity
forward_reward = x_velocity
healthy_reward = self.healthy_reward
rewards = forward_reward + healthy_reward
costs = ctrl_cost = self.control_cost(action)
terminated = self.terminated
observation = self._get_obs()
info = {
"reward_forward": forward_reward,
"reward_ctrl": -ctrl_cost,
"reward_survive": healthy_reward,
"x_position": xy_position_after[0],
"y_position": xy_position_after[1],
"distance_from_origin": np.linalg.norm(xy_position_after, ord=2),
"x_velocity": x_velocity,
"y_velocity": y_velocity,
"forward_reward": forward_reward,
}
if self._use_contact_forces:
contact_cost = self.contact_cost
costs += contact_cost
info["reward_ctrl"] = -contact_cost
reward = rewards - costs
if self.render_mode == "human":
self.render()
return observation, reward, terminated, False, info
def _get_obs(self):
position = self.data.qpos.flat.copy()
velocity = self.data.qvel.flat.copy()
if self._exclude_current_positions_from_observation:
position = position[2:]
if self._use_contact_forces:
contact_force = self.contact_forces.flat.copy()
return np.concatenate((position, velocity, contact_force))
else:
return np.concatenate((position, velocity))
def reset_model(self):
noise_low = -self._reset_noise_scale
noise_high = self._reset_noise_scale
qpos = self.init_qpos + self.np_random.uniform(
low=noise_low, high=noise_high, size=self.model.nq
)
qvel = (
self.init_qvel
+ self._reset_noise_scale * self.np_random.standard_normal(self.model.nv)
)
self.set_state(qpos, qvel)
observation = self._get_obs()
return observation
def viewer_setup(self):
assert self.viewer is not None
for key, value in DEFAULT_CAMERA_CONFIG.items():
if isinstance(value, np.ndarray):
getattr(self.viewer.cam, key)[:] = value
else:
setattr(self.viewer.cam, key, value)
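The class above is usable through gym.make only because gym registers it in gym/envs/__init__.py, exactly like step 3 of section I. The entry looks roughly like this (values taken from the gym source; check your installed version):
register(
    id="Ant-v4",
    entry_point="gym.envs.mujoco.ant_v4:AntEnv",
    max_episode_steps=1000,
    reward_threshold=6000.0,
)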
III. Defining an inverted pendulum model in mujoco
- The inverted_pendulum.xml shipped with the mujoco examples can be used as a reference; here a new one is written from scratch. (A sketch of wrapping it as a gym environment follows the XML.)
<?xml version="1.0" ?>
<mujoco>
<default>
<geom rgba=".8 .6 .4 1"/>
</default>
<asset>
<texture type="skybox" builtin="gradient" rgb1="1 1 1" rgb2=".6 .8 1"
width="256" height="256"/>
</asset>
<worldbody>
<body name="cart" pos="0 0 0">
<geom type="box" size="0.2 0.1 0.1" rgba="1 0 0 1"/>
<joint type="slide" axis="1 0 0" name="slide_joint"/>
<body name="pole" pos="0 0 -0.5">
<joint type="hinge" pos="0 0 0.5" axis="0 1 0" name="hinge_joint"/>
<geom type="cylinder" size="0.05 0.5" rgba="0 1 0 1" />
<inertial pos="0 0 0" mass="10" diaginertia="0.1 0.1 0.1"/>
</body>
</body>
</worldbody>
<actuator>
<motor ctrllimited="true" ctrlrange="-1 1" joint="slide_joint" gear="100"/>
<position name="position_servo" joint="slide_joint" kp="500" />
<velocity name="velocity_servo" joint="slide_joint" kv="100" />
</actuator>
</mujoco>
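The XML above can be wired into gym the same way the Ant example is: subclass MujocoEnv, point it at the model file, and implement _get_obs, step, and reset_model. Below is a minimal sketch; the file path, reward, reset noise, and termination threshold are illustrative assumptions, not tuned values.
import numpy as np
from gym import utils
from gym.envs.mujoco import MujocoEnv
from gym.spaces import Box


class CustomInvertedPendulumEnv(MujocoEnv, utils.EzPickle):
    # render_fps must equal 1 / (model timestep * frame_skip); the XML above keeps
    # MuJoCo's default timestep of 0.002 s, so frame_skip=2 gives 250 fps
    metadata = {
        "render_modes": ["human", "rgb_array", "depth_array"],
        "render_fps": 250,
    }

    def __init__(self, xml_file="/home/xxx/inverted_pendulum_custom.xml", **kwargs):
        utils.EzPickle.__init__(self, xml_file, **kwargs)
        # two joints (slide + hinge): qpos(2) + qvel(2) = 4-dimensional observation;
        # the three actuators in the XML make the action 3-dimensional
        observation_space = Box(low=-np.inf, high=np.inf, shape=(4,), dtype=np.float64)
        MujocoEnv.__init__(self, xml_file, 2, observation_space=observation_space, **kwargs)

    def step(self, action):
        self.do_simulation(action, self.frame_skip)
        observation = self._get_obs()
        reward = 1.0  # +1 for every step the pole stays up
        # terminate when the pole angle leaves a small band around upright
        terminated = bool(not np.isfinite(observation).all() or np.abs(observation[1]) > 0.2)
        if self.render_mode == "human":
            self.render()
        return observation, reward, terminated, False, {}

    def _get_obs(self):
        return np.concatenate([self.data.qpos, self.data.qvel]).ravel()

    def reset_model(self):
        qpos = self.init_qpos + self.np_random.uniform(low=-0.01, high=0.01, size=self.model.nq)
        qvel = self.init_qvel + self.np_random.uniform(low=-0.01, high=0.01, size=self.model.nv)
        self.set_state(qpos, qvel)
        return self._get_obs()
Registering it then follows section I step 3 (or the register call shown there), after which gym.make returns this environment.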
IV. Reinforcement learning algorithms for the inverted pendulum
1. Discrete action space
import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
from torch.distributions import Bernoulli
from torch.autograd import Variable
from itertools import count
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PGN(nn.Module):
def __init__(self):
super(PGN, self).__init__()
self.linear1 = nn.Linear(4, 24)
self.linear2 = nn.Linear(24, 36)
self.linear3 = nn.Linear(36, 1)
def forward(self, x):
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = torch.sigmoid(self.linear3(x))
return x
class CartAgent(object):
def __init__(self, learning_rate, gamma):
self.pgn = PGN()
self.gamma = gamma
self._init_memory()
self.optimizer = torch.optim.RMSprop(self.pgn.parameters(), lr=learning_rate)
def memorize(self, state, action, reward):
# save to memory for mini-batch gradient descent
self.state_pool.append(state)
self.action_pool.append(action)
self.reward_pool.append(reward)
self.steps += 1
def learn(self):
self._adjust_reward()
# policy gradient
self.optimizer.zero_grad()
for i in range(self.steps):
# all steps in multi games
state = self.state_pool[i]
action = torch.FloatTensor([self.action_pool[i]])
reward = self.reward_pool[i]
probs = self.act(state)
m = Bernoulli(probs)
loss = -m.log_prob(action) * reward
loss.backward()
self.optimizer.step()
self._init_memory()
def act(self, state):
return self.pgn(state)
def _init_memory(self):
self.state_pool = []
self.action_pool = []
self.reward_pool = []
self.steps = 0
def _adjust_reward(self):
# backward weight
running_add = 0
for i in reversed(range(self.steps)):
if self.reward_pool[i] == 0:
running_add = 0
else:
running_add = running_add * self.gamma + self.reward_pool[i]
self.reward_pool[i] = running_add
# normalize reward
reward_mean = np.mean(self.reward_pool)
reward_std = np.std(self.reward_pool)
for i in range(self.steps):
self.reward_pool[i] = (self.reward_pool[i] - reward_mean) / reward_std
def train():
# hyper parameter
BATCH_SIZE = 5
LEARNING_RATE = 0.01
GAMMA = 0.99
NUM_EPISODES = 500
env = gym.make('CartPole-v1',render_mode='human')
cart_agent = CartAgent(learning_rate=LEARNING_RATE, gamma=GAMMA)
for i_episode in range(NUM_EPISODES):
next_state = env.reset()[0]
for t in count():
state = torch.from_numpy(next_state).float()
probs = cart_agent.act(state)
m = Bernoulli(probs)
action = m.sample()
action = action.data.numpy().astype(int).item()
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
# the final step's reward is set to 0 so it marks the episode boundary in _adjust_reward
if done:
reward = 0
cart_agent.memorize(state, action, reward)
if done:
logger.info('Episode {}: duration {}'.format(i_episode, t))
break
# update parameter every batch size
if i_episode > 0 and i_episode % BATCH_SIZE == 0:
cart_agent.learn()
if __name__ == '__main__':
train()
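To make _adjust_reward concrete, here is a standalone sketch on a made-up reward sequence: rewards are accumulated backwards with discounting, a 0 reward (the marker written at the end of each episode) resets the running sum, and the result is standardized before weighting the log-probabilities:
import numpy as np

gamma = 0.99
# two short episodes; the trailing 0 of each episode is the boundary marker
returns = [1, 1, 1, 0, 1, 1, 0]

running_add = 0
for i in reversed(range(len(returns))):
    if returns[i] == 0:
        running_add = 0                      # episode boundary: restart the discounted sum
    else:
        running_add = running_add * gamma + returns[i]
        returns[i] = running_add             # reward-to-go from step i

returns = (np.array(returns) - np.mean(returns)) / np.std(returns)
print(returns)                               # standardized weights used in learn()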
2. Continuous action space
# python3
# Created: 2022-12-27
# Purpose: PPO with continuous actions
# =====================================================================================================
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import gym
import copy
import random
from collections import deque
from tqdm import tqdm
import typing as typ
class policyNet(nn.Module):
"""
continuity action:
normal distribution (mean, std)
"""
def __init__(self, state_dim: int, hidden_layers_dim: typ.List, action_dim: int):
super(policyNet, self).__init__()
self.features = nn.ModuleList()
for idx, h in enumerate(hidden_layers_dim):
self.features.append(nn.ModuleDict({
'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_dim, h),
'linear_action': nn.ReLU(inplace=True)
}))
self.fc_mu = nn.Linear(hidden_layers_dim[-1], action_dim)
self.fc_std = nn.Linear(hidden_layers_dim[-1], action_dim)
def forward(self, x):
for layer in self.features:
x = layer['linear_action'](layer['linear'](x))
mean_ = 2.0 * torch.tanh(self.fc_mu(x))
# softplus(x) = log(1 + exp(x)) keeps the std strictly positive
std = F.softplus(self.fc_std(x))
return mean_, std
class valueNet(nn.Module):
def __init__(self, state_dim, hidden_layers_dim):
super(valueNet, self).__init__()
self.features = nn.ModuleList()
for idx, h in enumerate(hidden_layers_dim):
self.features.append(nn.ModuleDict({
'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_dim, h),
'linear_activation': nn.ReLU(inplace=True)
}))
self.head = nn.Linear(hidden_layers_dim[-1] , 1)
def forward(self, x):
for layer in self.features:
x = layer['linear_activation'](layer['linear'](x))
return self.head(x)
def compute_advantage(gamma, lmbda, td_delta):
td_delta = td_delta.detach().numpy()
adv_list = []
adv = 0
for delta in td_delta[::-1]:
adv = gamma * lmbda * adv + delta
adv_list.append(adv)
adv_list.reverse()
return torch.FloatTensor(adv_list)
class PPO:
"""
PPO with the clipped surrogate objective
"""
def __init__(self,
state_dim: int,
hidden_layers_dim: typ.List,
action_dim: int,
actor_lr: float,
critic_lr: float,
gamma: float,
PPO_kwargs: typ.Dict,
device: torch.device
):
self.actor = policyNet(state_dim, hidden_layers_dim, action_dim).to(device)
self.critic = valueNet(state_dim, hidden_layers_dim).to(device)
self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic_opt = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
self.gamma = gamma
self.lmbda = PPO_kwargs['lmbda']
self.ppo_epochs = PPO_kwargs['ppo_epochs'] # number of epochs each collected trajectory is reused for training
self.eps = PPO_kwargs['eps'] # clipping range parameter (epsilon) in PPO
self.count = 0
self.device = device
def policy(self, state):
state = torch.FloatTensor([state]).to(self.device)
mu, std = self.actor(state)
action_dist = torch.distributions.Normal(mu, std)
action = action_dist.sample()
return [action.item()]
def update(self, samples: deque):
self.count += 1
state, action, reward, next_state, done = zip(*samples)
state = torch.FloatTensor(state).to(self.device)
action = torch.tensor(action).view(-1, 1).to(self.device)
reward = torch.tensor(reward).view(-1, 1).to(self.device)
reward = (reward + 8.0) / 8.0 # as with TRPO, rescale Pendulum's reward to make training easier
next_state = torch.FloatTensor(next_state).to(self.device)
done = torch.FloatTensor(done).view(-1, 1).to(self.device)
td_target = reward + self.gamma * self.critic(next_state) * (1 - done)
td_delta = td_target - self.critic(state)
advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
mu, std = self.actor(state)
action_dists = torch.distributions.Normal(mu.detach(), std.detach())
# actions follow a normal (Gaussian) distribution
old_log_probs = action_dists.log_prob(action)
for _ in range(self.ppo_epochs):
mu, std = self.actor(state)
action_dists = torch.distributions.Normal(mu, std)
log_prob = action_dists.log_prob(action)
# importance ratio: exp(log_prob - old_log_prob) = pi_new(a|s) / pi_old(a|s)
ratio = torch.exp(log_prob - old_log_probs)
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
actor_loss = torch.mean(-torch.min(surr1, surr2)).float()
critic_loss = torch.mean(
F.mse_loss(self.critic(state).float(), td_target.detach().float())
).float()
self.actor_opt.zero_grad()
self.critic_opt.zero_grad()
actor_loss.backward()
critic_loss.backward()
self.actor_opt.step()
self.critic_opt.step()
class replayBuffer:
def __init__(self, capacity: int):
self.buffer = deque(maxlen=capacity)
def add(self, state, action, reward, next_state, done):
self.buffer.append( (state, action, reward, next_state, done) )
def __len__(self):
return len(self.buffer)
def sample(self, batch_size: int):
return random.sample(self.buffer, batch_size)
def play(env, env_agent, cfg, episode_count=2):
for e in range(episode_count):
s, _ = env.reset()
done = False
episode_reward = 0
episode_cnt = 0
while not done:
env.render()
a = env_agent.policy(s)
n_state, reward, done, _, _ = env.step(a)
episode_reward += reward
episode_cnt += 1
s = n_state
if (episode_cnt >= 3 * cfg.max_episode_steps) or (episode_reward >= 3*cfg.max_episode_rewards):
break
print(f'Get reward {episode_reward}. Last {episode_cnt} times')
env.close()
class Config:
num_episode = 1200
state_dim = None
hidden_layers_dim = [ 128, 128 ]
action_dim = 20
actor_lr = 1e-4
critic_lr = 5e-3
PPO_kwargs = {
'lmbda': 0.9,
'eps': 0.2,
'ppo_epochs': 10
}
gamma = 0.9
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
buffer_size = 20480
minimal_size = 1024
batch_size = 128
save_path = r'./ac_model.ckpt'
# episode termination limits
max_episode_rewards = 260
max_episode_steps = 260
def __init__(self, env):
self.state_dim = env.observation_space.shape[0]
try:
self.action_dim = env.action_space.n
except Exception as e:
self.action_dim = env.action_space.shape[0]
print(f'device={self.device} | env={str(env)}')
def train_agent(env, cfg):
ac_agent = PPO(
state_dim=cfg.state_dim,
hidden_layers_dim=cfg.hidden_layers_dim,
action_dim=cfg.action_dim,
actor_lr=cfg.actor_lr,
critic_lr=cfg.critic_lr,
gamma=cfg.gamma,
PPO_kwargs=cfg.PPO_kwargs,
device=cfg.device
)
tq_bar = tqdm(range(cfg.num_episode))
rewards_list = []
now_reward = 0
bf_reward = -np.inf
for i in tq_bar:
buffer_ = replayBuffer(cfg.buffer_size)
tq_bar.set_description(f'Episode [ {i+1} / {cfg.num_episode} ]')
s, _ = env.reset()
done = False
episode_rewards = 0
steps = 0
while not done:
a = ac_agent.policy(s)
n_s, r, done, _, _ = env.step(a)
buffer_.add(s, a, r, n_s, done)
s = n_s
episode_rewards += r
steps += 1
if (episode_rewards >= cfg.max_episode_rewards) or (steps >= cfg.max_episode_steps):
break
ac_agent.update(buffer_.buffer)
rewards_list.append(episode_rewards)
now_reward = np.mean(rewards_list[-10:])
if bf_reward < now_reward:
torch.save(ac_agent.actor.state_dict(), cfg.save_path)
bf_reward = now_reward
tq_bar.set_postfix({'lastMeanRewards': f'{now_reward:.2f}', 'BEST': f'{bf_reward:.2f}'})
env.close()
return ac_agent
if __name__ == '__main__':
print('=='*35)
print('Training Pendulum-v1')
env = gym.make('Pendulum-v1')
cfg = Config(env)
ac_agent = train_agent(env, cfg)
ac_agent.actor.load_state_dict(torch.load(cfg.save_path))
play(gym.make('Pendulum-v1', render_mode="human"), ac_agent, cfg)
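The compute_advantage helper above is generalized advantage estimation (GAE): it folds the TD errors backwards with the factor gamma*lmbda, i.e. A_t = delta_t + gamma*lmbda * A_{t+1}. A quick standalone check on made-up TD errors:
# GAE recursion check with made-up numbers
gamma, lmbda = 0.9, 0.9
td_delta = [0.5, -0.2, 0.1, 0.3]

adv, adv_list = 0.0, []
for delta in reversed(td_delta):
    adv = gamma * lmbda * adv + delta
    adv_list.append(adv)
adv_list.reverse()
print(adv_list)  # last entry is 0.3; the one before it is 0.1 + 0.81 * 0.3 = 0.343, and so on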
3. Continuous action space for the inverted pendulum
import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
from torch.distributions import Normal
from torch.autograd import Variable
from itertools import count
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PGN(nn.Module):
def __init__(self):
super(PGN, self).__init__()
self.linear1 = nn.Linear(4, 24)
self.linear2 = nn.Linear(24, 36)
self.mean = nn.Linear(36, 1) # outputs the mean of the Gaussian policy
self.std = nn.Parameter(torch.tensor(0.1)) # learnable standard deviation
def forward(self, x):
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
mean = self.mean(x)
return mean
class CartAgent(object):
def __init__(self, learning_rate, gamma):
self.pgn = PGN()
self.gamma = gamma
self._init_memory()
self.optimizer = torch.optim.RMSprop(self.pgn.parameters(), lr=learning_rate)
def memorize(self, state, action, reward):
# save to memory for mini-batch gradient descent
self.state_pool.append(state)
self.action_pool.append(action)
self.reward_pool.append(reward)
self.steps += 1
def learn(self):
self._adjust_reward()
# policy gradient
self.optimizer.zero_grad()
for i in range(self.steps):
state = self.state_pool[i]
action = torch.FloatTensor([self.action_pool[i]])
reward = self.reward_pool[i]
mean = self.act(state)
m = Normal(mean, self.pgn.std)
loss = -m.log_prob(action) * reward
loss.backward()
self.optimizer.step()
self._init_memory()
def act(self, state):
return self.pgn(state)
def _init_memory(self):
self.state_pool = []
self.action_pool = []
self.reward_pool = []
self.steps = 0
def _adjust_reward(self):
running_add = 0
for i in reversed(range(self.steps)):
if self.reward_pool[i] == 0:
running_add = 0
else:
running_add = running_add * self.gamma + self.reward_pool[i]
self.reward_pool[i] = running_add
reward_mean = np.mean(self.reward_pool)
reward_std = np.std(self.reward_pool)
for i in range(self.steps):
self.reward_pool[i] = (self.reward_pool[i] - reward_mean) / reward_std
def train():
BATCH_SIZE = 5
LEARNING_RATE = 0.01
GAMMA = 0.99
NUM_EPISODES = 500
env = gym.make('InvertedPendulum-v4', render_mode='human') # v4 uses the new mujoco bindings and the five-value step API
cart_agent = CartAgent(learning_rate=LEARNING_RATE, gamma=GAMMA)
for i_episode in range(NUM_EPISODES):
next_state = env.reset()[0]
for t in count():
# print("++++++++++++++:",t)
state = torch.from_numpy(next_state).float()
mean = cart_agent.act(state)
# print(cart_agent.pgn.std)
# keep the learned std strictly positive; clamp .data in place so the optimizer
# keeps updating the same parameter (re-wrapping it in a new nn.Parameter would detach it)
cart_agent.pgn.std.data.clamp_(min=0.01)
m = Normal(mean, cart_agent.pgn.std)
action = m.sample()
action = action.data.numpy().astype(float).item()
action = np.clip(action, -3.0, 3.0) # clip the force to InvertedPendulum's actuator range [-3.0, 3.0]
action = [action]
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
if done:
reward = 0
cart_agent.memorize(state, action, reward)
if done:
logger.info('Episode {}: duration {}'.format(i_episode, t))
break
if i_episode > 0 and i_episode % BATCH_SIZE == 0:
cart_agent.learn()
if __name__ == '__main__':
train()
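After training, the Gaussian policy can also be evaluated deterministically by acting on the mean instead of sampling. A short sketch reusing CartAgent from above (the environment id and the number of evaluation episodes are assumptions):
# evaluation sketch: act on the mean of the learned Gaussian instead of sampling
def evaluate(agent, episodes=5):
    env = gym.make('InvertedPendulum-v4', render_mode='human')
    for ep in range(episodes):
        obs, _ = env.reset()
        done, steps = False, 0
        while not done:
            with torch.no_grad():
                mean = agent.act(torch.from_numpy(obs).float())
            action = np.clip([mean.item()], -3.0, 3.0)
            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            steps += 1
        print('episode {}: lasted {} steps'.format(ep, steps))
    env.close()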