Hands on RL 之 Deep Deterministic Policy Gradient(DDPG)

news2025/1/20 6:23:26

Hands on RL 之 Deep Deterministic Policy Gradient(DDPG)

文章目录

  • Hands on RL 之 Deep Deterministic Policy Gradient(DDPG)
    • 1. 理论部分
      • 1.1 回顾 Deterministic Policy Gradient(DPG)
      • 1.2 Neural Network Difference
      • 1.3 Why is off-policy?
      • 1.4 Soft target update
      • 1.5 Maintain Exploration
      • 1.6 Other Techniques
      • 1.7 Pesudocode
    • 2. 代码实践
    • Reference

1. 理论部分

1.1 回顾 Deterministic Policy Gradient(DPG)

在介绍DDPG之前,我们先回顾一下DPG中最重要的结论,

Deterministic Policy Gradient Theorem即确定性策略梯度定理

∇ θ J ( μ θ ) = ∫ S ρ μ ( s ) ∇ θ μ θ ( s ) ∇ a Q μ ( s , a ) ∣ a = μ θ ( s ) d s = E s ∼ ρ μ [ ∇ θ μ θ ( s ) ∇ a Q μ ( s , a ) ∣ a = μ θ ( s ) ] \begin{aligned} \nabla_\theta J(\mu_\theta) & = \int_{\mathcal{S}} \rho^\mu(s) \nabla_\theta \mu_\theta(s) \nabla_a Q^\mu(s,a)|_{a=\mu_\theta(s)} \mathrm{d}s \\ & = \mathbb{E}_{s\sim\rho^\mu} \Big[ \nabla_\theta \mu_\theta(s) \nabla_a Q^\mu(s,a)|_{a=\mu_\theta(s)} \Big] \end{aligned} θJ(μθ)=Sρμ(s)θμθ(s)aQμ(s,a)a=μθ(s)ds=Esρμ[θμθ(s)aQμ(s,a)a=μθ(s)]

其中, a = μ θ ( s ) a=\mu_\theta(s) a=μθ(s)表示确定性的策略是从状态空间到动作空间的映射 μ θ : S → A \mu_\theta: \mathcal{S}\to\mathcal{A} μθ:SA,网络的参数为 θ \theta θ s ∼ ρ μ s\sim\rho^\mu sρμ表示状态 s s s符合在策略 μ \mu μ下的状态访问分布。如何推导的,这里不详细阐述。(可以参考Deterministic policy gradient algorithms)

接下来逐点介绍DDPG相较于DPG的改进

1.2 Neural Network Difference

​ DDPG在相较于传统的AC算法在网络结构上也有很大不同,首先看看传统算法的网络结构

Image

然后再看看DDPG的网络结构

Image

为什么DDPG会是这样的网络结构呢,这是因为DDPG中的actor输出的是确定性动作,而不是动作的概率分布,因此确定性的动作是连续的可以看作动作空间的维度为无穷,如果采用AC中critic的结构,我们无法通过遍历所有动作来取出某个特定动作对应的Q-value。因此DDPG中将actor的输出作为critic的输入,再联合状态输入,就能直接获得所采取动作 a = μ ( s t ) a=\mu(st) a=μ(st)的Q-value。

1.3 Why is off-policy?

​ 首先为什么DDPG或者说DPG是off-policy的?我们回顾stochastic policy π θ ( a ∣ s ) \pi_\theta(a|s) πθ(as)定义下的Q-value
Q π ( s t , a t ) = E r t , s t + 1 ∼ E [ r ( s t , a t ) + γ E a t + 1 ∼ π [ Q π ( s t + 1 , a t + 1 ) ] ] Q^\pi(s_t,a_t) = \mathbb{E}_{r_t, s_{t+1}\sim E}[r(s_t,a_t) + \gamma \mathbb{E}_{a_{t+1}\sim\pi}[Q^\pi(s_{t+1}, a_{t+1})]] Qπ(st,at)=Ert,st+1E[r(st,at)+γEat+1π[Qπ(st+1,at+1)]]
其中, E E E表示的是环境,即状态 s ∼ E s\sim E sE状态符合环境本身的分布。当我们使用确定性策略的时候 a = μ θ ( s ) a=\mu_\theta(s) a=μθ(s),那么inner expectation就自动被抵消掉了
Q π ( s t , a t ) = E r t , s t + 1 ∼ E [ r ( s t , a t ) + γ Q π ( s t + 1 , a t + 1 = μ ( s t + 1 ) ) ] Q^\pi(s_t,a_t) = \mathbb{E}_{r_t, s_{t+1}\sim E}[r(s_t,a_t) + \gamma Q^\pi(s_{t+1}, a_{t+1}=\mu(s_{t+1}))] Qπ(st,at)=Ert,st+1E[r(st,at)+γQπ(st+1,at+1=μ(st+1))]
这就意味着Q-value不再依赖于动作的访问分布,即没有了 a t + 1 ∼ π a_{t+1}\sim\pi at+1π。那么我们就可以通过行为策略behavior policy β \beta β产生的结果来计算该值,这让off-policy成为可能。

