一、DQN(Deep Q-Network)方法概述
DQN(Deep Q-Network)是一种强化学习方法,通过结合Q-learning算法和深度神经网络来解决强化学习问题。它是深度强化学习的里程碑之一,由DeepMind在2013年提出,被广泛应用于各种复杂的强化学习任务。DQN方法的概述如下:
1.强化学习问题:在强化学习中,智能体与环境进行交互,通过观察环境的状态并采取动作,来最大化累积奖励。智能体在环境中移动并与之交互,不断学习并优化策略,以在不同的状态下选择最优动作。
2.Q-learning算法:Q-learning是一种经典的强化学习算法,用于学习最优Q函数。Q函数表示在给定状态下采取某个动作的预期累积奖励值。Q-learning使用迭代更新的方式逼近最优Q函数,其核心思想是使用贝尔曼方程来更新Q值。贝尔曼方程表示当前状态下采取动作的Q值可以通过下一个状态的最大Q值和立即奖励来进行递归更新。
3.DQN的创新点:DQN的创新之处在于使用深度神经网络来逼近Q函数。传统的Q-learning方法使用表格存储Q值,但在大型状态空间问题中,表格变得不可行。DQN通过使用神经网络来表示Q函数,将状态作为输入,输出对应于每个动作的Q值,从而可以对大型状态空间进行近似求解。
4.Experience Replay(经验回放):DQN使用经验回放技术来存储智能体的经验,包括状态、动作、奖励和下一个状态。在训练过程中,DQN从经验回放缓冲区中随机抽样,以打破数据之间的关联性,从而更有效地使用经验数据进行训练。
5.Target Network(目标网络):为了稳定训练过程,DQN引入了目标网络。在训练过程中,有两个神经网络:一个是用于选择动作的主网络,另一个是用于计算目标Q值的目标网络。目标网络的参数比主网络的参数更新更慢,这有助于减少训练中的目标Q值估计的波动性。
6.Double Q-learning:DQN还采用了Double Q-learning的思想,用于更准确地估计Q值。在目标网络和主网络中分别选择最大动作,并结合它们的Q值来更新目标Q值。
DQN方法的训练过程是迭代的,通过反复与环境交互、更新神经网络权重和优化策略,使得智能体逐渐学习到最优的Q函数,并从中得到最佳决策策略。DQN在很多复杂的强化学习任务中取得了显著的成功,并为后续深度强化学习算法的发展奠定了基础。
二、强化学习代码实现及训练过程
本文将实现一个使用DQN算法和深度神经网络的强化学习代理,通过逼近Q函数来优化决策策略。代理以单一分数来表示每个状态的预期得分,并使用神经网络训练来逼近这些Q值。在训练过程中,代理通过经验回放和目标网络来稳定训练,并通过探索与利用策略优化决策能力。
首先,我们基于DQN实现一个DQNAgent类,该类实现了一个基于深度强化学习的代理,使用DQN(Deep Q-Network)算法来解决强化学习问题。代码逻辑功能概述如下:
1.这个代理使用DQN算法来学习最优的决策策略。
2.代理的目标是找到所有可能状态的最佳最终状态的组合,而不是传统方法中找到特定状态的最佳动作。
3.通过使用深度神经网络来逼近Q函数,代理可以处理大型状态空间的问题。
4.代码中使用经验回放技术和目标网络来优化训练过程,提高稳定性和效率。
实现的代码如下:
class DQNAgent:
def __init__(self, state_size, mem_size=10000, discount=0.95, epsilon=1, epsilon_min=0, epsilon_stop_episode=500,
n_neurons=[32, 32], activations=('relu', 'relu', 'linear'), loss='mse', optimizer='adam',
replay_start_size=None):
# 初始化DQNAgent代理
assert len(activations) == len(n_neurons) + 1
self.state_size = state_size
self.memory = deque(maxlen=mem_size)
self.discount = discount
self.epsilon = epsilon
self.epsilon_min = epsilon_min
self.epsilon_decay = (self.epsilon - self.epsilon_min) / epsilon_stop_episode
self.n_neurons = n_neurons
self.activations = activations
self.loss = loss
self.optimizer = optimizer
if not replay_start_size:
replay_start_size = mem_size / 2
self.replay_start_size = replay_start_size
self.model = self.build_model()
# 创建一个深度神经网络模型
def build_model(self) -> Model:
model = Sequential()
model.add(Dense(self.n_neurons[0], input_dim=self.state_size, activation=self.activations[0]))
for i in range(1, len(self.n_neurons)):
model.add(Dense(self.n_neurons[i], activation=self.activations[i]))
model.add(Dense(1, activation=self.activations[-1]))
model.compile(loss=self.loss, optimizer=self.optimizer)
return model
# 将动作过程添加到经验回放缓冲区中
def add_to_memory(self, current_state, next_state, reward, done):
self.memory.append((current_state, next_state, reward, done))
# 为某个动作分配一个随机得分
def random_value(self):
return random.random()
# 预测给定状态的得分
def predict_value(self, state: np.ndarray) -> float:
return self.model.predict(state)[0]
# 返回给定状态的预期得分
def act(self, state):
state = np.reshape(state, [1, self.state_size])
if random.random() <= self.epsilon:
return self.random_value()
else:
return self.predict_value(state)
# 返回给定状态集合中的最佳状态
def best_state(self, states):
max_value = None
best_state = None
if random.random() <= self.epsilon:
return random.choice(list(states))
else:
for state in states:
value = self.predict_value(np.reshape(state, [1, self.state_size]))
if not max_value or value > max_value:
max_value = value
best_state = state
return best_state
# 训练神经网络模型
def train(self, batch_size=32, epochs=3):
n = len(self.memory)
if n >= self.replay_start_size and n >= batch_size:
batch = random.sample(self.memory, batch_size)
# 获取下一个状态的预期得分
next_states = np.array([x[1] for x in batch])
next_qs = [x[0] for x in self.model.predict(next_states)]
x = []
y = []
# 构建训练数据的输入输出结构
for i, (state, _, reward, done) in enumerate(batch):
if not done:
new_q = reward + self.discount * next_qs[i] # 更新预期得分(Q值)
else:
new_q = reward
x.append(state)
y.append(new_q)
# 使用训练数据拟合模型
self.model.fit(np.array(x), np.array(y), batch_size=batch_size, epochs=epochs, verbose=0)
# 更新探索变量
if self.epsilon > self.epsilon_min:
self.epsilon -= self.epsilon_decay
代码中DQNAgent类:
state_size:输入域(状态空间)的大小。
mem_size:回放缓冲区的大小。
discount:未来奖励相对于即时奖励的重要性(折扣因子)[0,1]。
epsilon:开始时的探索概率(以给定概率执行随机动作)。
epsilon_min:当代理停止减少探索概率时的最小值。
epsilon_stop_episode:代理停止减少探索变量的回合数。
n_neurons:每个隐藏层中神经元的数量的列表。
activations:每个隐藏层和输出层中使用的激活函数的列表。
loss:用于训练神经网络的损失函数。
optimizer:用于训练神经网络的优化器。
build_model():方法用于创建和编译基于指定结构(神经元数量和激活函数)的Keras神经网络模型。
add_to_memory():方法用于将一个经验元组(当前状态、下一个状态、奖励、完成标志)添加到回放缓冲器中。
random_value():方法返回一个随机值(用于在探索时为某个动作分配随机分数)。
predict_value():方法接收一个状态作为输入,并预测该状态的预期分数,使用训练好的神经网络。
act():方法根据当前状态选择一个动作(值)。它根据探索概率(epsilon)决定是进行探索(随机动作)还是利用(预测动作)。
best_state():方法接收一个状态集合,并根据预测的最高分数返回最佳状态。它可以根据一定概率(epsilon)进行探索或利用最佳预测状态。
train():方法用于训练神经网络,使用从回放缓冲器中抽样的经验。它使用Q-learning的更新规则来调整神经网络权重,以更好地逼近Q函数。
将写好的俄罗斯方块游戏和DQNAgent类的强化学习策略进行结合,通过训练和评估在Tetris游戏中的性能,以寻找最佳策略来玩这个游戏。实现的过程如下:
1.定义了run_model
函数,用于训练和评估DQN代理的性能。在每个回合(episode)中,代理在Tetris游戏环境中执行动作,并收集游戏得分。代码实现如下:
def run_model(dir_name, episodes=100, render=False):
env = Tetris()
epsilon_stop_episode = 1500
mem_size = 20000
discount = 0.95
replay_start_size = 2000
n_neurons = [32, 32]
activations = ['relu', 'relu', 'linear']
agent = DQNAgent(env.get_state_size(), n_neurons=n_neurons, activations=activations,
epsilon_stop_episode=epsilon_stop_episode, mem_size=mem_size, discount=discount,
replay_start_size=replay_start_size)
model_path = '../checkpoints/' + dir_name + '/model.hdf'
agent.model = load_model(model_path)
agent.epsilon = 0
scores = []
for episode in range(episodes):
env.reset()
game_over = False
while not game_over:
next_states = env.get_next_states()
best_state = agent.best_state(next_states.values())
# find the action, that corresponds to the best state
best_action = None
for action, state in next_states.items():
if state == best_state:
best_action = action
break
_, game_over = env.hard_drop([best_action[0], 0], best_action[1], render)
scores.append(env.score)
print(f'episode {episode} => {env.score}')
return scores
2.定义run_model_helper
函数,用于运行多个训练过程并评估它们的性能。该函数加载预先训练好的模型,并在每个目录下执行run_model
函数,输出训练得分的最大值和对应目录的名称。代码如下:
def run_model_helper(episodes=128, render=False):
dirs = [name for name in os.listdir('../checkpoints') if os.path.isdir(os.path.join('../checkpoints', name))]
dirs.sort(reverse=True)
dirs = [dirs[0]]
max_scores = []
for directory in dirs:
print(f"Evaluating dir '{directory}'")
scores = run_model(directory, episodes, render)
max_scores.append((directory, max(scores)))
max_scores.sort(key=lambda t: t[1], reverse=True)
for k, v in max_scores:
print(f"{v}\t{k}")
训练过程如下:
三、实现自动玩俄罗斯方块游戏
首先实现一个简单的俄罗斯方块基本游戏,实现的逻辑过程如下:
-
定义游戏地图和方块的常量:在游戏中,定义了游戏地图的大小、方块的大小、颜色等常量,并存储在相应的类属性中。
-
初始化游戏状态:在Tetris类的构造函数中,初始化了游戏的各种状态,包括当前方块位置、旋转角度、游戏得分等,以及游戏地图和下一个方块的预览板状态。
-
重置游戏状态:通过reset方法,重置游戏状态,重新开始一局游戏。重置后,将清空游戏地图,生成新的随机方块,并更新下一个方块的预览板。
-
方块的旋转与移动:游戏中的方块可以通过W键进行顺时针旋转,A键向右移动一列,S键向下移动一行,D键向左移动一列。这些操作在游戏中通过调整当前方块的位置和旋转角度来实现。
-
方块的硬降:在游戏中,通过按下空格键,可以将当前方块快速降落到底部。为了实现这个功能,游戏会不断检测方块是否可以继续向下移动,直到无法移动为止。
-
方块的落地和消除:当方块无法再继续向下移动时,将方块固定在游戏地图上,并判断是否有可消除的行。如果有,则消除行并增加玩家得分。
-
下一个方块的预览:游戏界面上会显示下一个方块的预览板,让玩家提前了解下一个方块的形状。
-
游戏结束判断:游戏判断是否结束的条件是当前方块在初始位置无法继续向下移动。
-
获取游戏状态特征:游戏通过一系列特征函数来提取当前游戏状态的特征,例如已消除行数、空洞数量、高度差等,用于在强化学习中作为输入来训练智能体。
-
图形渲染:游戏界面使用cv2库进行图形渲染,将游戏状态以图形化形式显示在屏幕上。
该游戏可以实现自己玩,控制方法为:
W - 将方块顺时针旋转90度
A - 将方块向右移动一列
S - 将方块向下移动一行
D - 将方块向左移动一列
空格键 - 快速落下方块
ESC - 退出游戏
自己玩游戏的过程如下:
AI方法玩游戏的过程如下:(非常快)
全部代码链接:
https://download.csdn.net/download/weixin_40651515/88114773
运行配置环境:tensorflow==1.14.0 tensorboard==1.14.0 keras==2.2.4 opencv==4.7.0.72 numpy==1.21.6 pillow==5.4.1 tqdm==4.31.1等