强化深度学习中使用Dyna-Q算法确定机器人问题中不同规划的学习和策略实战(超详细 附源码)

news2025/1/28 1:16:00

需要源码请点赞关注收藏后评论区留下QQ并且私信~~~

一、模型、学习、规划简介

1:模型

Agent可以通过模型来预测环境并做出反应,这里所说的模型通常指模拟模型,即在给定一个状态和动作时,通过模型可以对下一状态和奖赏做出预测

模型通常可以分为分布模型和样本模型两种类型

分布模型:该模型可以生成所有可能的结果及其对应的概率分布

样本模型:该模型能够从所有可能的情况中产生一个确定的结果

从功能上讲,模型是用于模拟环境和产生模拟经验的。与样本模型相比,分布模型包含更多的信息,只是现实任务中难以获得所有的状态转移概率

2:学习

学习过程是从环境产生的真实经验中进行学习,根据经验的使用方法,学习过程可以分为直接强化学习和模型学习两种类型

直接强化学习:在真实环境中采集真实经验,根据真实经验直接更新值函数或策略,不受模型偏差的影响

模型学习:在真实环境中采集真实经验,根据真实经验来构建和改进模拟模型,提供模拟模型精度,使其更接近真实环境

3:规划

规划过程是基于模拟环境或经验模型,从模拟经验中更新值函数,实现改进策略的目的,学习和规划的核心都是通过回溯操作来评估值函数,不同之处在于:在规划过程中,Agent并没有与真实环境交互

规划通常可分为状态空间规划和方案空间规划,状态空间规划是在状态空间中寻找最优策略,值函数的计算都是基于状态的,通常该规划方法视为搜索方法,其基本思想如下

1:所有规划算法都以计算值函数作为策略改进的中间关键步骤

2:所有规划算法都可以通过基于模型产生的模拟经验来计算值函数

 二、Dyna-Q结构及其算法

Dyna-Q架构包含了在线规划Agent所需要的主要功能,该架构讲学习和规划有机地结合在一起,是有模型和无模型方法的融合,其数据来源包括基于真实环境采样的真实经验以及基于模拟模型采样的模拟经验,通过直接强化学习或间接强化学习来更新值函数或者策略

架构图如下

 三、Dyna-Q不同规划对学习步数的影响

机器人环境搭建以及背景可点击如下链接了解

机器人环境

此处比较不同规划步数对实验效果的影响,当机器人离开边界或者撞到障碍物则得到-10的奖赏,到达充电桩获得+1的奖赏,其他情况奖赏均为0,不同需要不同情节数,可视化结果如下

 

 代码如下



import gym
from gym import spaces
from gym.utils import seeding
from random import random, choice
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties


class Grid(object):
    def __init__(self, x:int = None,
                       y:int = None,
                       type:int = 0,
                       reward:float = 0.0):
        self.x = x                          # 坐标x
        self.y = y
        self.type = type                    # 类别值(0:空;1:障碍或边界)
        self.reward = reward                # 该格子的即时奖励
        self.name = None                    # 该格子的名称
        self._update_name()

    def _update_name(self):
        self.name = "X{0}-Y{1}".format(self.x, self.y)

    def __str__(self):
        return "name:{3}, x:{0}, y:{1}, type:{2}".format(self.x,
                                                                    self.y,
                                                                    self.type,
                                                                    self.name
                                                                    )

