Python神经网络学习(七)--强化学习--使用神经网络

news2024/9/23 3:25:19

前言

前面说到了强化学习,但是仅仅是使用了一个表格,这也是属于强化学习的范畴了,毕竟强化学习属于在试错中学习的。

但是现在有一些问题,如果这个表格非常大呢?悬崖徒步仅仅是一个长12宽4,每个位置4个动作的表格而已,如果游戏是英雄联盟,那么多的位置,每个位置那么多的可能动作,画出一个表格简直是不可想象的。

但其实,如果把这个表格看作一个数学函数,他的输入是坐标,输出是一个动作(或者每个动作对应的价值):

那也就是说,只要我们有一个坐标,得到一个动作,中间什么过程是可以不用管的,还记得这篇文章中说过:神经元(函数)+神经元(函数) = 神经网络(人工神经网络),那么,中间这一块也就可以使用神经网络代替,这也就是深度强化学习。

论文(Playing Atari with Deep Reinforcement Learning)地址:https://arxiv.org/abs/1312.5602

 设置环境

注意:今天的环境代码我修改过了,跟上一篇的不一样,所以大家还是要先读一下环境代码。

本次环境代码中添加了对于棋盘大小的设置,修复了一些bug。

# -*- coding: utf-8 -*-
"""
作者:CSDN,chuckiezhu
作者地址:https://blog.csdn.net/qq_38431572
本文可用作学习使用,交流代码时需要附带本出处声明
"""

import random
import numpy as np

from gym import spaces

"""
nrows
     0  1  2  3  4  5  6  7  8  9  10  11  ncols
   ---------------------------------------
0  |  |  |  |  |  |  |  |  |  |  |   |   |
   ---------------------------------------
1  |  |  |  |  |  |  |  |  |  |  |   |   |
   ---------------------------------------
2  |  |  |  |  |  |  |  |  |  |  |   |   |
   ---------------------------------------
3   * |       cliff                  | ^ |

  *: start point
  cliff: cliff
  ^: goal
"""

class CustomCliffWalking(object):
    def __init__(self, stepReward: int=-1, cliffReward: int=-10, goalReward: int=10, col=12, row=4) -> None:
        self.sr = stepReward
        self.cr = cliffReward
        self.gr = goalReward
        self.col = col
        self.row = row

        self.action_space = spaces.Discrete(4)  # 上下左右
        self.reward_range = (cliffReward, goalReward)

        self.pos = np.array([row-1, 0], dtype=np.int8)  # agent 在3,0处出生,掉到悬崖内就会死亡,触发done和cliffReward

        self.die_pos = []
        for c in range(1, self.col-1):
            self.die_pos.append([self.row-1, c])
        print("die pos: ", self.die_pos)
        print("goal pos: ", [[self.row-1, self.col-1]])

        self.reset()
    
    def reset(self, random_reset=False):
        """
        初始化agent的位置
        random: 是否随机出生, 如果设置random为True, 则出生点会随机产生
        """
        x, y = self.row-1, 0
        if random_reset:
            y = random.randint(0, self.col-1)
            if y == 0:
                x = random.randint(0, self.row-1)
            else:  # 除了正常坐标之外,还有一个不正常坐标:(3, 0)
                x = random.randint(0, self.row-2)
            # 严格来讲,cliff和goal不算在坐标体系内
        # agent 在3,0处出生,掉到悬崖内就会死亡,触发done和cliffReward
        self.pos = np.array([x, y], dtype=np.int8)
        # print("reset at:", self.pos)
    
    def step(self, action: int) -> list[list, int, bool, bool, dict]:
        """
        执行一个动作
        action:
            0: 上
            1: 下
            2: 左
            3: 右
        """

        move = [
            np.array([-1, 0], dtype=np.int8), # 向上,就是x-1, y不动,
            np.array([ 1, 0], dtype=np.int8), # 向下,就是x+1, y不动,
            np.array([0, -1], dtype=np.int8), # 向左,就是y-1, x不动,
            np.array([0,  1], dtype=np.int8), # 向右,就是y+1, x不动,
        ]
        new_pos = self.pos + move[action]
        # 上左不能小于0
        new_pos[new_pos < 0] = 0  # 超界的处理,比如0, 0 处向上或者向右走,处理完还是0,0
        # 上右不能超界
        if new_pos[0] > self.row-1:
            new_pos[0] = self.row-1  # 超界处理
        if new_pos[1] > self.col-1:
            new_pos[1] = self.col-1

        reward = self.sr  # 每走一步的奖励
        die = False
        win = False
        info = {
            "reachGoal": False,
            "fallCliff": False,
        }
        
        if self.__is_pos_die(new_pos.tolist()):
            die = True
            info["fallCliff"] = True
            reward = self.cr
        elif self.__is_pos_win(new_pos.tolist()):
            win = True
            info["reachGoal"] = True
            reward = self.gr

        self.pos = new_pos  # 更新坐标
        return new_pos, reward, die, win, info
    
    def __is_pos_die(self, pos: list[int, int]) -> bool:
        """判断自己的这个状态是不是已经结束了"""
        return pos in self.die_pos

    def __is_pos_win(self, pos: list[int, int]) -> bool:
        """判断自己的这个状态是不是已经结束了"""
        return pos in [
            [self.row-1, self.col-1],
        ]

