Lesson2-1 MDP、Q表格
一、强化学习MDP四元组<S,A,P,R>
- S:state 状态
- A:action 动作
- R:reward 奖励 r[st,at]
- P:probability 状态转移概率 p[st+1,rt|st,at]
强化学习是一系列决策的过程,通过当前的环境状态和收到的奖励来决定下一次执行的动作
- Model-based: P函数和R函数已知
- Model-free:P函数和R函数未知
二、Q表格:状态动作价值
Q(st,at) | 装死 | 跑 | action… |
---|---|---|---|
St | 0 | -90 | … |
St+1 | 0 | 10 | … |
state… | … | … | … |
未来总收益引出:如果一辆车闯红灯,他扣一分。但是如果它运送病人,闯红灯扣一分,把病人送到病人加一百分,所以这时候就应该闯红灯。强化学习目光放长远一点决策。
但是目光也不能太过长远,如股票,考虑太长远的反而不合理,所以需要引入衰减因子。
-
Reward 折扣因子γ,未来总收益计算需要从终点开始往前计算
- Gt=Rt+1+γRt+2+γ2Rt+2+γ3Rt+2+…
-
使用例子来说明强化学习更新Q表格
Q表格用来指导每一步动作。每走一步,会更新一次Q表格,用下一个状态的Q值去更新当前状态的Q值
- 大佬讲解
Lesson2-2-强化概念、TD更新、Sarsa引入
解释:食物对小狗有一种无条件的刺激,而铃声是中性的,一开始并不会对小狗有刺激;而如果在每次喂食前先响一下铃,重复多次后,响铃对小狗来说也会产生一种刺激,会开始流口水,也就是说,声音会代表着有食物,对小狗来说也就有了价值
巴普洛夫实验强调的是中性的条件刺激在跟无条件刺激紧紧挨着的时候,经过反复多次,这种中性的条件刺激也能引起和无条件刺激一样的条件反应。
中性刺激和无条件刺激紧紧挨着,中性刺激也能引起无条件反应。
解释:假设人走在树林里,先看到树上有熊爪后看到熊,接着就看到熊发怒了,经过很多次之后,原来要见到熊才瑟瑟发抖的,后来只要见到树上有熊爪就会有晕眩和害怕的感觉。也就是说,在不断地训练之后,下一个状态的价值可以不断地强化、影响上一个状态的价值
斯坦福大学的在线实验:状态价值迭代
原文链接:https://cs.stanford.edu/people/karpathy/reinforcejs/gridworld_td.html
Temporal Difference 时序查分(单步更新)
Q(St,At) ⭠ Q(St,At)+α[Rt+1+γQ(St+1,At+1)-Q(St,At)]
-
用下一个Q来更新当前Q
-
Target目标值 Rt+1+γQ(St+1,At+1)
-
当前值 Q(St,At)
-
软更新 用 Rt+1+γQ(St+1,At+1)-Q(St,At) 软更新Q(St,At)
-
逼近 Q 理想中的值
- Sarsa 算法
智能体每次跟环境交互一次以后,就可以从环境当中拿到一个状态和收益(智能体主要依据Q表格选动作),然后拿到这些值以后,就用来更新Q表格
Lesson 2-3 Sarsa算法介绍
- 伪代码
obs = env.reset() # 重置环境, 重新开一局(即开始新的一个episode)
action = agent.sample(obs) # 根据 Q表格 选择一个动作
while True:
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
next_action = agent.sample(next_obs) # 根据算法选择一个动作
# 训练 Sarsa 算法
agent.learn(obs, action, reward, next_obs, next_action, done) # 更新 Q表格
action = next_action
obs = next_obs # 存储上一个观察值
- ε-greedy
有一定概率选择已有的最优动作也有一定的概率去将没有使用过的动作利用起来
- 伪代码
# 根据输入观察值,采样输出的动作值,带探索
def sample(self, obs):
if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作
action = self.predict(obs)
else:
action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作
return action
# 根据输入观察值,预测输出的动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个action ,所以随机选择一个
action = np.random.choice(action_list)
return action
- Sarsa与环境交互代码,每个Step都更新learn一下
def run_episode(env, agent, render=False):
total_steps= 0#记最每个episode走了多少step
total_reward = 0
obs = env.reset() # 重置环境,重新开一局(即开始新的一个episode)
action = agent.sample(obs)# 根据算法选择一个动作
while True:
next_obs, reward, done, _ = env.step(action) # 与环境进行一次交互
next_action = agent.sampte(next_obs) # 根据算法选择一个动作
# 训练 Sarsa 算法
agent.learn(obs, action, reward, next_obs, next_action, done)
action = next_action
obs = next_obs # 存储上一个观察值
total_reward += reward
total_steps += 1 #计算step数
if render:
env.render() #渲染新的一帧图片
if done:
break
return total_reward, total_steps
- Sarsa Agent ①根据Q表格选动作
class SarsaAgent(object):
def __init__(self,obs_n,act_n,learning_rate=0.01,gamma=0.9,e_greed=0.1):
self.act_n=act_n # 动作维度,有几个动作可选
self.lr=learning_rate # 学习率
self.gamma=gamma # reward的衰减率
self.epsilon=e_greed #按一定的概率随机选动作
self.Q=np.zeros((obs_n,act_n)) #Q表格
# 根据输入观察值,采样输出的动作值,带探索
def sample(self, obs):
if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作
action = self.predict(obs)
else:
action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作
return action
# 根据输入观察值,预测输出的动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个action ,所以随机选择一个
action = np.random.choice(action_list)
return action
- Sarsa Agent ②更新Q表格
# 学习方法,也就是更新Q-table的方法
def learn(self, obs, action, reward, next_obs, next_action, done):
""" on-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
next_action: 根据当前Q表格, 针对next_obs会选择的动作, a_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
target_Q = reward # 没有下一个状态了
else:
target_Q = reward + self.gamma * self.Q[next_obs, next_action] # Sarsa
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q
公式里的α就是学习率learning_rate,代码也是完全按照公式写的,另外,如果一个回合结束了,就没有下一个状态了,这一步代码要记得加上,否则可能无法收敛
- main函数写法
import gym
from gridworld import CliffwalkingWapper
from agent import SarsaAgent
def main():
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffwalkingWapper(env)
agent = SarsaAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
is render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward
%.1f' %(episode, ep_steps, ep_reward))
# 每隔 20个 episode 渲染一下看看效果
if episode % 20 == 0:
is_render = True
else:
is_render = False
# 训练结束,查看算法效果
test_episode (env, agent)
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done,= env.step(action)
total_ reward + reward
obs = next_ obs
time. sleep(0.5)
env. render()
if done:
print ('test reward = %.1f' %(total reward))
break
Sarsa 会远离悬崖走,比较小心
Off-Policy
实际上有两种不同的策略,期望得到最佳的目标策略和大胆探索的行为策略
On-Policy
on-policy优化的实际上是它实际执行的策略,用下一步一定会执行的动作action来优化Q表格,所以on-policy其实只存在一种策略 ,用同一种策略去选取和优化
Q-learning vs Sarsa
Q-learning解析
Q-learning也是采用Q表格的方式存储Q值(状态动作价值),决策部分与Sarsa是一样的,采用ε-greedy方式增加探索。
Q-learning跟Sarsa不一样的地方是更新Q表格的方式。
Sarsa是on-policy的更新方式,先做出动作再更新。
Q-learning是off-policy的更新方式,更新learn()时无需获取下一步实际做出的动作next_action,并假设下一步动作是取最大Q值的动作。
Q-learning 默认使用的是下一个状态的Q最大动作
Q-learning Agent①根据Q表格选懂动作
class QLearningAgent(object):
def __init__(self, obs_n, act_n, learning_ rate=0.01, gamma=0.9, e_greed=0.1):
self.act_n = act_n # 动作维度,有几个动作可选
self.lr = learning_rate # 学习率
self.gamma = gamma # reward的衰减率
self.epsilon = e_greed # 按一定的概率选动作
self.Q = np.zeros((obs_n, act_n))
# 根据输入的观察值,采样输出的动作值,带探索
def sample(self, obs):
if np. random. uniform(0, 1) < (1.0 - self.epsilon): # 根据table 的Q值选动作
action = self.predict (obs)
else:
action = np.random.choice(self.act_n) # 有一定概率随机探索选取一个动作
return action
# 根据输入观察值,预测输出的动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ=np.max(Q_list)
action_list = np.where(Q_list - maxQ)[0] # maxQ可能对应多个action
action = np.random.choice(action_list)
return action
Q-learning Agent ②更新Q表格
# 学习方法,也就是更新Q-table的方法
def learn(self, obs, action, reward, next_obs, done):
""" off-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
target_Q = reward # 没有下一个状态了
else:
target_Q = reward + self.gamma * np.max(self.Q[next_obs, :]) # Q-learning
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q
运行展示
import gym
from gridworld import CliffwalkingWapper
from agent import QLearningAgent
def main():
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffwalkingWapper(env)
agent = QLearningAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
is render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward
%.1f' %(episode, ep_steps, ep_reward))
# 每隔 20个 episode 渲染一下看看效果
if episode % 20 == 0:
is_render = True
else:
is_render = False
# 训练结束,查看算法效果
test_episode (env, agent)
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done,= env.step(action)
total_ reward + reward
obs = next_ obs
time. sleep(0.5)
env. render()
if done:
print ('test reward = %.1f' %(total reward))
break
代码lesson2/q_learning
总结
课后作业
- 使用Q-learning和Sarsa解决16宫格寻迹问题
- 可以使用FrozenLakeWapper美化render界面
- 注意is_slippery参数
[界面链接](#https://github.com/openai/gym/wiki/FrozenLake-v0
from gridworld import FrozenLake Wapper
env = gym.make(“FrozenLake-vO”, is_ slippery=False) # 0 left, 1 down, 2 right, 3 up
env = FrozenLake Wapper(env))
from gridworld import FrozenLakeWapper
env = gym.make("FrozenLake-v0",is_slippery=False)#0 left,1 down,2 right,3 up
env = FrozenLakeWapper(env)
gridworld.py 使用指南
# 环境1:FrozenLake,可以配置冰面是否是滑的
env = gym.make ("FrozenLake-v0", is_slippery=False) # 0 left, I down, 2 right, 3 up
env = FRozenLakeWapper(env)
# 环境2: CLiffWalking,悬崖环境
env = gym.make ("Cliffwalking-v0") # 0 up, 1 right, 2 down, 3 left
en = CliffwalkingWapper(env)
# 环境3:自定义格子世界,可以配置地固,S为出发点Start,F为平地Floor, H为洞Hole,G为出口目标Goal
gridmap = [
'SFFF'
'FHFF'
'FFFF'
'HFGF']
env = Gridworld(gridmap)
1.资料引用
原文链接:https://blog.csdn.net/zbp_12138/article/details/106837306
2.视频学习
原文链接:https://www.bilibili.com/video/BV1yv411i7xd?p=4&vd_source=4aa97afe0ec7d8f686f99d90d7c6f76e