大家好,今天和各位分享一下深度强化学习中的近端策略优化算法(proximal policy optimization,PPO),并借助 OpenAI 的 gym 环境完成一个小案例,完整代码可以从我的 GitHub 中获得:
https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model
1. 算法原理
PPO 算法之所以被提出,根本原因在于 Policy Gradient 在处理连续动作空间时 Learning rate 取值抉择困难。Learning rate 取值过小,就会导致深度强化学习收敛性较差,陷入完不成训练的局面,取值过大则导致新旧策略迭代时数据不一致,造成学习波动较大或局部震荡。除此之外,Policy Gradient 因为在线学习的性质,进行迭代策略时原先的采样数据无法被重复利用,每次迭代都需要重新采样;
同样地置信域策略梯度算法(Trust Region Policy Optimization,TRPO)虽然利用重要性采样(Important-sampling)、共轭梯度法求解提升了样本效率、训练速率等,但在处理函数的二阶近似时会面临计算量过大,以及实现过程复杂、兼容性差等缺陷。
PPO 算法具备 Policy Gradient、TRPO 的部分优点,采样数据和使用随机梯度上升方法优化代替目标函数之间交替进行,虽然标准的策略梯度方法对每个数据样本执行一次梯度更新,但 PPO 提出新目标函数,可以实现小批量更新。
鉴于上述问题,该算法在迭代更新时,观察当前策略在 t 时刻智能体处于状态 s 所采取的行为概率,与之前策略所采取行为概率 ,计算概率的比值来控制新策略更新幅度,比值 记作:
若新旧策略差异明显且优势函数较大,则适当增加更新幅度;若 比值越接近 1,表明新旧策略差异越小。
优势函数代表,在状态 s 下,行为 a 相对于均值的偏差。在论文中,优势函数 使用 GAE(generalized advantage estimation)来计算:
PPO 算法可依据 Actor 网络的更新方式细化为含有自适应 KL-散度(KL Penalty)的 PPO-Penalty 和含有 Clippped Surrogate Objective 函数的 PPO-Clip。
(1)PPO-Penalty 基于KL 惩罚项优化目标函数,实验证明惩罚项系数 在迭代过程中并非固定值,需要动态调整惩罚权重,其目标函数 L 可以定义为:
惩罚项 的初始值的选择对算法几乎无影响,原因是它能在每次迭代时依据新旧策略的 KL 散度做适宜调整,首先设置 KL 散度阈值 ,再通过下面的表达式计算 :
如果 时,证明散度较小,需要弱化惩罚力度, 调整为 ;
如果 时,证明散度较大,需要增强惩罚力度, 调整为 。
(2)PPO-Clip 直接对新旧策略比例进行一定程度的 Clip 操作,以约束变化幅度。其目标函数的计算方式如下:
其中, 代表截断超参数,一般设定值为 0.2; 表示截断函数,负责限制比例 在 区间之内,以保证收敛性;最终 借助 函数选取未截断与截断目标之间的更小值,形成目标下限。 可以分为优势函数 A 为正数和负数两种情况,其变化趋势如下图所示:
如果优势函数为正数,需要增大新旧策略比值 ,然而当 时,将不提供额外的激励;如果优势函数是负数,需要减少新旧策略比值 ,但在 时,不提供额外的激励,这使得新旧策略的差异被限制在合理范围内。PPO 本质上基于 Actor-Critic 框架,算法流程如下:
PPO 算法主要由 Actor 和 Critic 两部分构成,Critic 部分更新方式与其他Actor-Critic 类型相似,通常采用计算 TD error(时序差分误差)形式。对于 Actor 的更新方式,PPO 可在KLPENL 、CLIPL 之间选择对于当前实验环境稳定性适用性更强的目标函数,经过 OpenAI 研究团队实验论证,PPO- Clip 比 PPO- Penalty有更好的数据效率和可行性。
2. 代码实现
下面我就采用 Clip 形式的 PPO。模型构建代码如下。下面的模型适用于 action 是离散的情况,连续情况的代码可以从我的 GitHub 中获取。
# 代码用于离散环境的模型import numpy as npimport torchfrom torch import nnfrom torch.nn import functional as F# ----------------------------------- ## 构建策略网络--actor# ----------------------------------- #class PolicyNet(nn.Module): def __init__(self, n_states, n_hiddens, n_actions): super(PolicyNet, self).__init__() self.fc1 = nn.Linear(n_states, n_hiddens) self.fc2 = nn.Linear(n_hiddens, n_actions) def forward(self, x): x = self.fc1(x) # [b,n_states]-->[b,n_hiddens] x = F.relu(x) x = self.fc2(x) # [b, n_actions] x = F.softmax(x, dim=1) # [b, n_actions] 计算每个动作的概率 return x# ----------------------------------- ## 构建价值网络--critic# ----------------------------------- #class ValueNet(nn.Module): def __init__(self, n_states, n_hiddens): super(ValueNet, self).__init__() self.fc1 = nn.Linear(n_states, n_hiddens) self.fc2 = nn.Linear(n_hiddens, 1) def forward(self, x): x = self.fc1(x) # [b,n_states]-->[b,n_hiddens] x = F.relu(x) x = self.fc2(x) # [b,n_hiddens]-->[b,1] 评价当前的状态价值state_value return x# ----------------------------------- ## 构建模型# ----------------------------------- #class PPO: def __init__(self, n_states, n_hiddens, n_actions, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device): # 实例化策略网络 self.actor = PolicyNet(n_states, n_hiddens, n_actions).to(device) # 实例化价值网络 self.critic = ValueNet(n_states, n_hiddens).to(device) # 策略网络的优化器 self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) # 价值网络的优化器 self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr = critic_lr) self.gamma = gamma # 折扣因子 self.lmbda = lmbda # GAE优势函数的缩放系数 self.epochs = epochs # 一条序列的数据用来训练轮数 self.eps = eps # PPO中截断范围的参数 self.device = device # 动作选择 def take_action(self, state): # 维度变换 [n_state]-->tensor[1,n_states] state = torch.tensor(state[np.newaxis, :]).to(self.device) # 当前状态下,每个动作的概率分布 [1,n_states] probs = self.actor(state) # 创建以probs为标准的概率分布 action_list = torch.distributions.Categorical(probs) # 依据其概率随机挑选一个动作 action = action_list.sample().item() return action # 训练 def learn(self, transition_dict): # 提取数据集 states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) actions = torch.tensor(transition_dict['actions']).to(self.device).view(-1,1) rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).to(self.device).view(-1,1) next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) dones = torch.tensor(transition_dict['dones'], dtype=torch.float).to(self.device).view(-1,1) # 目标,下一个状态的state_value [b,1] next_q_target = self.critic(next_states) # 目标,当前状态的state_value [b,1] td_target = rewards + self.gamma * next_q_target * (1-dones) # 预测,当前状态的state_value [b,1] td_value = self.critic(states) # 目标值和预测值state_value之差 [b,1] td_delta = td_target - td_value # 时序差分值 tensor-->numpy [b,1] td_delta = td_delta.cpu().detach().numpy() advantage = 0 # 优势函数初始化 advantage_list = [] # 计算优势函数 for delta in td_delta[::-1]: # 逆序时序差分值 axis=1轴上倒着取 [], [], [] # 优势函数GAE的公式 advantage = self.gamma * self.lmbda * advantage + delta advantage_list.append(advantage) # 正序 advantage_list.reverse() # numpy --> tensor [b,1] advantage = torch.tensor(advantage_list, dtype=torch.float).to(self.device) # 策略网络给出每个动作的概率,根据action得到当前时刻下该动作的概率 old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach() # 一组数据训练 epochs 轮 for _ in range(self.epochs): # 每一轮更新一次策略网络预测的状态 log_probs = torch.log(self.actor(states).gather(1, actions)) # 新旧策略之间的比例 ratio = torch.exp(log_probs - old_log_probs) # 近端策略优化裁剪目标函数公式的左侧项 surr1 = ratio * advantage # 公式的右侧项,ratio小于1-eps就输出1-eps,大于1+eps就输出1+eps surr2 = torch.clamp(ratio, 1-self.eps, 1+self.eps) * advantage # 策略网络的损失函数 actor_loss = torch.mean(-torch.min(surr1, surr2)) # 价值网络的损失函数,当前时刻的state_value - 下一时刻的state_value critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach())) # 梯度清0 self.actor_optimizer.zero_grad() self.critic_optimizer.zero_grad() # 反向传播 actor_loss.backward() critic_loss.backward() # 梯度更新 self.actor_optimizer.step() self.critic_optimizer.step()
3. 案例演示
基于 OpenAI 的 gym 环境完成一个推车游戏,一个离散的环境,目标是左右移动小车将黄色的杆子保持竖直。动作维度为2,属于离散值;状态维度为 4,分别是坐标、速度、角度、角速度。
import numpy as npimport matplotlib.pyplot as pltimport gymimport torchfrom RL_brain import PPOdevice = torch.device('cuda') if torch.cuda.is_available() \else torch.device('cpu')# ----------------------------------------- ## 参数设置# ----------------------------------------- #num_episodes = 100 # 总迭代次数gamma = 0.9 # 折扣因子actor_lr = 1e-3 # 策略网络的学习率critic_lr = 1e-2 # 价值网络的学习率n_hiddens = 16 # 隐含层神经元个数env_name = 'CartPole-v1'return_list = [] # 保存每个回合的return# ----------------------------------------- ## 环境加载# ----------------------------------------- #env = gym.make(env_name, render_mode="human")n_states = env.observation_space.shape[0] # 状态数 4n_actions = env.action_space.n # 动作数 2# ----------------------------------------- ## 模型构建# ----------------------------------------- #agent = PPO(n_states=n_states, # 状态数 n_hiddens=n_hiddens, # 隐含层数 n_actions=n_actions, # 动作数 actor_lr=actor_lr, # 策略网络学习率 critic_lr=critic_lr, # 价值网络学习率 lmbda = 0.95, # 优势函数的缩放因子 epochs = 10, # 一组序列训练的轮次 eps = 0.2, # PPO中截断范围的参数 gamma=gamma, # 折扣因子 device = device )# ----------------------------------------- ## 训练--回合更新 on_policy# ----------------------------------------- #for i in range(num_episodes): state = env.reset()[0] # 环境重置 done = False # 任务完成的标记 episode_return = 0 # 累计每回合的reward # 构造数据集,保存每个回合的状态数据 transition_dict = { 'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': [], } while not done: action = agent.take_action(state) # 动作选择 next_state, reward, done, _, _ = env.step(action) # 环境更新 # 保存每个时刻的状态\动作\... transition_dict['states'].append(state) transition_dict['actions'].append(action) transition_dict['next_states'].append(next_state) transition_dict['rewards'].append(reward) transition_dict['dones'].append(done) # 更新状态 state = next_state # 累计回合奖励 episode_return += reward # 保存每个回合的return return_list.append(episode_return) # 模型训练 agent.learn(transition_dict) # 打印回合信息 print(f'iter:{i}, return:{np.mean(return_list[-10:])}')# -------------------------------------- ## 绘图# -------------------------------------- #plt.plot(return_list)plt.title('return')plt.show()
训练100回合,绘制每回合的 return