至于讲解这个环境,我觉得这个注释还是比较清楚的,如果有不明白的,请评论留言告知我。

制作网络

首先,我们先把自己代入表格,如果我们站到某个坐标,那么我们应该知道四个方向上的奖励,所以,网络可以有两种方式;

方式一、

网络输入是坐标和方向,输出是对应的奖励。

方式二、

网络输入是坐标,输出是四个方向对应的奖励。

这里我要来一句场外推理:方式一真的很麻烦,并且选择动作的时候,有多少个动作需要经过多少次网络。所以方式二是比较好的选择。


class Qac(nn.Module):
    def __init__(self, in_shape, out_shape) -> None:
        super(Qac, self).__init__()
        self.in_shape = in_shape  # 就是 智能体 现在的坐标
        self.action_space = out_shape  # 上0下1左2右3
        self.dense1 = nn.Linear(self.in_shape, self.action_space)
        # 输出就是每个动作的价值

        self.lrelu = nn.LeakyReLU()  # 换用tanh
        self.softmax = nn.Softmax(-1)
    
    def forward(self, x) -> torch.Tensor:
        x = self.dense1(x)
        return x

    def sample_action(self, action_value: torch.Tensor, epsilon: float):
        """从产生的动作概率中采样一个动作,利用epsilon贪心"""
        if random.random() < epsilon:
            # 随机选择
            action = random.randint(0, self.action_space-1)
            action = torch.tensor(action)
        else:
            action = torch.argmax(action_value)
        
        return action
    
    def load_model(self, modelpath):
        """加载模型"""
        tmp = torch.load(modelpath)
        self.load_state_dict(tmp["model"])
    
    def save_model(self, modelpath):
        """保存模型"""
        tmp = {
            "model": self.state_dict(),
        }
        torch.save(tmp, modelpath)

细心的人可能发现了,这个网络只有一层,非常简单,好像没有所谓的“特征提取”就直接到输出层了。这里有一个小技巧,就是我手动把坐标转成了onehot向量,可以认为是手动提取了特征。

def num_to_onehot(pos: torch.Tensor) -> torch.Tensor:
    """把坐标转成one_hot向量"""
    n = int((pos[0] * 12 + pos[1]).item())
    return nn.functional.one_hot(torch.tensor(n), num_classes=48)

如果大家使用两层神经网络,直接输入坐标,中间层是48,然后是一个输出层,也可以, 但是我试了,训练很慢,效果不好。不如这样直接手动编码了。

训练

整个训练的代码我直接贴在这里了:

# -*- coding: utf-8 -*-
"""
利用DQN实现
"""
"""
作者:CSDN,chuckiezhu
作者地址:https://blog.csdn.net/qq_38431572
本文可用作学习使用,交流代码时需要附带本出处声明
"""
import os
import random
import torch
import numpy as np
from torch import nn

from matplotlib import pyplot as plt

from cliff_walking_env import CustomCliffWalking


nepisodes = 10000  # total 1w episodes
epsilon = 1.0  # epsilon greedy policy
epsilon_min = 0.05
epsilon_decay = 0.9975

gamma = 0.9  # discount factor
lr = 0.001
random_reset = False

seed = 42

normalization = torch.tensor([3, 11], dtype=torch.float)

sr = -1
cr = -10
gr = 10