class GridMatrix(object):
    def __init__(self, n_width:int,                 # 水平方向格子数
                       n_height:int,                # 竖直方向格子数
                       default_type:int = 0,        # 默认类型
                       default_reward:float = 0.0,  # 默认即时奖励值
                       ):
        self.grids = None
        self.n_height = n_height
        self.n_width = n_width
        self.len = n_width * n_height
        self.default_reward = default_reward
        self.default_type = default_type
        self.reset()

    def reset(self):
        self.grids = []
        for x in range(self.n_height):
            for y in range(self.n_width):
                self.grids.append(Grid(x,
                                       y,
                                       self.default_type,
                                       self.default_reward))

    def get_grid(self, x, y=None):
        '''获取一个格子信息
        args:坐标信息,由x,y表示或仅有一个类型为tuple的x表示
        return:grid object
        '''
        xx, yy = None, None
        if isinstance(x, int):
            xx, yy = x, y
        elif isinstance(x, tuple):
            xx, yy = x[0], x[1]
        assert(xx >= 0 and yy >= 0 and xx < self.n_width and yy < self.n_height), "任意坐标值应在合理区间"
        index = yy * self.n_width + xx
        return self.grids[index]

    def set_reward(self, x, y, reward):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.reward = reward
        else:
            raise("grid doesn't exist")

    def set_type(self, x, y, type):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.type = type
        else:
            raise("grid doesn't exist")

    def get_reward(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.reward


    def get_type(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.type

# 格子世界环境
class GridWorldEnv(gym.Env):

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 30
    }

    def __init__(self, n_width: int=5,
                       n_height: int = 5,
                       u_size=40,
                       default_reward: float = 0.0,
                       default_type=0):
        self.u_size = u_size                        # 当前格子绘制尺寸
        self.n_width = n_width                      # 格子世界宽度(以格子数计)
        self.n_height = n_height                    # 高度
        self.width = u_size * n_width               # 场景宽度 screen width
        self.height = u_size * n_height             # 场景长度
        self.default_reward = default_reward
        self.default_type = default_type

        self.grids = GridMatrix(n_width=self.n_width,
                                n_height=self.n_height,
                                default_reward=self.default_reward,
                                default_type=self.default_type)
        self.reward = 0                             # for rendering
        self.action = None                          # for rendering

        # 0,1,2,3 represent up, down, left, right
        self.action_space = spaces.Discrete(4)
        # 观察空间由low和high决定
        self.observation_space = spaces.Discrete(self.n_height * self.n_width)

        self.ends = [(0, 0)]  # 终止格子坐标,可以有多个
        self.start = (0, 4)  # 起始格子坐标,只有一个
        self.types = [(2, 2, 1)]
        self.rewards = []
        self.refresh_setting()
        self.viewer = None  # 图形接口对象
        self.seed()  # 产生一个随机子
        self.reset()

    def seed(self, seed=None):
        # 产生一个随机化时需要的种子,同时返回一个np_random对象,支持后续的随机化生成操作
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        self.action = action                        # action for rendering
        old_x, old_y = self._state_to_xy(self.state)
        new_x, new_y = old_x, old_y

        if action == 0: new_y += 1  # up
        elif action == 1: new_y -= 1  # down
        elif action == 2: new_x -= 1  # left
        elif action == 3: new_x += 1  # right

        # boundary effect
        if new_x < 0: new_x = 0
        if new_x >= self.n_width: new_x = self.n_width - 1
        if new_y < 0: new_y = 0
        if new_y >= self.n_height: new_y = self.n_height - 1

        # wall effect:
        # 类型为1的格子为障碍格子,不可进入
        if self.grids.get_type(new_x, new_y) == 1:
            new_x, new_y = old_x, old_y

        self.reward = self.grids.get_reward(new_x, new_y)
        done = self._is_end_state(new_x, new_y)
        self.state = self._xy_to_state(new_x, new_y)
        # 提供格子世界所有的信息在info内
        info = {"x": new_x, "y": new_y, "grids": self.grids}
        return self.state, self.reward, done, info

    # 将状态变为横纵坐标
    def _state_to_xy(self, s):
        x = s % self.n_width
        y = int((s - x) / self.n_width)
        return x, y

    def _xy_to_state(self, x, y=None):
        if isinstance(x, int):
            assert (isinstance(y, int)), "incomplete Position info"
            return x + self.n_width * y
        elif isinstance(x, tuple):
            return x[0] + self.n_width * x[1]
        return -1  # 未知状态

    def refresh_setting(self):
        '''用户在使用该类创建格子世界后可能会修改格子世界某些格子类型或奖励值
        的设置,修改设置后通过调用该方法使得设置生效。
        '''
        for x, y, r in self.rewards:
            self.grids.set_reward(x, y, r)
        for x, y, t in self.types:
            self.grids.set_type(x, y, t)

    def reset(self):
        self.state = self._xy_to_state(self.start)
        return self.state

    # 判断是否是终止状态
    def _is_end_state(self, x, y=None):
        if y is not None:
            xx, yy = x, y
        elif isinstance(x, int):
            xx, yy = self._state_to_xy(x)
        else:
            assert (isinstance(x, tuple)), "坐标数据不完整"
            xx, yy = x[0], x[1]
        for end in self.ends:
            if xx == end[0] and yy == end[1]:
                return True
        return False

    # 图形化界面
    def render(self, mode='human', close=False):
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return
        zero = (0, 0)
        u_size = self.u_size
        m = 2                                       # 格子之间的间隙尺寸

        # 如果还没有设定屏幕对象,则初始化整个屏幕具备的元素。
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(self.width, self.height)

            # 绘制格子
            for x in range(self.n_width):
                for y in range(self.n_height):
                    v = [(x * u_size + m, y * u_size + m),
                         ((x + 1) * u_size - m, y * u_size + m),
                         ((x + 1) * u_size - m, (y + 1) * u_size - m),
                         (x * u_size + m, (y + 1) * u_size - m)]

                    rect = rendering.FilledPolygon(v)
                    r = self.grids.get_reward(x, y) / 10
                    if r < 0:
                        rect.set_color(0.9 - r, 0.9 + r, 0.9 + r)
                    elif r > 0:
                        rect.set_color(0.3, 0.5 + r, 0.3)
                    else:
                        rect.set_color(0.9, 0.9, 0.9)
                    self.viewer.add_geom(rect)
                    # 绘制边框
                    v_outline = [(x * u_size + m, y * u_size + m),
                                 ((x + 1) * u_size - m, y * u_size + m),
                                 ((x + 1) * u_size - m, (y + 1) * u_size - m),
                                 (x * u_size + m, (y + 1) * u_size - m)]
                    outline = rendering.make_polygon(v_outline, False)
                    outline.set_linewidth(3)

                    if self._is_end_state(x, y):
                        # 给终点方格添加金黄色边框
                        outline.set_color(0.9, 0.9, 0)
                        self.viewer.add_geom(outline)
                    if self.start[0] == x and self.start[1] == y:
                        outline.set_color(0.5, 0.5, 0.8)
                        self.viewer.add_geom(outline)
                    if self.grids.get_type(x, y) == 1:  # 障碍格子用深灰色表示
                        rect.set_color(0.3, 0.3, 0.3)
                    else:
                        pass
            # 绘制个体
            self.agent = rendering.make_circle(u_size / 4, 30, True)
            self.agent.set_color(1.0, 1.0, 0.0)
            self.viewer.add_geom(self.agent)
            self.agent_trans = rendering.Transform()
            self.agent.add_attr(self.agent_trans)

            # 更新个体位置
        x, y = self._state_to_xy(self.state)
        self.agent_trans.set_translation((x + 0.5) * u_size, (y + 0.5) * u_size)

        return self.viewer.render(return_rgb_array= mode == 'rgb_array')
# 环境参数设定
class Agent():
    def __init__(self, env):
        self.episode = 1
        self.Q = {}
        self.actions = [0, 1, 2, 3]
        self.position = env.start
        self.model = {}
    # 建立模型
    def make_model(self, pos, act, reward, next_state):
        self.model["{0},{1}".format(pos, act)] = "{0},{1}".format(reward, next_state)
    # 模型规划
    def q_planning(self,n):
        for i in range(0,n):

            a = [i for i in self.model.keys()]
            done = False
            if a != []:
                str = choice(a)
                pos = str.split(",")[0]+","+str.split(",")[1]
                act = int(str.split(",")[2])
                reward = float(self.model[str].split(",")[0])
                next_state = self.model[str].split(",")[1]+","+self.model[str].split(",")[2]
                if next_state == "(0,0)" or next_state == "(4,3)":
                    done = True
                self.updateQ(pos, act, next_state, reward, done)

    def chaxunQ(self, pos, act):
        judge = False
        for i in self.Q:

            if i == "{0},{1}".format(pos, act):
                judge = True
                break
        if judge == True:
            return True
        else:
            self.Q["{0},{1}".format(pos, act)] = float(format(random()/10000, '.3f'))
            return
    # 更新状态动作值Q函数
    def updateQ(self, pos, action, next_pos, reward, done):
        if done == False:
            self.chaxunQ(pos, action)
            old_q = self.Q["{0},{1}".format(pos, action)]
            action1 = self.performmax(next_pos)
            # self.chaxunQ(next_pos, action1)
            new_q = self.Q["{0},{1}".format(next_pos, action1)]
            old_q = old_q + 0.1 * (reward+0.9 * new_q - old_q)
            self.Q["{0},{1}".format(pos, action)] = float(format(old_q, '.3f'))

        else:
            self.chaxunQ(pos, action)
            self.Q["{0},{1}".format(pos, action)] = float(format(reward, '.3f'))
            # print(pos, action,reward)
    # 动作选取策略
    def perform(self, pos):
        eplison = random()

        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        if eplison > 1/self.episode:
            maxq = -1000
            act = ""
            for i in self.Q:
                list = i.split(",")
                state = list[0] + "," + list[1]
                if state == str(pos):
                    if self.Q[i] > maxq:
                        maxq = self.Q[i]
                        act = list[2]
            return int(act)
        else:
            return choice([0, 1, 2, 3])
    # argmaxQ
    def performmax(self, pos):
        maxq = -1000
        str1 = ""
        self.chaxunQ(pos,choice([0,1,2,3]))
        for i in self.Q:
            list = i.split(",")
            state = list[0]+","+list[1]
            if state == str(pos):
                if self.Q[i] > maxq:
                    maxq = self.Q[i]
                    str1 = list[2]
        return int(str1)

def run(n):
    agent = Agent(env)
    total_j = 0
    total_r = 0
    a = []
    b = []
    env.refresh_setting()
    for i in range(0, 300):
        done = False
        env.reset()
        r = 0
        j = 0
        while done == False:
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            # 更新Q值
            agent.updateQ(state, action, next_state, reward,done)
            # 更新模型
            agent.make_model(state, action, reward, next_state)
            r += reward
            # 模型规划
            agent.q_planning(n)
            j = j+1
        tpend(total_r)
        agent.episode += 1
        # print(agent.Q)
        total_j += j
        if i != 0:
            b.append(j)
        print("回合={0},步数={1},奖赏={2}".format(i, j, '%.3f' % r))
    # return (np.array(a)/np.array(total_j)).tolist()
    # return a
    return b
if __name__ == "__main__":
    n_width = 5
    n_height = 5
    default_reward = 0
    env = GridWorldEnv(n_width, n_height, default_reward=default_reward)
    env.types = [(2, 2, 1)]
    env.rewards = [(0, 0, 1), (4, 3, 1)]  # 奖赏值设定
    env.start = (0, 4)
    env.ends = [(0, 0), (4, 3)]
    env.refresh_setting()

    x = range(1, 300)
    ln1, = plt.plot(x, run(0), label=u"n=0")
    ln2, = plt.plot(x, run(5), label=u"n=5")
    ln3, = plt.plot(x, run(10), label=u"n=10")
    ln4, = plt.plot(x, run(30), label=u"n=30")
    font1 = FontProperties(fname=r"c:\windows\fonts\simsun.ttc", size=15)
ontproperties=font1)
    plt.ylabel(u'步数', fontproperties=font1)
    plt.show()

