如何使用PyTorch 在 OpenAI Gym 上的 CartPole-v0 任务上训练深度 Q 学习(DQN)智能体

news2024/9/23 11:24:33

强化学习(DQN)教程

本教程说明如何使用 PyTorch 在 OpenAI Gym 上的 CartPole-v0 任务上训练深度 Q 学习(DQN)智能体。

任务

智能体必须在两个动作之间做出决定-向左或向右移动推车-以便使与之相连的杆子保持直立。 您可以在 Gym 网站上找到具有各种算法和可视化效果的官方排行榜。

在这里插入图片描述

卡特波尔

当智能体观察环境的当前状态并选择一个动作时,环境会转换为到新状态,并且还会返回表示该动作后果的奖励。 在此任务中,每增加一个时间步长,奖励为 +1,并且如果杆子掉落得太远或手推车离中心的距离超过 2.4 个单位,则环境终止。 这意味着表现更好的方案将持续更长的时间,从而积累更大的回报。

对 CartPole 任务进行了设计,以使对智能体的输入是代表环境状态(位置,速度等)的 4 个实际值。 但是,神经网络可以完全通过查看场景来解决任务,因此我们将以推车为中心的一部分屏幕作为输入。 因此,我们的结果无法直接与官方排行榜上的结果进行比较-我们的任务更加艰巨。 不幸的是,这确实减慢了训练速度,因为我们必须渲染所有帧。

严格来说,我们将状态显示为当前屏幕补丁与前一个屏幕补丁之间的差异。 这将允许智能体从一张图像中考虑极点的速度。

首先,让我们导入所需的包。 首先,我们需要针对环境的 Gym(使用pip install Gym进行安装)。 我们还将使用 PyTorch 中的以下内容:

  • 神经网络(torch.nn
  • 优化(torch.optim
  • 自动微分(torch.autograd
  • 视觉任务的工具(torchvision-单独的包)。
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

env = gym.make('CartPole-v0').unwrapped

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

回放记忆

我们将使用经验回放记忆来训练我们的 DQN。 它存储智能体观察到的转换,使我们以后可以重用此数据。 通过从中随机采样,可以构建批量的转换相关。 已经表明,这极大地稳定和改善了 DQN 训练程序。

为此,我们将需要两个类:

  • Transition-表示我们环境中单个过渡的命名元组。 它本质上将(状态,动作)对映射到其(下一个状态,奖励)结果,该状态是屏幕差异图像,如下所述。
  • ReplayMemory-有界大小的循环缓冲区,用于保存最近观察到的转换。 它还实现了.sample()方法,用于选择随机的过渡批量进行训练。
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

现在,让我们定义我们的模型。 但是首先,让我们快速回顾一下 DQN 是什么。

DQN 算法

我们的环境是确定性的,因此为简单起见,此处介绍的所有方程式也都确定性地制定。 在强化学习文献中,它们还将包含对环境中随机转变的期望。

我们的目标是制定一种策略,尝试最大化折扣的累积奖励R[t[0]] = Σ γ^(t - t[0]) r[t], t = t[0] -> ∞,其中R[t[0]]也称为回报。 折扣γ应该是01之间的常数,以确保总和收敛。 这使得来自不确定的遥远未来的回报对我们的智能体而言不如可以对其充满信心的近期回报重要。

Q 学习的主要思想是,如果我们有一个函数Q*:State x Action => R,这可以告诉我们,如果我们在给定状态下采取行动,那么我们就可以轻松地制定出使我们的回报最大化的策略:

但是,我们对世界一无所知,因此无法访问Q*。 但是,由于神经网络是通用函数逼近器,因此我们可以简单地创建一个并将其训练为类似于Q*的函数。

对于我们的训练更新规则,我们将使用一个事实,即某些策略的每个Q函数都遵循贝尔曼方程:

等式两侧之间的差异称为时间差异误差delta

为了最小化此误差,我们将使用 Huber 损失。 当误差较小时,Huber 损失的作用类似于均方误差,而当误差较大时,则表现为平均绝对误差-当Q的估计值非常嘈杂时,这使它对异常值的鲁棒性更高。 我们通过从重播内存中采样的一批过渡B来计算:

Q 网络

我们的模型将是一个卷积神经网络,该卷积神经网络将吸收当前屏幕补丁和先前屏幕补丁之间的差异。 它有两个输出,分别代表Q(s, left)Q(s, right)(其中s是网络的输入)。 实际上,网络正在尝试预测在给定当前输入的情况下执行每个操作的预期收益

class DQN(nn.Module):

    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 5, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))

输入提取

以下代码是用于从环境中提取和处理渲染图像的工具。 它使用torchvision包,可轻松组成图像变换。 一旦运行单元,它将显示它提取的示例补丁。

resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])

def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART

def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3\. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Cart is in the lower half, so strip off the top and bottom of the screen
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have a square image centered on a cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)

env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
           interpolation='none')
plt.title('Example extracted screen')
plt.show()

训练

超参数和工具