​ 实际上Q-value不再依赖于动作的访问分布,那么确定性梯度定理可以写作
∇ θ J ( μ θ ) ≈ E s ∼ ρ β [ ∇ θ μ θ ( s ) ∇ a Q μ ( s , a ) ∣ a = μ θ ( s ) ] \textcolor{red}{\nabla_\theta J(\mu_\theta) \approx \mathbb{E}_{s\sim\rho^\beta} \Big[ \nabla_\theta \mu_\theta(s) \nabla_a Q^\mu(s,a)|_{a=\mu_\theta(s)} \Big]} θJ(μθ)Esρβ[θμθ(s)aQμ(s,a)a=μθ(s)]
可以写作依赖于behavior policy β \beta β产生的状态访问分布的期望,这就是一种off-policy的形式。

1.4 Soft target update

​ 在DDPG中维护了四个神经网络,分别是policy network, target policy network, action value network, target action value network。使用了DQN中的将目标网络和训练网络分离的思想,并且采用soft更新的方式,能够更有效维护训练中的稳定性。soft更新方式如下
θ − ← τ θ + ( 1 − τ ) θ − \theta^- \leftarrow \tau \theta + (1-\tau)\theta^- θτθ+(1τ)θ
其中, θ − \theta^- θ表示目标网络参数, θ \theta θ表示训练网络参数, τ ≪ 1 \tau \ll 1 τ1 τ \tau τ是软更新参数。

1.5 Maintain Exploration

​ 确定性的策略是不具有探索性的,为了保持策略的探索性,我们可以在策略网络的输出中增加高斯噪声,让输出的动作值有些许偏差来增加网络的探索性。用数学的方式来表示即是
μ ′ ( s t ) = μ θ ( s t ) + N \mu^\prime(s_t) = \mu_\theta(s_t) + \mathcal{N} μ(st)=μθ(st)+N
其中 μ ′ \mu^\prime μ表示探索性的策略, N \mathcal{N} N表示高斯噪声。

1.6 Other Techniques

​ DDPG还集成了一些别的算法的常用技巧,比如Replay Buffer来产生independent and identically distribution的样本,使用了Batch Normalization来预处理数据。

1.7 Pesudocode

伪代码如下

Image

2. 代码实践

我们采用gym中的Pendulum-v1作为本次实验的环境,Pendulum-v1是典型的确定性连续动作空间环境,整体的代码如下

import torch
import torch.nn as nn
import torch.nn.functional as F
import gym
import random
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import collections

# Policy Network
class PolicyNet(nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, action_dim)
        self.action_bound = action_bound
    
    def forward(self, observation):
        x = F.relu(self.fc1(observation))
        x = F.tanh(self.fc2(x))
        return x * self.action_bound

# Q Value Network
class QValueNet(nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(QValueNet, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_out = nn.Linear(hidden_dim, 1)
    
    def forward(self, x, a):
        cat = torch.cat([x, a], dim=1)    # 拼接状态和动作
        x = F.relu(self.fc1(cat))
        x = F.relu(self.fc2(x))
        return self.fc_out(x)

# Deep Deterministic Policy Gradient
class DDPG():
    def __init__(self, state_dim, hidden_dim, action_dim, 
                action_bound, actor_lr, critic_lr, 
                sigma, tau, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
        self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)
        self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
        self.target_critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)

        # initialize target actor network with same parameters
        self.target_actor.load_state_dict(self.actor.state_dict())
        # initialize target critic network with same parameters
        self.target_critic.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.sigma = sigma  # 高斯噪声的标准差,均值直接设置为0
        self.action_dim = action_dim
        self.device = device
        self.tau = tau
    
    def take_action(self, state):
        state = torch.tensor(np.array([state]), dtype=torch.float).to(self.device)
        action = self.actor(state).item()
        # add noise to increase exploratory
        action = action + self.sigma * np.random.randn(self.action_dim)
        return action
    
    def soft_update(self, net, target_net):
        # implement soft update rule
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            param_target.data.copy_(param_target.data * (1.0-self.tau) + param.data * self.tau)
    
    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1,1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1).to(self.device)

        next_q_values = self.target_critic(next_states, self.target_actor(next_states))
        td_targets = rewards + self.gamma * next_q_values * (1-dones)
        critic_loss = torch.mean(F.mse_loss(self.critic(states, actions), td_targets))

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        actor_loss = torch.mean( - self.critic(states, self.actor(states)))
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # soft update actor net and critic net
        self.soft_update(self.actor, self.target_actor)
        self.soft_update(self.critic, self.target_critic)
    