class Qac(nn.Module):
    def __init__(self, in_shape, out_shape) -> None:
        super(Qac, self).__init__()
        self.in_shape = in_shape  # 就是智能体现在的坐标
        self.action_space = out_shape  # 上0下1左2右3
        self.dense1 = nn.Linear(self.in_shape, self.action_space)

        # 输出就是每个动作的价值

        self.lrelu = nn.LeakyReLU()  # 换用tanh
        self.softmax = nn.Softmax(-1)
    
    def forward(self, x) -> torch.Tensor:
        x = self.dense1(x)
        return x

    def sample_action(self, action_value: torch.Tensor, epsilon: float):
        """从产生的动作概率中采样一个动作,利用epsilon贪心"""
        if random.random() < epsilon:
            # 随机选择
            action = random.randint(0, self.action_space-1)
            action = torch.tensor(action)
        else:
            action = torch.argmax(action_value)
        
        return action
    
    def load_model(self, modelpath):
        """加载模型"""
        tmp = torch.load(modelpath)
        self.load_state_dict(tmp["model"])
    
    def save_model(self, modelpath):
        """保存模型"""
        tmp = {
            "model": self.state_dict(),
        }
        torch.save(tmp, modelpath)


def num_to_onehot(pos: torch.Tensor) -> torch.Tensor:
    """把坐标转成one_hot向量"""
    n = int((pos[0] * 12 + pos[1]).item())
    return nn.functional.one_hot(torch.tensor(n), num_classes=48)

    
def main():
    global epsilon
    random.seed(seed)
    torch.manual_seed(seed=seed)
    plt.ion()

    os.makedirs("./out/ff_DQN/")
    # cw = gym.make("CliffWalking-v0", render_mode="human")
    cw = CustomCliffWalking(stepReward=sr, goalReward=gr, cliffReward=cr)

    # 专程onehot了
    Q = Qac(in_shape=48, out_shape=cw.action_space.n)

    optimizer = torch.optim.Adam(Q.parameters(), lr=lr)
    loss_fn = torch.nn.MSELoss()

    win_1000 = []  # 记录最近一千场赢的几率
    total_win = 0
    for i in range(1, nepisodes+1):
        cw.reset(random_reset=random_reset)  # 重置环境
        steps = 0
        while True:
            steps += 1
            state_now = torch.tensor(cw.pos, dtype=torch.float)
            state_now = num_to_onehot(state_now).unsqueeze_(0).to(torch.float)
            action_values = Q(state_now)
            action_values = action_values.squeeze()
            action_now = Q.sample_action(action_value=action_values, epsilon=epsilon)

            action_now_value = action_values[action_now]  # 这个是采取这个动作的预测奖励

            state_next, reward_now, terminated, truncated, info = cw.step(action=action_now.item())   # 执行动作
            state_next = num_to_onehot(state_next).unsqueeze_(0).to(torch.float)
            with torch.no_grad():
                next_values = Q(state_next)
                next_values = next_values.squeeze()
                # 得到下一个的动作,(同一个策略下,因为这是onpolicy的sarsa
                action_next = Q.sample_action(action_value=action_values, epsilon=epsilon)
                action_next_value = next_values[action_next]  # 计算下一个动作的预期价值

            
            # 计算  instantR + gamma * value_next,这个是实际上这个动作带来的预期收益
            discounted_reward = reward_now + gamma * action_next_value * (1 - terminated) * (1 - truncated)

            # 计算误差
            loss = loss_fn(action_now_value, discounted_reward)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if terminated or truncated:
                if terminated:
                    win_1000.append(0)
                if truncated:
                    win_1000.append(1)
                    total_win += 1
                break

            epsilon = epsilon * epsilon_decay
            epsilon = max(epsilon, epsilon_min)  # 衰减学习旅
        win_1000 = win_1000[-1000:]
        win_rate = sum(win_1000)/1000.0
        print("{}/{}, 当前探索率: {}, 是否成功: {}, 千场胜率:{}.".format(i, nepisodes, epsilon, truncated, win_rate), flush=True)
        if i % 10000 == 0:
            Q.save_model("./out/ff_DQN/Qac_{}_{}_{}_{}.pth".format(i, gr, cr, win_rate))
    print("total win: ", total_win)

    # 收尾测试看看能不能通关
    path = np.zeros((4, 12), dtype=np.float64)
    cw.reset(random_reset=False)

    steps = 0
    while steps <= 48:  # 走,48步走不到头就不会走到了
        steps += 1
        state_now = torch.tensor(cw.pos, dtype=torch.float)
        state_now = num_to_onehot(state_now).unsqueeze_(0).to(torch.float)
        action_values = Q(state_now).squeeze()
        # 贪心算法选择动作
        action_now = Q.sample_action(action_values, 0)
        print(cw.pos[0], cw.pos[1], action_now)
        new_pos, _, die, win, _ = cw.step(action=action_now)
        if win:
            print("[+] you win!")
            break
        if die:
            print("[+] you lose!")
            break
        x = new_pos[0]
        y = new_pos[1]
        if x >= 0 and x <= 3 and y >= 0 and y <= 11:
            path[x, y] = 1.0
    plt.imshow(path)
    plt.colorbar()
    plt.savefig("./out/ff_DQN/path_sarsa_"+str(sr)+"_"+str(gr)+"_"+str(cr)+".png")

