Reinforcement Learning: Basic Development


This post mainly pulls together useful material I have come across.

References

https://blog.csdn.net/weixin_48878618/article/details/133590646

https://blog.csdn.net/weixin_42769131/article/details/104783188?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522166792845916800182132771%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=166792845916800182132771&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-1-104783188-null-null.142%5Ev63%5Ewechat,201%5Ev3%5Eadd_ask,213%5Ev2%5Et3_esquery_v3&utm_term=gym%E5%A4%9A%E5%B1%82%E6%84%9F%E7%9F%A5%E6%9C%BA&spm=1018.2226.3001.4187

https://blog.csdn.net/Scc_hy/article/details/128297350

I. Custom gym environment

1. Define the environment class

The class below is essentially gym's MountainCarEnv with the class renamed to GridEnv.

"""
http://incompleteideas.net/sutton/MountainCar/MountainCar1.cp
permalink: https://perma.cc/6Z2N-PFWC
"""
 
import math
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding
 
 
class GridEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 30
    }
 
    def __init__(self, goal_velocity=0):
        self.min_position = -1.2
        self.max_position = 0.6
        self.max_speed = 0.07
        self.goal_position = 0.5
        self.goal_velocity = goal_velocity
 
        self.force = 0.001
        self.gravity = 0.0025
 
        self.low = np.array([self.min_position, -self.max_speed], dtype=np.float32)
        self.high = np.array([self.max_position, self.max_speed], dtype=np.float32)
 
        self.viewer = None
 
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(self.low, self.high, dtype=np.float32)
 
        self.seed()
 
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
 
    def step(self, action):
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
 
        position, velocity = self.state
        velocity += (action - 1) * self.force + math.cos(3 * position) * (-self.gravity)
        velocity = np.clip(velocity, -self.max_speed, self.max_speed)
        position += velocity
        position = np.clip(position, self.min_position, self.max_position)
        if (position == self.min_position and velocity < 0): velocity = 0
 
        done = bool(position >= self.goal_position and velocity >= self.goal_velocity)
        reward = -1.0
 
        self.state = (position, velocity)
 
        return np.array(self.state), reward, done, {}  # classic gym step API: (obs, reward, done, info)
 
    def reset(self):
        self.state = np.array([self.np_random.uniform(low=-0.6, high=-0.4), 0])
        return np.array(self.state)
 
    def _height(self, xs):
        return np.sin(3 * xs) * .45 + .55
 
    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400
 
        world_width = self.max_position - self.min_position
        scale = screen_width / world_width
        carwidth = 40
        carheight = 20
 
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            xs = np.linspace(self.min_position, self.max_position, 100)
            ys = self._height(xs)
            xys = list(zip((xs - self.min_position) * scale, ys * scale))
 
            self.track = rendering.make_polyline(xys)
            self.track.set_linewidth(4)
            self.viewer.add_geom(self.track)
 
            clearance = 10
 
            l, r, t, b = -carwidth / 2, carwidth / 2, carheight, 0
            car = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            car.add_attr(rendering.Transform(translation=(0, clearance)))
            self.cartrans = rendering.Transform()
            car.add_attr(self.cartrans)
            self.viewer.add_geom(car)
            frontwheel = rendering.make_circle(carheight / 2.5)
            frontwheel.set_color(.5, .5, .5)
            frontwheel.add_attr(rendering.Transform(translation=(carwidth / 4, clearance)))
            frontwheel.add_attr(self.cartrans)
            self.viewer.add_geom(frontwheel)
            backwheel = rendering.make_circle(carheight / 2.5)
            backwheel.add_attr(rendering.Transform(translation=(-carwidth / 4, clearance)))
            backwheel.add_attr(self.cartrans)
            backwheel.set_color(.5, .5, .5)
            self.viewer.add_geom(backwheel)
            flagx = (self.goal_position - self.min_position) * scale
            flagy1 = self._height(self.goal_position) * scale
            flagy2 = flagy1 + 50
            flagpole = rendering.Line((flagx, flagy1), (flagx, flagy2))
            self.viewer.add_geom(flagpole)
            flag = rendering.FilledPolygon([(flagx, flagy2), (flagx, flagy2 - 10), (flagx + 25, flagy2 - 5)])
            flag.set_color(.8, .8, 0)
            self.viewer.add_geom(flag)
 
        pos = self.state[0]
        self.cartrans.set_translation((pos - self.min_position) * scale, self._height(pos) * scale)
        self.cartrans.set_rotation(math.cos(3 * pos))
 
        return self.viewer.render(return_rgb_array=mode == 'rgb_array')
 
    def get_keys_to_action(self):
        return {(): 1, (276,): 0, (275,): 2, (275, 276): 1}  # control with left and right arrow keys
 
    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None

2. Modify the __init__.py of the module that contains the environment class

File path: /home/xxx/gym/gym/envs/classic_control/__init__.py (save the class above as grid_mdp.py in the same directory so the import below resolves)

from gym.envs.classic_control import grid_mdp
from gym.envs.classic_control.grid_mdp import GridEnv

3. Register the environment with gym

File path: /home/xxx/gym/gym/envs/__init__.py

register(
    id='GridWorld-v0',
    entry_point='gym.envs.classic_control:GridEnv',
    max_episode_steps=200,
    reward_threshold=100.0,
)

4. Test code

 
import gym

env = gym.make('GridWorld-v0')

for episode in range(10):
    obs = env.reset()
    done = False
    rewards = 0
    while not done:
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)  # TimeLimit wrapper ends the episode after 200 steps
        env.render()
        rewards += reward
    print(rewards)


II. Custom MuJoCo environments in gym

1. Calling the MuJoCo environments bundled with gym

  • gym ships with examples that use MuJoCo environments: they load a MuJoCo model and wrap the relevant APIs in gym style.
import gym

env = gym.make('Ant-v4', render_mode='human')  # create an Ant-v4 environment instance
observation, info = env.reset()  # reset the environment and get the initial observation

for step in range(1000):  # run up to 1000 steps
    env.render()  # with render_mode='human' gym renders automatically; this call is optional

    action = env.action_space.sample()  # sample a random action from the action space
    print(action)
    next_observation, reward, terminated, truncated, info = env.step(action)  # step and unpack the 5-tuple
    print(step)
    if terminated or truncated:
        break

env.close()  # close the environment
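
If you are on the maintained gymnasium fork instead of classic gym, the same loop is nearly identical; a sketch, assuming gymnasium and its MuJoCo extras are installed:

import gymnasium as gym

env = gym.make('Ant-v4', render_mode='human')
observation, info = env.reset(seed=0)

for _ in range(1000):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        observation, info = env.reset()

env.close()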


2. The MuJoCo example that ships with gym

  • The key point is that the example builds on from gym.envs.mujoco import MujocoEnv; the full Ant-v4 source is reproduced below, and a sketch of reusing it as a template follows the listing.
import numpy as np

from gym import utils
from gym.envs.mujoco import MujocoEnv
from gym.spaces import Box

DEFAULT_CAMERA_CONFIG = {
    "distance": 4.0,
}


class AntEnv(MujocoEnv, utils.EzPickle):
    metadata = {
        "render_modes": [
            "human",
            "rgb_array",
            "depth_array",
        ],
        "render_fps": 20,
    }

    def __init__(
        self,
        xml_file="ant.xml",
        ctrl_cost_weight=0.5,
        use_contact_forces=False,
        contact_cost_weight=5e-4,
        healthy_reward=1.0,
        terminate_when_unhealthy=True,
        healthy_z_range=(0.2, 1.0),
        contact_force_range=(-1.0, 1.0),
        reset_noise_scale=0.1,
        exclude_current_positions_from_observation=True,
        **kwargs
    ):
        utils.EzPickle.__init__(
            self,
            xml_file,
            ctrl_cost_weight,
            use_contact_forces,
            contact_cost_weight,
            healthy_reward,
            terminate_when_unhealthy,
            healthy_z_range,
            contact_force_range,
            reset_noise_scale,
            exclude_current_positions_from_observation,
            **kwargs
        )

        self._ctrl_cost_weight = ctrl_cost_weight
        self._contact_cost_weight = contact_cost_weight

        self._healthy_reward = healthy_reward
        self._terminate_when_unhealthy = terminate_when_unhealthy
        self._healthy_z_range = healthy_z_range

        self._contact_force_range = contact_force_range

        self._reset_noise_scale = reset_noise_scale

        self._use_contact_forces = use_contact_forces

        self._exclude_current_positions_from_observation = (
            exclude_current_positions_from_observation
        )

        obs_shape = 27
        if not exclude_current_positions_from_observation:
            obs_shape += 2
        if use_contact_forces:
            obs_shape += 84

        observation_space = Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float64
        )

        MujocoEnv.__init__(
            self, xml_file, 5, observation_space=observation_space, **kwargs
        )

    @property
    def healthy_reward(self):
        return (
            float(self.is_healthy or self._terminate_when_unhealthy)
            * self._healthy_reward
        )

    def control_cost(self, action):
        control_cost = self._ctrl_cost_weight * np.sum(np.square(action))
        return control_cost

    @property
    def contact_forces(self):
        raw_contact_forces = self.data.cfrc_ext
        min_value, max_value = self._contact_force_range
        contact_forces = np.clip(raw_contact_forces, min_value, max_value)
        return contact_forces

    @property
    def contact_cost(self):
        contact_cost = self._contact_cost_weight * np.sum(
            np.square(self.contact_forces)
        )
        return contact_cost

    @property
    def is_healthy(self):
        state = self.state_vector()
        min_z, max_z = self._healthy_z_range
        is_healthy = np.isfinite(state).all() and min_z <= state[2] <= max_z
        return is_healthy

    @property
    def terminated(self):
        terminated = not self.is_healthy if self._terminate_when_unhealthy else False
        return terminated

    def step(self, action):
        xy_position_before = self.get_body_com("torso")[:2].copy()
        self.do_simulation(action, self.frame_skip)
        xy_position_after = self.get_body_com("torso")[:2].copy()

        xy_velocity = (xy_position_after - xy_position_before) / self.dt
        x_velocity, y_velocity = xy_velocity

        forward_reward = x_velocity
        healthy_reward = self.healthy_reward

        rewards = forward_reward + healthy_reward

        costs = ctrl_cost = self.control_cost(action)

        terminated = self.terminated
        observation = self._get_obs()
        info = {
            "reward_forward": forward_reward,
            "reward_ctrl": -ctrl_cost,
            "reward_survive": healthy_reward,
            "x_position": xy_position_after[0],
            "y_position": xy_position_after[1],
            "distance_from_origin": np.linalg.norm(xy_position_after, ord=2),
            "x_velocity": x_velocity,
            "y_velocity": y_velocity,
            "forward_reward": forward_reward,
        }
        if self._use_contact_forces:
            contact_cost = self.contact_cost
            costs += contact_cost
            info["reward_ctrl"] = -contact_cost

        reward = rewards - costs

        if self.render_mode == "human":
            self.render()
        return observation, reward, terminated, False, info

    def _get_obs(self):
        position = self.data.qpos.flat.copy()
        velocity = self.data.qvel.flat.copy()

        if self._exclude_current_positions_from_observation:
            position = position[2:]

        if self._use_contact_forces:
            contact_force = self.contact_forces.flat.copy()
            return np.concatenate((position, velocity, contact_force))
        else:
            return np.concatenate((position, velocity))

    def reset_model(self):
        noise_low = -self._reset_noise_scale
        noise_high = self._reset_noise_scale

        qpos = self.init_qpos + self.np_random.uniform(
            low=noise_low, high=noise_high, size=self.model.nq
        )
        qvel = (
            self.init_qvel
            + self._reset_noise_scale * self.np_random.standard_normal(self.model.nv)
        )
        self.set_state(qpos, qvel)

        observation = self._get_obs()

        return observation

    def viewer_setup(self):
        assert self.viewer is not None
        for key, value in DEFAULT_CAMERA_CONFIG.items():
            if isinstance(value, np.ndarray):
                getattr(self.viewer.cam, key)[:] = value
            else:
                setattr(self.viewer.cam, key, value)
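
The class above is a useful template: its constructor exposes the reward weights and observation options as keyword arguments, so a variant task can be registered without touching the source. A minimal sketch, assuming a recent gym release where the class lives in gym.envs.mujoco.ant_v4 (the id AntHeavyCtrl-v0 is arbitrary):

import gym
from gym.envs.registration import register

# Register an Ant variant with a heavier control penalty and contact forces
# included in the observation (both are constructor kwargs of AntEnv above).
register(
    id='AntHeavyCtrl-v0',
    entry_point='gym.envs.mujoco.ant_v4:AntEnv',  # module path in recent gym releases
    max_episode_steps=1000,
    kwargs={'ctrl_cost_weight': 1.0, 'use_contact_forces': True},
)

env = gym.make('AntHeavyCtrl-v0')
observation, info = env.reset()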

III. Defining an inverted-pendulum model in MuJoCo

  • MuJoCo's bundled inverted_pendulum.xml can be used as a reference; the model below was rewritten from scratch. A quick way to sanity-check it with the mujoco Python bindings is shown after the XML.
<?xml version="1.0" ?>
<mujoco>

    <default>
        <geom rgba=".8 .6 .4 1"/>
    </default>
    <asset>
        <texture type="skybox" builtin="gradient" rgb1="1 1 1" rgb2=".6 .8 1" 
                 width="256" height="256"/>
    </asset>
    <worldbody>
        <body name="cart" pos="0 0 0">
            <geom type="box" size="0.2 0.1 0.1" rgba="1 0 0 1"/>
            <joint type="slide" axis="1 0 0" name="slide_joint"/>

            
            <body name="pole" pos="0 0 -0.5">
                <joint type="hinge" pos="0 0 0.5" axis="0 1 0" name="hinge_joint"/>
                <geom type="cylinder" size="0.05 0.5" rgba="0 1 0 1" />
                <inertial pos="0 0 0" mass="10" diaginertia="0.1 0.1 0.1"/>
            </body>
        </body>
    </worldbody>
    
    <actuator>
        <motor ctrllimited="true" ctrlrange="-1 1" joint="slide_joint" gear="100"/>
        <position name="position_servo" joint="slide_joint" kp="500"/>
        <velocity name="velocity_servo" joint="slide_joint" kv="100"/>
    </actuator>
</mujoco>
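
Before wiring the model into gym, it can be sanity-checked directly with the official mujoco Python bindings. A minimal sketch, assuming the mujoco pip package is installed and the XML above is saved as inverted_pendulum_custom.xml (a made-up filename):

import mujoco

model = mujoco.MjModel.from_xml_path('inverted_pendulum_custom.xml')
data = mujoco.MjData(model)

print('nq =', model.nq, 'nv =', model.nv, 'nu =', model.nu)  # position / velocity / actuator counts

for _ in range(100):
    data.ctrl[0] = 0.5           # drive the motor acting on slide_joint
    mujoco.mj_step(model, data)  # advance the simulation by one timestep

print('qpos =', data.qpos, 'qvel =', data.qvel)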


IV. Reinforcement learning for the inverted pendulum

1. Discrete action space

import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym

from torch.distributions import Bernoulli
from itertools import count

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PGN(nn.Module):
    def __init__(self):
        super(PGN, self).__init__()
        self.linear1 = nn.Linear(4, 24)
        self.linear2 = nn.Linear(24, 36)
        self.linear3 = nn.Linear(36, 1)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = torch.sigmoid(self.linear3(x))
        return x


class CartAgent(object):
    def __init__(self, learning_rate, gamma):
        self.pgn = PGN()
        self.gamma = gamma

        self._init_memory()
        self.optimizer = torch.optim.RMSprop(self.pgn.parameters(), lr=learning_rate)

    def memorize(self, state, action, reward):
        # save to memory for mini-batch gradient descent
        self.state_pool.append(state)
        self.action_pool.append(action)
        self.reward_pool.append(reward)
        self.steps += 1

    def learn(self):
        self._adjust_reward()

        # policy gradient
        self.optimizer.zero_grad()
        for i in range(self.steps):
            # all steps in multi games 
            state = self.state_pool[i]
            action = torch.FloatTensor([self.action_pool[i]])
            reward = self.reward_pool[i]

            probs = self.act(state)
            m = Bernoulli(probs)
            loss = -m.log_prob(action) * reward
            loss.backward()
        self.optimizer.step()
        
        self._init_memory()

    def act(self, state):
        return self.pgn(state) 

    def _init_memory(self):
        self.state_pool = []
        self.action_pool = []
        self.reward_pool = []
        self.steps = 0

    def _adjust_reward(self):
        # backward weight
        running_add = 0
        for i in reversed(range(self.steps)):
            if self.reward_pool[i] == 0:
                running_add = 0
            else:
                running_add = running_add * self.gamma + self.reward_pool[i]
                self.reward_pool[i] = running_add

        # normalize reward
        reward_mean = np.mean(self.reward_pool)
        reward_std = np.std(self.reward_pool)
        for i in range(self.steps):
            self.reward_pool[i] = (self.reward_pool[i] - reward_mean) / reward_std


def train():
    # hyper parameter
    BATCH_SIZE = 5
    LEARNING_RATE = 0.01
    GAMMA = 0.99
    NUM_EPISODES = 500

    env = gym.make('CartPole-v1',render_mode='human')
    cart_agent = CartAgent(learning_rate=LEARNING_RATE, gamma=GAMMA)

    for i_episode in range(NUM_EPISODES):
        next_state = env.reset()[0]

        for t in count():
            state = torch.from_numpy(next_state).float()

            probs = cart_agent.act(state)
            m = Bernoulli(probs)
            action = m.sample()

            action = action.data.numpy().astype(int).item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated  # treat time-limit truncation as episode end too

            # zero out the terminal reward so _adjust_reward can detect episode boundaries
            if done:
                reward = 0

            cart_agent.memorize(state, action, reward)

            if done:
                logger.info('Episode {}: duration {}'.format(i_episode, t))
                break

        # update parameter every batch size
        if i_episode > 0 and i_episode % BATCH_SIZE == 0:
            cart_agent.learn()


if __name__ == '__main__':
    train()
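
For completeness, a greedy evaluation helper can be added alongside train(); a sketch, assuming the trained CartAgent is passed in (e.g. by making train() return cart_agent):

def evaluate(agent, episodes=5):
    env = gym.make('CartPole-v1', render_mode='human')
    for _ in range(episodes):
        state = env.reset()[0]
        done, total_reward = False, 0.0
        while not done:
            with torch.no_grad():
                prob = agent.act(torch.from_numpy(state).float())
            action = int(prob.item() > 0.5)  # greedy: take the more likely side of the Bernoulli
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
        print('evaluation reward:', total_reward)
    env.close()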


2. Continuous action space

# python3
# Create Date: 2022-12-27
# Func: PPO with a continuous action output
# =====================================================================================================

import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import gym
import copy
import random
from collections import deque
from tqdm import tqdm
import typing as typ



class policyNet(nn.Module):
    """
    Continuous actions: the network outputs the (mean, std) of a normal distribution.
    """
    def __init__(self, state_dim: int, hidden_layers_dim: typ.List, action_dim: int):
        super(policyNet, self).__init__()
        self.features = nn.ModuleList()
        for idx, h in enumerate(hidden_layers_dim):
            self.features.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_dim, h),
                'linear_activation': nn.ReLU(inplace=True)
            }))

        self.fc_mu = nn.Linear(hidden_layers_dim[-1], action_dim)
        self.fc_std = nn.Linear(hidden_layers_dim[-1], action_dim)

    def forward(self, x):
        for layer in self.features:
            x = layer['linear_activation'](layer['linear'](x))

        mean_ = 2.0 * torch.tanh(self.fc_mu(x))  # scale the mean into Pendulum's action range [-2, 2]
        std = F.softplus(self.fc_std(x))  # softplus keeps the std strictly positive
        return mean_, std


class valueNet(nn.Module):
    def __init__(self, state_dim, hidden_layers_dim):
        super(valueNet, self).__init__()
        self.features = nn.ModuleList()
        for idx, h in enumerate(hidden_layers_dim):
            self.features.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_dim, h),
                'linear_activation': nn.ReLU(inplace=True)
            }))
        
        self.head = nn.Linear(hidden_layers_dim[-1] , 1)
        
    def forward(self, x):
        for layer in self.features:
            x = layer['linear_activation'](layer['linear'](x))
        return self.head(x)


def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    adv_list = []
    adv = 0
    for delta in td_delta[::-1]:
        adv = gamma * lmbda * adv + delta
        adv_list.append(adv)
    adv_list.reverse()
    return torch.FloatTensor(adv_list)


class PPO:
    """
    PPO with the clipped surrogate objective (PPO-Clip).
    """
    def __init__(self,
                state_dim: int,
                hidden_layers_dim: typ.List,
                action_dim: int,
                actor_lr: float,
                critic_lr: float,
                gamma: float,
                PPO_kwargs: typ.Dict,
                device: torch.device
                ):
        self.actor = policyNet(state_dim, hidden_layers_dim, action_dim).to(device)
        self.critic = valueNet(state_dim, hidden_layers_dim).to(device)
        self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_opt = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        
        self.gamma = gamma
        self.lmbda = PPO_kwargs['lmbda']
        self.ppo_epochs = PPO_kwargs['ppo_epochs']  # number of optimization epochs per collected trajectory
        self.eps = PPO_kwargs['eps']  # clipping range of the PPO objective
        self.count = 0 
        self.device = device
    
    def policy(self, state):
        state = torch.FloatTensor([state]).to(self.device)
        mu, std = self.actor(state)
        action_dist = torch.distributions.Normal(mu, std)
        action = action_dist.sample()
        return [action.item()]
        
    def update(self, samples: deque):
        self.count += 1
        state, action, reward, next_state, done = zip(*samples)

        state = torch.FloatTensor(state).to(self.device)
        action = torch.tensor(action).view(-1, 1).to(self.device)
        reward = torch.tensor(reward).view(-1, 1).to(self.device)
        reward = (reward + 8.0) / 8.0  # rescale Pendulum rewards (as in the TRPO example) to ease training
        next_state = torch.FloatTensor(next_state).to(self.device)
        done = torch.FloatTensor(done).view(-1, 1).to(self.device)
        
        td_target = reward + self.gamma * self.critic(next_state) * (1 - done)
        td_delta = td_target - self.critic(state)
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
                
        mu, std = self.actor(state)
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        # the old policy is a detached normal distribution over actions; cache its log-probs for the ratio
        old_log_probs = action_dists.log_prob(action)
        for _ in range(self.ppo_epochs):
            mu, std = self.actor(state)
            action_dists = torch.distributions.Normal(mu, std)
            log_prob = action_dists.log_prob(action)
            
            # importance ratio exp(log_prob - old_log_prob) = pi_new(a|s) / pi_old(a|s)
            ratio = torch.exp(log_prob - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage

            actor_loss = torch.mean(-torch.min(surr1, surr2)).float()
            critic_loss = torch.mean(
                F.mse_loss(self.critic(state).float(), td_target.detach().float())
            ).float()
            self.actor_opt.zero_grad()
            self.critic_opt.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_opt.step()
            self.critic_opt.step()




class replayBuffer:
    def __init__(self, capacity: int):
        self.buffer = deque(maxlen=capacity)
    
    def add(self, state, action, reward, next_state, done):
        self.buffer.append( (state, action, reward, next_state, done) )
    
    def __len__(self):
        return len(self.buffer)

    def sample(self, batch_size: int):
        return random.sample(self.buffer, batch_size)
        

def play(env, env_agent, cfg, episode_count=2):
    for e in range(episode_count):
        s, _ = env.reset()
        done = False
        episode_reward = 0
        episode_cnt = 0
        while not done:
            env.render()
            a = env_agent.policy(s)
            n_state, reward, done, _, _ = env.step(a)
            episode_reward += reward
            episode_cnt += 1
            s = n_state
            if (episode_cnt >= 3 * cfg.max_episode_steps) or (episode_reward >= 3*cfg.max_episode_rewards):
                break
    
        print(f'Episode reward {episode_reward} over {episode_cnt} steps')
    env.close()








class Config:
    num_episode = 1200
    state_dim = None
    hidden_layers_dim = [ 128, 128 ]
    action_dim = 20
    actor_lr = 1e-4
    critic_lr = 5e-3
    PPO_kwargs = {
        'lmbda': 0.9,
        'eps': 0.2,
        'ppo_epochs': 10
    }
    gamma = 0.9
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    buffer_size = 20480
    minimal_size = 1024
    batch_size = 128
    save_path = r'./ac_model.ckpt'
    # episode termination limits
    max_episode_rewards = 260
    max_episode_steps = 260
    
    
    def __init__(self, env):
        self.state_dim = env.observation_space.shape[0]
        try:
            self.action_dim = env.action_space.n
        except Exception as e:
            self.action_dim = env.action_space.shape[0]
        print(f'device={self.device} | env={str(env)}')



def train_agent(env, cfg):
    ac_agent = PPO(
        state_dim=cfg.state_dim,
        hidden_layers_dim=cfg.hidden_layers_dim,
        action_dim=cfg.action_dim,
        actor_lr=cfg.actor_lr,
        critic_lr=cfg.critic_lr,
        gamma=cfg.gamma,
        PPO_kwargs=cfg.PPO_kwargs,
        device=cfg.device
    )           
    tq_bar = tqdm(range(cfg.num_episode))
    rewards_list = []
    now_reward = 0
    bf_reward = -np.inf
    for i in tq_bar:
        buffer_ = replayBuffer(cfg.buffer_size)
        tq_bar.set_description(f'Episode [ {i+1} / {cfg.num_episode} ]')    
        s, _ = env.reset()
        done = False
        episode_rewards = 0
        steps = 0
        while not done:
            a = ac_agent.policy(s)
            n_s, r, done, _, _ = env.step(a)
            buffer_.add(s, a, r, n_s, done)
            s = n_s
            episode_rewards += r
            steps += 1
            if (episode_rewards >= cfg.max_episode_rewards) or (steps >= cfg.max_episode_steps):
                break

        ac_agent.update(buffer_.buffer)
        rewards_list.append(episode_rewards)
        now_reward = np.mean(rewards_list[-10:])
        if bf_reward < now_reward:
            torch.save(ac_agent.actor.state_dict(), cfg.save_path)
            bf_reward = now_reward
        
        tq_bar.set_postfix({'lastMeanRewards': f'{now_reward:.2f}', 'BEST': f'{bf_reward:.2f}'})
    env.close()
    return ac_agent






if __name__ == '__main__':
    print('=='*35)
    print('Training Pendulum-v1')
    env = gym.make('Pendulum-v1')
    cfg = Config(env)
    ac_agent = train_agent(env, cfg)
    ac_agent.actor.load_state_dict(torch.load(cfg.save_path))
    play(gym.make('Pendulum-v1', render_mode="human"), ac_agent, cfg)


3. Continuous action space on the inverted pendulum

import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym

from torch.distributions import Normal
from itertools import count

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PGN(nn.Module):
    def __init__(self):
        super(PGN, self).__init__()
        self.linear1 = nn.Linear(4, 24)
        self.linear2 = nn.Linear(24, 36)
        self.mean = nn.Linear(36, 1)  # outputs the mean of the action distribution
        self.std = nn.Parameter(torch.tensor(0.1))  # learnable standard deviation

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        mean = self.mean(x)
        return mean


class CartAgent(object):
    def __init__(self, learning_rate, gamma):
        self.pgn = PGN()
        self.gamma = gamma

        self._init_memory()
        self.optimizer = torch.optim.RMSprop(self.pgn.parameters(), lr=learning_rate)

    def memorize(self, state, action, reward):
        # save to memory for mini-batch gradient descent
        self.state_pool.append(state)
        self.action_pool.append(action)
        self.reward_pool.append(reward)
        self.steps += 1

    def learn(self):
        self._adjust_reward()

        # policy gradient
        self.optimizer.zero_grad()
        for i in range(self.steps):
            state = self.state_pool[i]
            action = torch.FloatTensor([self.action_pool[i]])
            reward = self.reward_pool[i]

            mean = self.act(state)
            m = Normal(mean, self.pgn.std)
            loss = -m.log_prob(action) * reward
            loss.backward()
        self.optimizer.step()
        
        self._init_memory()

    def act(self, state):
        return self.pgn(state) 

    def _init_memory(self):
        self.state_pool = []
        self.action_pool = []
        self.reward_pool = []
        self.steps = 0

    def _adjust_reward(self):
        running_add = 0
        for i in reversed(range(self.steps)):
            if self.reward_pool[i] == 0:
                running_add = 0
            else:
                running_add = running_add * self.gamma + self.reward_pool[i]
                self.reward_pool[i] = running_add

        reward_mean = np.mean(self.reward_pool)
        reward_std = np.std(self.reward_pool)
        for i in range(self.steps):
            self.reward_pool[i] = (self.reward_pool[i] - reward_mean) / reward_std


def train():
    BATCH_SIZE = 5
    LEARNING_RATE = 0.01
    GAMMA = 0.99
    NUM_EPISODES = 500

    env = gym.make('InvertedPendulum-v2', render_mode='human')
    cart_agent = CartAgent(learning_rate=LEARNING_RATE, gamma=GAMMA)

    for i_episode in range(NUM_EPISODES):
        next_state = env.reset()[0]

        for t in count():
            # print("++++++++++++++:",t)
            state = torch.from_numpy(next_state).float()

            mean = cart_agent.act(state)
            # if the learned std drifts non-positive, reset it to a small value
            # (note: wrapping it in a fresh nn.Parameter detaches it from the optimizer's state)
            if cart_agent.pgn.std <= 0:
                cart_agent.pgn.std = nn.Parameter(torch.tensor(0.01))
            m = Normal(mean, cart_agent.pgn.std)
            action = m.sample()


            action = action.data.numpy().astype(float).item()
            action = np.clip(action, -3.0, 3.0)  # InvertedPendulum's actuator force range is [-3, 3]
            action = [action]
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated  # treat time-limit truncation as episode end too

            # zero out the terminal reward so _adjust_reward can detect episode boundaries
            if done:
                reward = 0

            cart_agent.memorize(state, action, reward)

            if done:
                logger.info('Episode {}: duration {}'.format(i_episode, t))
                break

        if i_episode > 0 and i_episode % BATCH_SIZE == 0:
            cart_agent.learn()


if __name__ == '__main__':
    train()
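
The std handling above (resetting self.pgn.std whenever it drifts non-positive) can be avoided entirely by learning log σ and exponentiating it; a minimal sketch of that variant of PGN:

class GaussianPGN(nn.Module):
    """Variant of PGN that learns log(std); exp() keeps the std strictly positive."""
    def __init__(self):
        super(GaussianPGN, self).__init__()
        self.linear1 = nn.Linear(4, 24)
        self.linear2 = nn.Linear(24, 36)
        self.mean = nn.Linear(36, 1)
        self.log_std = nn.Parameter(torch.zeros(1))  # std = exp(log_std) > 0 by construction

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        return self.mean(x), self.log_std.exp()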