class ReplayBuffer():
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)
    
    def add(self, s, a, r, s_, d):
        self.buffer.append((s,a,r,s_,d))
    
    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return np.array(states), actions, np.array(rewards), np.array(next_states), dones

    def size(self):
        return len(self.buffer)


def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size, render, seed_number):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes/10), desc='Iteration %d'%(i+1)) as pbar:
            for i_episode in range(int(num_episodes/10)):
                observation, _ = env.reset(seed=seed_number)
                done = False
                episode_return = 0

                while not done:
                    if render:
                        env.render()
                    action = agent.take_action(observation)
                    observation_, reward, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated
                    replay_buffer.add(observation, action, reward, observation_, done)
                    # swap states
                    observation = observation_
                    episode_return += reward
                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                        transition_dict = {
                            'states': b_s,
                            'actions': b_a,
                            'rewards': b_r,
                            'next_states': b_ns,
                            'dones': b_d
                        }
                        agent.update(transition_dict)
                return_list.append(episode_return)
                if(i_episode+1) % 10 == 0:
                    pbar.set_postfix({
                        'episode': '%d'%(num_episodes/10 * i + i_episode + 1),
                        'return': "%.3f"%(np.mean(return_list[-10:]))
                    })
                pbar.update(1)
    env.close()
    return return_list

def moving_average(a, window_size):
    cumulative_sum = np.cumsum(np.insert(a, 0, 0)) 
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size-1, 2)
    begin = np.cumsum(a[:window_size-1])[::2] / r
    end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))

def plot_curve(return_list, mv_return, algorithm_name, env_name):
    episodes_list = list(range(len(return_list)))
    plt.plot(episodes_list, return_list, c='gray', alpha=0.6)
    plt.plot(episodes_list, mv_return)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('{} on {}'.format(algorithm_name, env_name))
    plt.show()



if __name__ == "__main__":
    # reproducible
    seed_number = 0
    random.seed(seed_number)
    np.random.seed(seed_number)
    torch.manual_seed(seed_number)

    num_episodes = 250     # episodes length
    hidden_dim = 128        # hidden layers dimension
    gamma = 0.98            # discounted rate
    actor_lr = 1e-3         # lr of actor
    critic_lr = 1e-3        # lr of critic
    tau = 0.005             # soft update parameter
    sigma = 0.01            # std variance of guassian noise
    buffer_size = 10000
    minimal_size = 1000
    batch_size = 64

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    env_name = 'Pendulum-v1'

    render = False
    if render:
        env = gym.make(id=env_name, render_mode='human')
    else:
        env = gym.make(id=env_name)
                    
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]  
    action_bound = env.action_space.high[0]


    replay_buffer = ReplayBuffer(buffer_size)        
    agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device)
    return_list = train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size, render, seed_number)

    mv_return = moving_average(return_list, 9)
    plot_curve(return_list, mv_return, 'DDPG', env_name)

DDPG训练的回报曲线如图所示

Image

Reference

Tutorial: Hands on RL

Paper: Continuous control with deep reinforcement learning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/883856.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型PEFT技术原理(三):Adapter Tuning及其变体

