大家好,本文将实现一种强化学习算法来解决迷宫问题,并完成以下步骤:创建迷宫环境、定义迷宫类,以及使用值迭代算法(Value Iteration algorithm)找到穿越迷宫的最优策略。为了使这一过程可视化,使用PyGame来模拟迷宫求解过程,从而使学习过程更加有趣和愉悦。
设置项目
在开始之前,通过创建虚拟环境和安装所需的依赖项来设置项目。
# 创建虚拟环境
python -m venv venv
# 在 Windows 上激活虚拟环境
venv\Scripts\activate.bat
# 在 macOS 上激活虚拟环境
source venv/bin/activate
# 安装依赖项
pip install -r requirements.txt
创建迷宫类
将首先定义迷宫类,它将代表迷宫环境。该类将包括与迷宫创建和强化学习相关的属性和方法。
import numpy as np
import random
from typing import Tuple
class Maze:
def __init__(self, level, goal_pos: Tuple[int, int], MAZE_HEIGHT=600, MAZE_WIDTH=600, SIZE=25):
# 迷宫配置的属性
self.goal = goal_pos
self.number_of_tiles = SIZE
self.tile_size = MAZE_HEIGHT // self.number_of_tiles
self.walls = self.create_walls(level)
self.goal_pos = goal_pos
self.state = self.get_init_state(level)
self.maze = self.create_maze(level)
# 强化学习的属性
self.state_values = np.zeros((self.number_of_tiles, self.number_of_tiles))
self.policy_probs = np.full((self.number_of_tiles, self.number_of_tiles, 4), 0.25)
创建迷宫
接下来,介绍实现创建迷宫和定义初始状态的方法。
def create_maze(self, level):
maze = []
walls = []
for row in range(len(level)):
for col in range(len(level[row])):
if level[row][col] == " ":
maze.append((row, col))
elif level[row][col] == "X":
walls.append((row, col))
return maze, walls
def get_init_state(self, level):
for row in range(len(level)):
for col in range(len(level[row])):
if level[row][col] == "P":
return (row, col)
进行步骤和计算奖励
现在,定义计算奖励、模拟步骤和获取下一个状态的方法。
def compute_reward(self, state: Tuple[int, int], action: int):
next_state = self._get_next_state(state, action)
return -float(state != self.goal_pos)
def step(self, action):
next_state = self._get_next_state(self.state, action)
reward = self.compute_reward(self.state, action)
done = next_state == self.goal
return next_state, reward, done
def simulate_step(self, state, action):
next_state = self._get_next_state(state, action)
reward = self.compute_reward(state, action)
done = next_state == self.goal
return next_state, reward, done
def _get_next_state(self, state: Tuple[int, int], action: int):
if action == 0:
next_state = (state[0], state[1] - 1)
elif action == 1:
next_state = (state[0] - 1, state[1])
elif action == 2:
next_state = (state[0], state[1] + 1)
elif action == 3:
next_state = (state[0] + 1, state[1])
else:
raise ValueError("Action value not supported:", action)
if (next_state[0], next_state[1]) not in self.walls:
return next_state
return state
解迷宫
最后,使用值迭代算法来解决迷宫问题并找到最优策略。
def solve(self, gamma=0.99, theta=1e-6):
delta = float("inf")
while delta > theta:
delta = 0
for row in range(self.number_of_tiles):
for col in range(self.number_of_tiles):
if (row, col) not in self.walls:
old_value = self.state_values[row, col]
q_max = float("-inf")
for action in range(4):
next_state, reward, done = self.simulate_step((row, col), action)
value = reward + gamma * self.state_values[next_state]
if value > q_max:
q_max = value
action_probs = np.zeros(shape=(4))
action_probs[action] = 1
self.state_values[row, col] = q_max
self.policy_probs[row, col] = action_probs
delta = max(delta, abs(old_value - self.state_values[row, col]))
solve
方法是强化学习算法的核心,用于找到穿越迷宫的最佳策略。它采用了值迭代技术,这是一种迭代算法,可以逐步更新每个状态的估计值,直到它们收敛到最佳值。
1. 初始化:
-
gamma
:未来奖励的折扣系数(默认值:0.99)。该系数决定了即时奖励相对于未来奖励的重要性。gamma
值越高,代理程序就越重视长期奖励,而gamma
值越低,代理程序就越重视即时奖励。 -
theta
:收敛阈值(默认值:1e-6)。该参数定义了被视为收敛状态值的最小变化量。一旦状态值的变化低于这个阈值,算法就会停止迭代。 -
delta
:初始化为无穷大的变量,用于测量迭代过程中状态值的变化量。
2. 迭代值更新:
-
该方法使用一个
while
循环,一直持续到状态值的变化量(delta
)小于指定的收敛阈值(theta
)为止。 -
在循环中,每次迭代前都会将
delta
变量重置为0
,以跟踪所有状态下状态值的最大变化量。
3. 值迭代:
-
对于迷宫中的每个单元(不包括墙壁),算法会计算所有可能行动(上、下、左、右)的Q值(
q_max
)。 -
Q值表示代理程序在给定状态下采取特定行动并在此后遵循最优策略时所能获得的预期累积奖励。
-
Q值是根据
simulate_step
方法得到的即时奖励和下一个状态值的折现值(gamma * self.state_values[next_state]
)确定的。 -
选择具有最大Q值(
q_max
)的行动作为当前状态下采取的最佳行动。 -
为了更新策略,相应的策略概率(
action_probs
)会被更新,以反映该行动被选中的概率最高。所有其他行动的概率都设置为0
。
4. 更新状态值和策略概率:
-
在计算给定状态的所有行动的Q值后,将该状态的状态值(
self.state_values[row, col]
)更新为所有动作中的最大Q值(q_max
)。 -
将该状态的策略概率(
self.policy_probs[row, col]
)更新为具有最大Q值的动作的高概率(1.0),并将所有其他动作的概率设置为0
。 -
此过程对迷宫中的所有状态进行迭代,不断更新状态值和策略概率,直到满足收敛阈值为止。
5. 收敛检查:
-
在更新迷宫中所有状态的状态值和策略概率后,该方法会检查此迭代过程中状态值的最大变化量(
delta
)是否小于收敛阈值(theta
)。 -
如果满足条件,循环终止,迷宫求解过程被认为已收敛。此时,
self.state_values
数组包含了每个状态的最优值,self.policy_probs
数组反映了在迷宫中导航的最优策略。
通过使用值迭代(Value Iteration)算法,代理程序可以学习最优的迷宫导航策略,最大化预期的累积奖励,同时避开墙壁并高效地到达目标状态。最终的策略概率会指导代理程序的行动,从而实现智能和高效的迷宫解决策略。
使用PyGame模拟
在本节中,将使用PyGame可视化机器人在迷宫中的导航,将建造墙壁、放置宝藏,并控制玩家的移动,以观察它是如何通过实际操作解决迷宫问题的。
首先,创建一个名为main.py
的新Python脚本,并添加以下代码:
import pygame
import numpy as np
from maze import Maze
import threading
# 常量
GAME_HEIGHT = 600
GAME_WIDTH = 600
NUMBER_OF_TILES = 25
SCREEN_HEIGHT = 700
SCREEN_WIDTH = 700
TILE_SIZE = GAME_HEIGHT // NUMBER_OF_TILES
# 迷宫布局
level = [
"XXXXXXXXXXXXXXXXXXXXXXXXX",
"X XXXXXXXX XXXXX",
"X XXXXXXXX XXXXXX XXXXX",
"X XXX XXXXXX XXXXX",
"X XXX XXX PX",
"XXXXXX XX XXX XX",
"XXXXXX XX XXXXXX XXXXX",
"XXXXXX XX XXXXXX XXXXX",
"X XXX XXXXXXXXXXXXX",
"X XXX XXXXXXXXXXXXXXXXX",
"X XXXXXXXXXXXXXXX",
"X XXXXXXXXXXX",
"XXXXXXXXXXX XXXXX X",
"XXXXXXXXXXXXXXX XXXXX X",
"XXX XXXXXXXXXX X",
"XXX X",
"XXX XXXXXXXXXXXXX",
"XXXXXXXXXX XXXXXXXXXXXXX",
"XXXXXXXXXX X",
"XX XXXXX X",
"XX XXXXXXXXXXXXX XXXXX",
"XX XXXXXXXXXXXX XXXXX",
"XX XXXX X",
"XXXX X",
"XXXXXXXXXXXXXXXXXXXXXXXXX",
]
# 创建迷宫环境
env = Maze(
level,
goal_pos=(23, 20),
MAZE_HEIGHT=GAME_HEIGHT,
MAZE_WIDTH=GAME_WIDTH,
SIZE=NUMBER_OF_TILES,
)
env.reset()
env.solve()
# 初始化Pygame
pygame.init()
# 创建游戏窗口
screen = pygame.display.set_mode((SCREEN_HEIGHT, SCREEN_WIDTH))
pygame.display.set_caption("Maze Solver")
# 创建绘制迷宫的表面
surface = pygame.Surface((GAME_HEIGHT, GAME_WIDTH))
# 设置帧率
clock = pygame.time.Clock()
# 游戏循环的运行标志
running = True
# 获取初始玩家和目标位置
treasure_pos = env.goal_pos
player_pos = env.state
def reset_goal():
# 检查玩家是否到达目标位置,然后重置目标位置
if env.state == env.goal_pos:
env.reset()
env.solve()
# 游戏循环
while running:
# 启动一个新线程来检查和重置目标位置
x = threading.Thread(target=reset_goal)
x.daemon = True
x.start()
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
# 清空表面
surface.fill((27, 64, 121))
# 绘制迷宫中的墙壁
for row in range(len(level)):
for col in range(len(level[row])):
if level[row][col] == "X":
pygame.draw.rect(
surface,
(241, 162, 8),
(col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE),
)
# 绘制玩家的位置
pygame.draw.rect(
surface,
(255, 51, 102),
pygame.Rect(
player_pos[1] * TILE_SIZE,
player_pos[0] * TILE_SIZE,
TILE_SIZE,
TILE_SIZE,
).inflate(-TILE_SIZE / 3, -TILE_SIZE / 3),
border_radius=3,
)
# 绘制目标位置
pygame.draw.rect(
surface,
"green",
pygame.Rect(
env.goal_pos[1] * TILE_SIZE,
env.goal_pos[0] * TILE_SIZE,
TILE_SIZE,
TILE_SIZE,
).inflate(-TILE_SIZE / 3, -TILE_SIZE / 3),
border_radius=TILE_SIZE,
)
# 更新屏幕
screen.blit(
surface, ((SCREEN_HEIGHT - GAME_HEIGHT) / 2
代码解释
导入库并创建常量:
-
代码导入了所需的库,包括Pygame、NumPy和自定义
Maze
类。 -
还定义了一些常量来设置迷宫环境、屏幕尺寸和磁贴大小。
创建迷宫环境:
-
迷宫布局使用名为
level
的字符串列表定义。 -
创建一个
Maze
类实例,并将level
和目标位置作为参数传递。 -
然后使用
env
对象重置迷宫,并使用solve
方法找到最佳策略。
初始化PyGame
游戏主循环:
-
程序进入一个循环,一直运行到
running
变量变为“False
”为止。 -
reset_goal
函数会在一个单独的线程中调用,以检查玩家是否已达到目标,并在必要时重置目标。
绘制迷宫:
-
在屏幕上绘制迷宫时,会遍历
level
列表,并根据墙上的字符("X")绘制地砖。 -
玩家的位置被绘制成一个填充特定颜色(红色)的矩形,并位于迷宫单元的中心。
-
目标位置则绘制为带圆角的绿色矩形。
更新游戏状态和玩家移动:
-
根据玩家所在位置的当前策略(
env.policy_probs
)确定玩家的行动。使用np.argmax
选择概率最高的行动。 -
根据选择的行动,如果是有效的移动(不撞墙),就会更新玩家的位置。
-
玩家在
env
对象中的位置也会更新。
退出游戏:
-
当
running
变量变为“False
”时,游戏循环退出,通常是由用户关闭游戏窗口触发。 -
Pygame会通过
pygame.quit()
关闭。 - 现在只需运行
main.py
即可看到代码的运行。
综上,现在已经成功地使用强化学习实现了值迭代算法来解决迷宫问题,还使用PyGame将迷宫求解过程可视化,使其更具互动性和趣味性。大家可以随意尝试不同的迷宫配置、学习率和折扣系数,观察它们对学习过程的影响。