该单元实例化我们的模型及其优化器,并定义一些工具:

  • select_action-将根据 ε 贪婪策略选择一个动作。 简而言之,有时我们会使用模型来选择操作,有时我们会统一采样。 选择随机动作的可能性将从EPS_START开始,并朝EPS_END呈指数衰减。 EPS_DECAY控制衰减率。
  • plot_durations-绘制剧集持续时间以及最近 100 个剧集的平均值(官方评估中使用的度量)的助手。 该图将在包含主要训练循环的单元下面,并且将在每个剧集之后更新。
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

# Get screen size so that we can initialize layers correctly based on shape
# returned from AI gym. Typical dimensions at this point are close to 3x40x90
# which is the result of a clamped and down-scaled render buffer in get_screen()
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape

# Get number of actions from gym action space
n_actions = env.action_space.n

policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

steps_done = 0

def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1\. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

episode_durations = []

def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())

训练循环

最后,是训练模型的代码。

在这里,您可以找到执行优化步骤的optimize_model函数。 它首先对一批进行采样,将所有张量连接为一个张量,计算Q(s[t], a[t])V(s[t+1])= max[a] Q(s[t+1], a),并将其合并为我们的损失。 根据定义,如果s为终端状态,则设置V(s) = 0。 我们还使用目标网络来计算V(s[t+1]),以提高稳定性。 目标网络的权重大部分时间保持冻结状态,但经常更新以策略网络的权重。 通常这是一组固定的步骤,但是为了简单起见,我们将使用剧集。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

在下面,您可以找到主要的训练循环。 首先,我们重置环境并初始化state张量。 然后,我们采样一个动作,执行它,观察下一个屏幕和奖励(总是 1),并一次优化我们的模型。 当剧集结束(我们的模型失败)时,我们重新开始循环。

下面,将num_episodes设置得较小。 您应该下载笔记本并运行更多的片段,例如 300 多个片段,才能显着改善持续时间。

num_episodes = 50
for i_episode in range(num_episodes):
    # Initialize the environment and state
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # Observe new state
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the target network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()

这是说明总体结果数据流的图。
在这里插入图片描述

可以随机选择或根据策略选择动作,从健身环境中获取下一步样本。 我们将结果记录在重播内存中,并在每次迭代时运行优化步骤。 优化会从重播内存中随机抽取一批来进行新策略的训练。 “较旧”的target_net也用于优化计算期望的 Q 值; 有时会对其进行更新以使其保持最新状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/658831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

存储笔记7 NAS

NAS  Describe NAS, its benefits and components  Discuss different NAS implementations  Describe NAS file-sharing protocols  Discuss NAS management options File Sharing Environment 文件系统&#xff1a;存储组织数据文件的结构化方式文件共享 网络存储…

python3GUI--图片浏览器By:PyQt5(附源码)

文章目录 一&#xff0e;前言二&#xff0e;展示1.主界面2.添加图片3.多级目录4.查看文件信息5.调整UI布局 三&#xff0e;源代码1.image_god_main_v.py2.image_god_GUI.py 四&#xff0e;总结 一&#xff0e;前言 本次使用PyQt5开发一款图片浏览器&#xff0c;本篇主要练习QD…

AIGC新时代,注意政策走向,产业方向,拥抱可信AI。需要了解基本理论,基础模型,前沿进展,产品应用,以及小小的项目复现

AIGC&#xff08;AI-Generated Content&#xff0c;AI生成内容&#xff09;是指基于生成对抗网络&#xff08;GAN&#xff09;、大型预训练模型等人工智能技术的方法&#xff0c;通过对已有数据进行学习和模式识别&#xff0c;以适当的泛化能力生成相关内容的技术。类似的概念还…

免费、不用部署SD:AI二维码制作教程

大家好&#xff0c;我是可夫小子&#xff0c;《小白玩转ChatGPT》专栏作者&#xff0c;关注AIGC、读书和自媒体。 最近&#xff0c;风格化的AI二维码&#xff0c;应该没少见吧。生成的原理大家大概也知道&#xff0c;主要通过stable diffusion和一些插件来完成&#xff0c;但对…

【Linux】HTTPS协议

目录 &#x1f37a;前言&#x1f37b;HTTPS协议原理&#x1f380;1、概念&#x1f381;2、加密和解密&#x1f382;3、常见加密方式&#x1f341;3.1、对称加密&#x1f342;3.2、非对称加密 &#x1f383;4、数据摘要和数据指纹&#x1f364;5、HTTPS工作原理&#x1f338;5.1…

学生速看!免费领取一台阿里云服务器全流程

阿里云学生服务器优惠活动&#xff1a;高效计划&#xff0c;可以免费领取一台阿里云服务器&#xff0c;如果你是一名高校学生&#xff0c;想搭建一个linux学习环境、git代码托管服务器&#xff0c;或者创建个人博客网站记录自己的学习成长历程&#xff0c;拥有一台云服务器是很…

零基础速成simulink代码生成——简单滤波器实现2

simulink setting 找到model settings solver求解器配置 Code Generation 代码生成配置 生成代码报告 添加stateflow注释 可以将变量保存在定义的文件(选) 实践 简单一阶滤波器

鼠标键盘实验