if __name__ == "__main__":
    main()

上面的代码我测试没问题,如果不修改直接使用是完全可以的,目录结构是这样的:

那两个文件夹都是自动生成的,不需要手动建立。 

网络结构分析

这是上面代码的网络结构和更新流程。注意:实线代表有梯度,虚线代表无梯度。

每次由环境产生一个状态,先转成一个one_hot向量,作为网络的输入,得到四个动作分别价值多少。然后采样到的动作得到当前的Q(s, a)值,也就是action_value。

另一方面,采样得到的动作送入环境,环境给出下一个状态和立即奖励。下一个状态送入网络(没有梯度的计算),同样得到四个动作的价值。由于代码使用的是SARSA算法,所以需要按照同样的策略采样一个动作,同时得到动作的价值。也就是next_action_value。

这个时候,就可以根据环境的立即奖励reward_now和下一个状态的动作的价值next_action_value得到一个ground truth,而action_value作为网络的预测值,这两个可以用于计算损失。

损失的反向传播就是沿着实现传递到顶。实现网络的更新。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/708107.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

制造业网络安全最佳实践

网络安全已成为生产部门的一个重要关注点&#xff0c;制造业网络安全现在是高管层的主要考虑因素。 在工业 4.0和物联网 ( IoT )出现的推动下&#xff0c;当今的互连工业系统提供了多种优势&#xff0c;但也使组织面临新的风险和漏洞。 让我们探讨一些制造业网络安全最佳实践…

SOLIDWORKS软件有哪些版本?

SOLIDWORKS软件是基于Windows开发的三维 CAD系统&#xff0c;技术创新符合CAD技术的发展潮流和趋势&#xff0c;SOLIDWORKS每年都有数十乃至数百项的技术创新&#xff0c;公司也获得了很多荣誉。该系统在1995-1999年获得全球微机平台CAD系统评比NO1&#xff1b;从1995年至今&am…

Quiz 15: Object-Oriented Programming | Python for Everybody 配套练习_解题记录

文章目录 Python for Everybody课程简介 Quiz 15: Object-Oriented Programming单选题&#xff08;1-11&#xff09;Multiple instances Python for Everybody 课程简介 Python for Everybody 零基础程序设计&#xff08;Python 入门&#xff09; This course aims to teach e…

qemu 源码编译 qemu-system-aarch64 的方法

前言 最近调试 RT-Thread bsp qemu-virt64-aarch64 时&#xff0c;遇到无法使用网络设备问题&#xff0c;最后确认是 当前 ubuntu 20.04 系统 使用 apt install 安装的 qemu qemu-system-aarch64 版本太低。 RT-Thread qemu-virt64-aarch64 里面的网络设备&#xff0c;需要较新…

回顾分类决策树相关知识并利用python实现

大家好&#xff0c;我是带我去滑雪&#xff01; 决策树&#xff08;Decision Tree&#xff09;是一种基本的分类与回归方法&#xff0c;呈树形结构&#xff0c;在分类问题中&#xff0c;表示预计特征对实例进行分类的过程。它可以认为是if-then规则的集合&#xff0c;也可以认为…

多表-DDL以及DQL

多表DDL 个表之间也可能存在关系 存在在一对多和多对多和一对多的关系 一对多&#xff08;外键&#xff09; 在子表建一哥字段&#xff08;列&#xff09;和对应父表关联 父表是一&#xff0c;对应子表的多&#xff08;一个部门对应多个员工&#xff0c;但一个员工只能归属一…

结构体和数据结构--从基本数据类型到抽象数据类型、结构体的定义

在冯-诺依曼体系结构中&#xff0c;程序代码和数据都是以二进制存储的&#xff0c;因此对计算机系统和硬件本身而言&#xff0c;数据类型的概念其实是不存在的。 在高级语言中&#xff0c;为了有效的组织数据&#xff0c;规范数据的使用&#xff0c;提高程序的可读性&#xff0…

使用Streamlit和Matplotlib创建交互式折线图

大家好&#xff0c;本文将介绍使用Streamlit和Matplotlib创建一个用户友好的数据可视化Web应用程序。该应用程序允许上传CSV文件&#xff0c;并为任何选定列生成折线图。 构建Streamlit应用程序 在本文中&#xff0c;我们将指导完成创建此应用程序的步骤。无论你是专家还是刚刚…