随着预训练模型的参数越来越大,尤其是175B参数大小的GPT3发布以来,让很多中小公司和个人研究员对于大模型的全量微调望而却步,近年来研究者们提出了各种各样的参数高效迁移学习方法(Parameter-efficient Transfer Learning&#x…

java17新特性+ZGC

ZGC垃圾收集 11引入的追求低延迟的垃圾回收器 1.ZGC的内存布局 1.1 region 和G1一样,也是基于Region的堆内存布局。但是ZGC的Region具有动态性:动态创建、动态销毁、动态数据容量。 1.2 垃圾回收机制 相较于CMS,ZGC只有6个阶段&#xff1…

同步、异步、协程

目录 同步异步https 异步请求: 协程1.为什么会要协程?2.异步的运行流程是什么3.协程的原语操作4.协程的定义?5.调度器的定义?6.调度的策略?7. api封装, hook8.多核的模式?9.协程的性能?10.要有哪些案例?nty_servernty_ mysql_client.cnty_ mysql oper.cnty_ …

Python项目实战:基于napari的3D可视化(点云+slice)

文章目录 一、napari 简介二、napari 安装与更新三、napari【巨巨巨大的一个BUG】四、napari 使用指南4.1、菜单栏(File View Plugins Window Help)4.2、Window:layer list(参数详解)4.3、Window:layer…

city walk结合VR全景,打造新时代下的智慧城市

近期爆火的city walk是什么梗?它其实是近年来备受追捧的城市漫步方式,一种全新的城市探索方式,与传统的旅游观光不同,城市漫步更注重与城市的亲密接触,一步步地感受城市的脉动。其实也是一种自由、休闲的方式&#xff…

vscode搭建java开发环境

一、配置extensions环境变量VSCODE_EXTENSIONS, 该环境变量路径下的存放安装组件: 二、setting配置文件 {"java.jdt.ls.java.home": "e:\\software\\jdk\\jdk17",// java运行环境"java.configuration.runtimes": [{"…

分类预测 | MATLAB实现DRN深度残差网络多输入分类预测

分类预测 | MATLAB实现DRN深度残差网络多输入分类预测 目录 分类预测 | MATLAB实现DRN深度残差网络多输入分类预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.分类预测 | MATLAB实现DRN深度残差网络多输入分类预测 2.代码说明:MATLAB实现DRN深度残差网络…

信捷 XDH Ethercat A_GearIn指令与轴配置

在前面的文章中描述了A_FOLLOW指令,有时不能满足要求,需要更高级的指令A_GearIn指令。 下面的例子A_GearIn指令和CNT_AB指令 实现手轮动马达动,手轮停马达停,手轮转的快马达也转得快。(手轮输出接到PLC的X0和X1点&am…

【内测】百度AI搜索体验

收到百度搜索AI体验邀请,简单测试了一下,目前支持文案创作,AI绘画等。 文案创作功能还行,绘画功能效果比较差。

【数据库】P4 过滤数据 WHERE

过滤数据 WHERE 简介WHERE 子句操作符检测单个值案例范围值检查 BETWEEN AND空值检查 NULL 简介 数据库表一般包含大量的数据,很少需要检索表中的所有行。我们只检索所需数据需要指定搜索条件(search criteria),搜索条件也称为过滤条件(filter conditio…

【每日一题】617. 合并二叉树

【每日一题】617. 合并二叉树 617. 合并二叉树题目描述解题思路 617. 合并二叉树 题目描述 给你两棵二叉树: root1 和 root2 。 想象一下,当你将其中一棵覆盖到另一棵之上时,两棵树上的一些节点将会重叠(而另一些不会&#xff…

SpringBoot携带Jre绿色部署项目

文章目录 SpringBoot携带Jre绿色部署运行项目1. 实现步骤2. 自测项目文件目录及bat文件内容,截图如下:2-1 项目文件夹列表:2-2. bat内容 3. 扩展: 1.6-1.8版本的jdk下载 SpringBoot携带Jre绿色部署运行项目 说明: 实…

【数据结构与算法】十大经典排序算法-归并排序

🌟个人博客:www.hellocode.top 🏰Java知识导航:Java-Navigate 🔥CSDN:HelloCode. 🌞知乎:HelloCode 🌴掘金:HelloCode ⚡如有问题,欢迎指正&#…

基本变量与引用变量的区别

基本数据类型创建的变量,称为基本变量,该变量空间中直接存放的是其所对应的值; 而引用数据类型创建的变量,一般称为对象的引用,其空间中存储的是对象所在空间的地址。 public static void func() { int a 10; int b …

vue3 如果切换角色后权限不同 怎么清空之前添加动态路由。

项目中切换角色后发现会保留前面一个角色的权限,方法一是到login页面,权限重新reload,不过这样确实会影响体验,如果不采用此方案的话,你可以看下我从发现问题到解决问题的思路: 1、首先要找到一个初始状态…

光耦继电器:实现电气隔离的卓越选择

光耦继电器是一种常用的电子元件,用于实现电气隔离和信号传输。在工业控制、自动化系统和电力电子等领域,光耦继电器具有独特的特点和优势。本文将从可靠性、隔离性、响应速度和适应性等方面对光耦继电器的特点进行概述。 光耦继电器是一种典型的固态继电…

24、springboot的自动配置01--类条件注解@ConditionalOnClass、bean条件注解@ConditionalOnBean

springboot的自动配置 ★ 自动配置 Spring Boot的自动配置通常可根据依赖库自动触发——当Spring Boot检测到项目中包含某些框架的JAR包时,Spring Boot就会触发自动配置。其实通过EnableAutoConfiguration注解来启动▲ 其实你用到SpringBootApplication&#xff0…

vmware添加额外网卡

为vmware虚拟机添加额外网卡 vmware 配置管理界面配置系统内配置查看系统中的网卡状态启用网卡重启网络修改IP地址 vmware 配置管理界面配置 关闭运行的的系统。 编辑虚拟机设置—》添加–》选择网络适配器 选择网络适配器的模式 系统内配置 查看系统中的网卡状态 第一…

使用element UI 的el-upload上传图片并携带参数的用法

直接看代码&#xff1a;前端实现 <div class"upload"><el-uploadclass"upload-demo"name"upload_name":data"{user_name:user_name}"action"http://localhost:8000/api/deal_pest_Image":show-file-list"fal…