文章目录 USB参考资料USB设备STM32F407USB 硬件连接软件移植官方HIDSTM32F4USB通信库 USB参考资料 ①《STM32F4xx中文参考手册》-第30章 全速USB on-the-go(OTG_FS) ②光盘&#xff1a;STM32参考资料:STM32 USB 学习资料-CD00289278.pdf(UM1021) ③光盘&#xff1a;STM32参考资…

Android 14 新特性:语法性别 Grammatical Gender

背景 如同汉语里的他、她、它&#xff0c;英语里的 He、She、it&#xff0c;很多语言都存在依据性别、对象不同而造成的语法差异&#xff0c;甚至不仅限于名词&#xff0c;还涉及到形容词、动词等&#xff0c;复杂得多。 而这部分语言所涉及到的人群多达 30 亿之众&#xff0…

【树形DP+可重集排列】至至子的公司排队

好屌的题 F-至至子的公司排队_牛客小白月赛55 (nowcoder.com) 题意&#xff1a; 思路&#xff1a; 其实题目问的就是&#xff0c;森林的拓扑序有几种 那么我们先去考虑一棵树的拓扑序有几种 这个可以用树形DP来解决 设dp[u]为&#xff0c;以u为根的子树的拓扑序的种类数&…

【Java】项目中大批量数据查询导致OOM

文章目录 背景内存溢出的具体原因错误模拟问题复现解决办法流式查询和分页查询的使用场景查询数据的建议 背景 项目中有时候一次性将大批量数据都查出来到内存中导致内存占用过多很可能会导致内存溢出 内存溢出的具体原因 在JVM内存结构中分为以下几个模块 程序计数器虚拟机…

SSL协议,一文带你了解

SSL简介 SSL&#xff08;Secure Sockets Layer&#xff09;是一种安全协议&#xff0c;用于保护互联网上的数据传输安全。SSL协议最初由网景公司开发&#xff0c;现在已经被TLS&#xff08;Transport Layer Security&#xff09;协议所取代。SSL协议和TLS协议都是为了保护数据传…

一文带你弄懂【时间复杂度】

文章目录 算法时间复杂度时间复杂度计算常见的时间复杂度时间复杂度的差异 总结 算法 算法&#xff08;Algorithm&#xff09;是求解一个问题需要遵循的&#xff0c;被清楚指定的简单指令的集合。 一个算法的评价主要从时间复杂度和空间复杂度来考虑。而时间复杂度是一个函数…

Netty核心技术四--Netty概述

1. 原生NIO存在的问题 NIO 的类库和 API 繁杂&#xff0c;使用麻烦&#xff1a;需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。需要具备其他的额外技能&#xff1a;要熟悉 Java 多线程编程&#xff0c;因为NIO编程涉及到Reactor模式&#xff0c;…

从tomcat说起全面理解Java web开发原理

从tomcat说起全面理解Java web开发原理 简介&#xff1a;Java开发分为Java ME&#xff0c;Java SE&#xff0c;Java EE。回顾过去这些的开发工作基本上都是围绕着Java EE的&#xff0c;在开发经历中分别经历了Java EE开发框架从jsp servlet一路经历了ssh&#xff0c; ss…

存储笔记8 ipsan

Module Objectives IP SAN的组件 IP SAN的好处 描述SAN中的IP融合及其影响 描述的基本架构 –iSCSI –FCIP –FCoE 讨论IP SAN技术的市场驱动因素 列出IP SAN技术 列出iSCSI的组件和连接选项 描述iSCSI体系结构和拓扑结构 解释iSNS操作 描述FCIP的体系结构 IP SAN互联…

Springboot整合第三方登录

文章目录 Springboot整合第三方登录为什么采用第三方登录整合第三方登录创建应用导入依赖创建controller类 Springboot整合第三方登录 为什么采用第三方登录 ​ 采用第三方登录可以避免重新注册账号的繁琐&#xff0c;也不需要再为密码和昵称发愁&#xff0c;而第三方登录有一…

Linux命令——top相关之Load Average平均负载

Linux 平均负载 Load Average 详解_系统1f分钟负载_欧晨eli的博客-CSDN博客 一、什么是Load Average&#xff1f; 系统负载&#xff08;System Load&#xff09;是系统CPU繁忙程度的度量&#xff0c;即有多少进程在等待被CPU调度&#xff08;进程等待队列的长度&#xff09;。…

2-JVM运行流程

JVM 是 Java 运行的基础&#xff0c;也是实现一次编译到处执行的关键&#xff0c;那么 JVM 是如何执行的呢&#xff1f; 程序在执行之前先要把java源代码&#xff08;.java&#xff09;转换成字节码文件&#xff08;.class&#xff09;。JVM 首先需要通过一定的方式类加载器&a…

如何做好前端性能优化

前端推荐官网: http://luckycola.com.cn/ 前言: 前端性能优化一直是一个前端开发人员必须关注的经典话题,虽然现在随着技术的不断发展,网页容器(浏览器、webview)性能也越来越强大,但是网站应用的功能也不断丰富,体积不可避免的增加,当网络环境等因素不好时,仍然会存在白屏时…