three.js利用点材质打造星空

最终效果如图&#xff1a; 一、THREE.BufferGeometry介绍 这里只是做个简单的介绍&#xff0c;详细的介绍大家可以看看THREE.BufferGeometry及其属性介绍 THREE.BufferGeometry是Three.js中的一个重要的类&#xff0c;用于管理和操作几何图形数据。它是对THREE.Geometry的一…

leetcode 226. 翻转二叉树

2023.7.1 这题依旧可以用层序遍历的思路来做。 在层序遍历的代码上将所有节点的左右节点进行互换即可实现二叉树的反转。 下面上代码&#xff1a; class Solution { public:TreeNode* invertTree(TreeNode* root) {queue<TreeNode*> que;if(root nullptr) return{};que…

gradio库中的Dropdown模块:创建交互式下拉菜单

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

2020年全国硕士研究生入学统一考试管理类专业学位联考逻辑试题——纯享题目版

&#x1f3e0;个人主页&#xff1a;fo安方的博客✨ &#x1f482;个人简历&#xff1a;大家好&#xff0c;我是fo安方&#xff0c;考取过HCIE Cloud Computing、CCIE Security、CISP等证书。&#x1f433; &#x1f495;兴趣爱好&#xff1a;b站天天刷&#xff0c;题目常常看&a…

编译原理期末复习简记(更新中~)

注意&#xff1a;该复习简记只是针对我校期末该课程复习纲要进行的&#xff0c;仅供参考 第一章 引论 编译程序是什么&#xff1f; 编译程序是一个涉及分析和综合的复杂系统 编译程序组成 编译程序通常由以下内容组成 词法分析器 输入 组成源程序的字符串输出 记号/单词序列语法…

Jenkins+Gitlab+Springboot项目部署Jar和image两种方式

Springboot环境准备 利用spring官网快速创建springboot项目。 添加一个controller package com.example.demo;import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;RestController public class…

新华三眼中的AI天路

ChatGPT的火爆&#xff0c;在全球范围内掀起了新一轮的AI风暴。如今&#xff0c;各行各业都在讨论AI&#xff0c;各个国家都在密集进行新一轮的AI基础设施建设与技术投入。 但眼前的盛景并非突然到来&#xff0c;就拿这一轮大模型热潮来说&#xff0c;谷歌早在2018年底就发布了…

协议速攻 IIC协议详解

介绍 IIC是一种 同步 半双工 串行 总线 同步 指的是同一根时钟线(SCL) 半双工 可以进行双向通信&#xff0c;但是收发不能同时进行&#xff0c;发的时候禁止接收&#xff0c;接的时候禁止发送 串行 数据是一位一位发送的 总线 两根线(SCL SDA)可以接多个IIC类型器件&#…

《统计学习方法》——逻辑斯蒂回归和最大熵模型

参考资料&#xff1a; 《统计学习方法》李航通俗理解信息熵 - 知乎 (zhihu.com)拉格朗日函数为什么要先最大化&#xff1f; - 知乎 (zhihu.com) 1 逻辑斯蒂回归 1.1 逻辑斯蒂回归 输入 x ( x ( 1 ) , x ( 2 ) , ⋯ , x ( n ) , 1 ) T x(x^{(1)},x^{(2)},\cdots,x^{(n)},1…

【动态规划算法练习】day11

文章目录 一、1312. 让字符串成为回文串的最少插入次数1.题目简介2.解题思路3.代码4.运行结果 二、1143. 最长公共子序列1.题目简介2.解题思路3.代码4.运行结果 三、1035. 不相交的线1.题目简介2.解题思路3.代码4.运行结果 总结 一、1312. 让字符串成为回文串的最少插入次数 1…

DevOps系列文章之 设计一个简单的DevOps系统

前置条件 gitlab gitlab-runner k8s docker 1. gitlab创建群组 创建群组的好处是,对项目进行分组,群组内的资源可以共享,这里创建了一个tibos的群组 2. 在群组创建一个项目 这里创建一个空白项目,项目名为Gourd.Test,将项目克隆到本地,然后在该目录下创建一个.net core3.1的w…

Spring Cloud Alibaba Seata源码分析

目录 一、Seata源码分析 1、Seata源码入口 1.1、2.0.0.RELEASE 1.2、2.2.6.RELEASE 2、Seata源码分析-2PC核心源码 3、Seata源码分析-数据源代理 3.1、数据源代理DataSourceProxy 4、Seata源码分析- Seata服务端&#xff08;TC&#xff09;源码 一、Seata源码分析 Sea…