四、Dyan-Q算法对策略的影响

同样Agent在不同的情节中采用不同的n获得的策略也不一样,当n=50的时候明显快,而且策略更加广泛

 代码如下



from random import random, choice
import gym
from gym import spaces
from gym.utils import seeding

class Grid(object):
    def __init__(self, x:int = None,
                       y:int = None,
                       type:int = 0,
                       reward:float = 0.0):
        self.x = x                          # 坐标x
        self.y = y
        self.type = type                    # 类别值(0:空;1:障碍或边界)
        self.reward = reward                # 该格子的即时奖励
        self.name = None                    # 该格子的名称
        self._update_name()

    def _update_name(self):
        self.name = "X{0}-Y{1}".format(self.x, self.y)

    def __str__(self):
        return "name:{3}, x:{0}, y:{1}, type:{2}".format(self.x,
                                                                    self.y,
                                                                    self.type,
                                                                    self.name
                                                                    )

class GridMatrix(object):
    def __init__(self, n_width:int,                 # 水平方向格子数
                       n_height:int,                # 竖直方向格子数
                       default_type:int = 0,        # 默认类型
                       default_reward:float = 0.0,  # 默认即时奖励值
                       ):
        self.grids = None
        self.n_height = n_height
        self.n_width = n_width
        self.len = n_width * n_height
        self.default_reward = default_reward
        self.default_type = default_type
        self.reset()

    def reset(self):
        self.grids = []
        for x in range(self.n_height):
            for y in range(self.n_width):
                self.grids.append(Grid(x,
                                       y,
                                       self.default_type,
                                       self.default_reward))

    def get_grid(self, x, y=None):
        '''获取一个格子信息
        args:坐标信息,由x,y表示或仅有一个类型为tuple的x表示
        return:grid object
        '''
        xx, yy = None, None
        if isinstance(x, int):
            xx, yy = x, y
        elif isinstance(x, tuple):
            xx, yy = x[0], x[1]
        assert(xx >= 0 and yy >= 0 and xx < self.n_width and yy < self.n_height), "任意坐标值应在合理区间"
        index = yy * self.n_width + xx
        return self.grids[index]

    def set_reward(self, x, y, reward):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.reward = reward
        else:
            raise("grid doesn't exist")

    def set_type(self, x, y, type):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.type = type
        else:
            raise("grid doesn't exist")

    def get_reward(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.reward


    def get_type(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.type


class GridWorldEnv(gym.Env):

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 30
    }

    def __init__(self, n_width: int=5,
                       n_height: int = 5,
                       u_size=40,
                       default_reward: float = 0.0,
                       default_type=0):
        self.u_size = u_size                        # 当前格子绘制尺寸
        self.n_width = n_width                      # 格子世界宽度(以格子数计)
        self.n_height = n_height                    # 高度
        self.width = u_size * n_width               # 场景宽度 screen width
        self.height = u_size * n_height             # 场景长度
        self.default_reward = default_reward
        self.default_type = default_type

        self.grids = GridMatrix(n_width=self.n_width,
                                n_height=self.n_height,
                                default_reward=self.default_reward,
                                default_type=self.default_type)
        self.reward = 0                             # for rendering
        self.action = None                          # for rendering

        # 0,1,2,3 represent up, down, left, right
        self.action_space = spaces.Discrete(4)
        # 观察空间由low和high决定
        self.observation_space = spaces.Discrete(self.n_height * self.n_width)

        self.ends = [(0, 0)]  # 终止格子坐标,可以有多个
        self.start = (0, 4)  # 起始格子坐标,只有一个
        self.types = [(2, 2, 1)]
        self.rewards = []
        self.refresh_setting()
        self.viewer = None  # 图形接口对象
        self.seed()  # 产生一个随机子
        self.reset()

    def seed(self, seed=None):
        # 产生一个随机化时需要的种子,同时返回一个np_random对象,支持后续的随机化生成操作
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        self.action = action                        # action for rendering
        old_x, old_y = self._state_to_xy(self.state)
        new_x, new_y = old_x, old_y

        if action == 0: new_y += 1  # up
        elif action == 1: new_y -= 1  # down
        elif action == 2: new_x -= 1  # left
        elif action == 3: new_x += 1  # right

        # boundary effect
        if new_x < 0: new_x = 0
        if new_x >= self.n_width: new_x = self.n_width - 1
        if new_y < 0: new_y = 0
        if new_y >= self.n_height: new_y = self.n_height - 1

        # wall effect:
        # 类型为1的格子为障碍格子,不可进入
        if self.grids.get_type(new_x, new_y) == 1:
            new_x, new_y = old_x, old_y

        self.reward = self.grids.get_reward(new_x, new_y)
        done = self._is_end_state(new_x, new_y)
        self.state = self._xy_to_state(new_x, new_y)
        # 提供格子世界所有的信息在info内
        info = {"x": new_x, "y": new_y, "grids": self.grids}
        return self.state, self.reward, done, info

    # 将状态变为横纵坐标
    def _state_to_xy(self, s):
        x = s % self.n_width
        y = int((s - x) / self.n_width)
        return x, y

    def _xy_to_state(self, x, y=None):
        if isinstance(x, int):
            assert (isinstance(y, int)), "incomplete Position info"
            return x + self.n_width * y
        elif isinstance(x, tuple):
            return x[0] + self.n_width * x[1]
        return -1  # 未知状态

    def refresh_setting(self):
        '''用户在使用该类创建格子世界后可能会修改格子世界某些格子类型或奖励值
        的设置,修改设置后通过调用该方法使得设置生效。
        '''
        for x, y, r in self.rewards:
            self.grids.set_reward(x, y, r)
        for x, y, t in self.types:
            self.grids.set_type(x, y, t)

    def reset(self):
        self.state = self._xy_to_state(self.start)
        return self.state

    # 判断是否是终止状态
    def _is_end_state(self, x, y=None):
        if y is not None:
            xx, yy = x, y
        elif isinstance(x, int):
            xx, yy = self._state_to_xy(x)
        else:
            assert (isinstance(x, tuple)), "坐标数据不完整"
            xx, yy = x[0], x[1]
        for end in self.ends:
            if xx == end[0] and yy == end[1]:
                return True
        return False

    # 图形化界面
    def render(self, mode='human', close=False):
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return
        zero = (0, 0)
        u_size = self.u_size
        m = 2                                       # 格子之间的间隙尺寸

        # 如果还没有设定屏幕对象,则初始化整个屏幕具备的元素。
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(self.width, self.height)

            # 绘制格子
            for x in range(self.n_width):
                for y in range(self.n_height):
                    v = [(x * u_size + m, y * u_size + m),
                         ((x + 1) * u_size - m, y * u_size + m),
                         ((x + 1) * u_size - m, (y + 1) * u_size - m),
                         (x * u_size + m, (y + 1) * u_size - m)]

                    rect = rendering.FilledPolygon(v)
                    r = self.grids.get_reward(x, y) / 10
                    if r < 0:
                        rect.set_color(0.9 - r, 0.9 + r, 0.9 + r)
                    elif r > 0:
                        rect.set_color(0.3, 0.5 + r, 0.3)
                    else:
                        rect.set_color(0.9, 0.9, 0.9)
                    self.viewer.add_geom(rect)
                    # 绘制边框
                    v_outline = [(x * u_size + m, y * u_size + m),
                                 ((x + 1) * u_size - m, y * u_size + m),
                                 ((x + 1) * u_size - m, (y + 1) * u_size - m),
                                 (x * u_size + m, (y + 1) * u_size - m)]
                    outline = rendering.make_polygon(v_outline, False)
                    outline.set_linewidth(3)

                    if self._is_end_state(x, y):
                        # 给终点方格添加金黄色边框
                        outline.set_color(0.9, 0.9, 0)
                        self.viewer.add_geom(outline)
                    if self.start[0] == x and self.start[1] == y:
                        outline.set_color(0.5, 0.5, 0.8)
                        self.viewer.add_geom(outline)
                    if self.grids.get_type(x, y) == 1:  # 障碍格子用深灰色表示
                        rect.set_color(0.3, 0.3, 0.3)
                    else:
                        pass
            # 绘制个体
            self.agent = rendering.make_circle(u_size / 4, 30, True)
            self.agent.set_color(1.0, 1.0, 0.0)
            self.viewer.add_geom(self.agent)
            self.agent_trans = rendering.Transform()
            self.agent.add_attr(self.agent_trans)

            # 更新个体位置
        x, y = self._state_to_xy(self.state)
        self.agent_trans.set_translation((x + 0.5) * u_size, (y + 0.5) * u_size)

        return self.viewer.render(return_rgb_array= mode == 'rgb_array')

# 环境参数设定
class Agent():
    def __init__(self, env):
        self.episode = 1
        self.Q = {}
        self.actions = [0, 1, 2, 3]
        self.position = env.start
        self.model = {}
    def make_model(self, pos, act, reward, next_state):
        self.model["{0},{1}".format(pos, act)] = "{0},{1}".format(reward, next_state)
    def q_planning(self,n):
        for i in range(0, n):
            a = [i for i in self.model.keys()]
            done=False
            if a != []:
                str = choice(a)
                pos = str.split(",")[0]+","+str.split(",")[1]
                act = int(str.split(",")[2])
                reward = float(self.model[str].split(",")[0])
                next_state = self.model[str].split(",")[1]+","+self.model[str].split(",")[2]
                if next_state == "(8,5)" or next_state == "(1,6)":
                    done = True
                self.updateQ(pos,act,next_state,reward,done)

    def chaxunQ(self, pos, act):
        judge = False
        for i in self.Q:

            if i == "{0},{1}".format(pos, act):
                judge = True
                break
        if judge == True:
            return True
        else:
            self.Q["{0},{1}".format(pos, act)] = float(format(random()/10000, '.3f'))
            return
    # 更新状态动作值Q函数
    def updateQ(self, pos,action,next_pos,reward,done):
        if done == False:
            self.chaxunQ(pos, action)
            old_q = self.Q["{0},{1}".format(pos, action)]
            action1 = self.performmax(next_pos)
            new_q = self.Q["{0},{1}".format(next_pos, action1)]
            old_q = old_q + 0.1 * (reward+0.9 * new_q - old_q)
            self.Q["{0},{1}".format(pos, action)] = float(format(old_q, '.3f'))

        else:
            self.chaxunQ(pos, action)
            self.Q["{0},{1}".format(pos, action)] = float(format(reward, '.3f'))
    # 动作选取策略
    def perform(self, pos):
        eplison = random()

        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        if eplison > 1/self.episode:
            maxq = -1000
            act = ""
            for i in self.Q:
                list = i.split(",")
                state = list[0] + "," + list[1]
                if state == str(pos):
                    if self.Q[i] > maxq:
                        maxq = self.Q[i]
                        act = list[2]
            return int(act)
        else:
            return choice([0, 1, 2, 3])
    # argmaxQ
    def performmax(self, pos):
        maxq = -1000
        str1 = ""
        self.chaxunQ(pos,choice([0,1,2,3]))
        for i in self.Q:
            list = i.split(",")
            state = list[0]+","+list[1]
            if state == str(pos):
                if self.Q[i] > maxq:
                    maxq = self.Q[i]
                    str1 = list[2]
        return int(str1)

def run(n):
    agent = Agent(env)
    total_j = 0
    total_r = 0
    a = []
    b = []
    env.refresh_setting()
    for i in range(0, 300):
        done = False
        env.reset()
        r = 0
        j = 0
        while done == False and j < 50:
            j = j + 1
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            # 更新模型
            agent.make_model(state, action, reward, next_state)
            # 更新Q值
            agent.updateQ(state, action, next_state, reward, done)
            r += reward
            # 模型规划
            agent.q_planning(n)

        if i >= 2:
            total_r += r
            a.append(total_r)
        agent.episode += 1
        action_Q = {}
        for i in sorted(list(agent.Q.keys()), key=lambda x: [x[1], x[4], x[5], x[-1]]):
            action_Q[i] = agent.Q[i]
        print("n={0}:Q值{1}".format(n, action_Q))
        # P_A:状态采取策略
        P_A = {}
        Q_keys = list(action_Q.keys())
        for i in range(0, len(Q_keys)-4, 4):
            temp_action_list = []
            max_a_value = max(action_Q[Q_keys[j]] for j in range(i, i+4))
            # temp_num:标记四个动作最大值
            temp_num = [0, 0, 0, 0]
            # PA:四个动作概率
            PA = [0, 0, 0, 0]
            for k in range(i, i+4):
                if action_Q[Q_keys[k]] == max_a_value:
                    temp_action_list.append(Q_keys[k])
                    temp_num[k-i] = 1
            valid_action_p = round(1/len(temp_action_list), 2)
            for m in range(4):
                if temp_num[m] == 1:
                    PA[m] = valid_action_p
            P_A[Q_keys[i][0:-2]] = PA
        print("Q_A: ", P_A)
        total_j += j
        b.append(j)
    return a
if __name__ == "__main__":
    n_width = 11
    n_height = 7
    default_reward = -0.1
    env = GridWorldEnv(n_width, n_height, default_reward=default_reward)
    env.types = [(1, 2, 1), (2, 2, 1), (3, 2, 1), (4, 2, 1), (5, 2, 1), (6, 2, 1), (7, 2, 1), (8, 2, 1), (9, 2, 1),
                 (10, 2, 1)]
    env.rewards = [(8, 5, 1), (1, 6, 1)]  # 奖赏值设定
    env.start = (4, 0)  # 机器人出发点坐标
    env.ends = [(8, 5), (1, 6)]  # 吸入状态坐标
    env.refresh_setting()

    for i in range(0, 2):
        print("episode=", i+1)
        run(0)
    for i in range(0, 2):
        print("episode=", i+1)
        run(50)

创作不易 觉得有帮助请点赞关注收藏~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/32394.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学生个人网页设计作品 学生个人网页模板 简单个人主页成品 个人网页制作 HTML学生个人网站作业设计 汉语言文学设计题材网页

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

在Postman中使用 FineBI提供的接口获取数据

FineBI 通过各种样式如表格、图表等来呈现数据&#xff0c;进行统计分析。 FineBI 是 B/S 架构的纯 Java 软件。 这些数据表格或图表&#xff0c;用户在开发系统的时候也可以自己编程来实现&#xff0c;FineBI也提供了相应的接口。 在Postman中使用 FineBI提供的接口获取数据…

数据可视化软件使用

一 前言 数据可视化平台是通过三维表示技术来表达复杂的信息&#xff0c;实现海量数据的立体体现。可视化技术借鉴人脑的视觉显示能力&#xff0c;通过挖掘重要数据之间的关系&#xff0c;揭示数据中隐藏的关联和发展趋势&#xff0c;从而提高数据的使用效率。可视化平台使人们…

在这个艰难的环境下,我裸辞了

2022年&#xff0c;疫情期间&#xff0c;工作了22年的我&#xff0c;从软件研发管理的相关工作中&#xff0c;辞职创业&#xff0c;开启我的独立咨询顾问生涯。很多人不解和迷惑&#xff0c;也有朋友关切的询问我的近况&#xff0c;就差用手来摸我的额头以判断我是否发烧了。因…

[附源码]SSM计算机毕业设计江苏人才信息管理系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

.net----委托和事件

委托和事件委托声明实例化调用将类型安全的函数指针(方法)作为其他方法的参数进行传递&#xff0c;从而实现函数回调方法委托&#xff1a;匿名方法委托多播委托委托&#xff1a;委托的异步调用委托&#xff1a;委托的兼容性事件事件实际上是委托的一种特殊形式&#xff0c;C#使…

软考-系统架构师-计算机与网络基础知识-数据库系统基础知识

文章目录1.关系数据库基础1.1关系型数据库基础1.1.1数据库的结构与模式1.1.2实体联系E-R模型1.1.3数据的规范化1.1.4事务管理1.1.5并发控制1.1.6数据库的备份和恢复2.关系数据库设计2.1数据库设计的特点2.2数据库的设计方法2.3数据库设计的基本步骤3.分布式数据库系统3.1分布式…

simulink中比scope模块还好用的平替出图工具?

今天在捣拾scope模块比较几个数据大小&#xff0c;拉坐标线非常的不方便&#xff0c;而且对于调参时几组数据的比较非常繁琐&#xff0c;这里介绍以下simulink中自带的数据检查器(Data Inspector)&#xff0c;个人认为比scope模块方便查看出图结果&#xff0c;有帮助的童鞋们赶…

算法设计与分析 SCAU11090 最大m段乘积和最小m段和(优先做)

11090 最大m段乘积和最小m段和&#xff08;优先做&#xff09; 时间限制:1000MS 代码长度限制:10KB 提交次数:0 通过次数:0 题型: 编程题 语言: G;GCC;VC;JAVA Description 一个n位十进制整数S&#xff0c;若将S划分为m个段&#xff0c;则可以得到m个整数。 (1)这m个整数的…

有限元在游乐设施中的应用-焊缝计算

作者 | 九峰知己千杯少 一、前言 游乐设施金属结构所采用的连接方式有焊接连接、铆钉连接、普通螺栓连接和高强螺栓连接4种&#xff0c;将两块分离的金属其接头部分局部加热到熔化或半熔化状态&#xff0c;采取施加压力或不加压&#xff0c;或填充其他金属&#xff0c;利用原…

C#上位机系列(4)—示波器一新窗口的建立

本文是讲解C#.net平台的Winform框架下的第四个内容&#xff0c;手把手介绍上位机项目的创建方式以及一些写软件时常用的功能&#xff0c;讲解从零开始的每一个步骤。 本次介绍上位机中新窗口的建立方式和软件示波器的代码原理。 从此节开始&#xff0c;所有代码附后 1.新窗口…

element-plus中menu的基本知识点

在vue后台管理系统中&#xff0c;menu是经常会用到的必不可少的导航组件&#xff0c;这个组件如果是单纯的去使用&#xff0c;很简单。但是在实际开发过程中&#xff0c;与其有关的路由相结合使用&#xff0c;还是容易搞混一些东邪&#xff0c;所以想在这里记录一下。 从产品的…

SuperMap 云原生常见问题解决办法-consul启动异常

在iManager for K8S产品中&#xff0c;如果创建了云套件站点&#xff0c;会有三个consul的服务&#xff0c;consul在云套件中充当的角色是服务发现&#xff0c;服务注册&#xff0c;以及配置共享。如果consul服务失效&#xff0c;云套件的整体服务将不能正常运行。客户在使用云…

用DIV+CSS技术设计的环保主题网站(web前端网页制作课作业)

&#x1f380; 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

教你如何在优麒麟上搭建 RISC-V 交叉编译环境

由于 RISC-V 设备价格昂贵、不易采购等诸多原因&#xff0c;许多小伙伴虽然很感兴趣&#xff0c;但仍无法参与 RISC-V 开发工作&#xff0c;今天就教大家如何在优麒麟上搭建 RISC-V 交叉编译环境&#xff0c;快学起来吧&#xff01; 交叉编译&#xff08;Cross Compile&#x…

收藏 | 机器学习公共数据集集锦(附下载链接)

>>>深度学习Tricks&#xff0c;第一时间送达<<< &#x1f680;&#x1f680;&#x1f680;近期&#xff0c;小海带在空闲之余&#xff0c;收集整理了一批机器学习公共数据集供大家参考。 整理不易&#xff0c;小伙伴们记得一键三连喔&#xff01;&#xff0…

cpu天梯图2022年11月 cpu排行榜天梯图2022

一、i9-13900K 1、13900K参数&#xff1a;24核32线程&#xff0c;睿频5.8GHz&#xff0c;基础功耗125W&#xff0c;最大睿频功耗253W。 2、推荐搭配主板&#xff1a;Z790、B760、Z690、B660。 3、目前单核性能最强的一款CPU&#xff0c;拥有超强的超频能力&#xff0c;为玩家带…

swift指针内存管理-引用

引用探究 首先看一个例子 那么这个 0x0000000000000003 是什么意思呢 回到swift源码 找到关键核心类型 HeapObject 就是 swift 分配内存获取到的结构类型 HeapObject 第一个8字节为 metadata, 接下来是宏 InlineRefCounts 其实 就是泛型真正类型 InlineRefCountBits 至此&am…

Android性能优化方法论

作为一名开发&#xff0c;性能优化是永远绕不过去的话题&#xff0c;在日常的开发中&#xff0c;我们可肯定都会接触过。Android 的性能优化其实是非常成熟的了&#xff0c;成熟的套路&#xff0c;成熟的方法论&#xff0c;成熟的开源框架等等。 对于接触性能优化经验较少的开…

大学生简单个人静态HTML网页设计作品 DIV布局个人介绍网页模板代码 DW学生个人网站制作成品下载

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…