Table of Contents
1. Overview
2. Implementation
3. Results
CliffWalking-v0 is an example environment in the gym library [1], adapted from Example 6.6 of Sutton-RLbook-2020. This article, however, is not about how to play gym's CliffWalking-v0; it is a worked example of finding the optimal policy for this problem with policy iteration.
The CliffWalking-v0 environment is a 4*12 grid (as shown in the figure above [1]). The rules of the game are as follows:
The agent starts from the bottom-left cell. In every cell it may take any one of the actions {UP, DOWN, RIGHT, LEFT}; if an action would take it off the grid, it stays where it is. The episode ends when the agent reaches the bottom-right cell.
In the bottom row, every cell other than the bottom-left (start) cell and the bottom-right (goal) cell is a cliff cell. If a move lands the agent in a cliff cell, it receives a reward of -100 (i.e., a penalty) and is sent straight back to the start. Every other move yields a reward of -1. The agent must minimize the cost of reaching the goal cell (maximize the reward, or equivalently minimize the penalty).
The game is simple enough that the optimal policy is obvious without any computation: move up one cell from the start; head right along the row just above the cliff; and after reaching the rightmost column, move down one cell into the goal. That is 13 moves at -1 each, for a total reward of -13.
Below, a solution based on the policy iteration algorithm is implemented for this problem, to see whether it reproduces the intuitive optimal policy described above.
The CliffWalking-v0 environment is set up much like GridWorld, so a GridWorld-style state representation is used here. When the environment object is created, a 2D array describes the type of each cell in the grid: 1 marks the terminal cell, -1 marks the cliff cells (grid[3][1] through grid[3][10]), and 0 marks all other cells, as shown below:
grid = [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
grid[3][11] = 1          # Terminal cell
for k in range(1, 11):
    grid[3][k] = -1      # Cliff cells
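The environment code in this article also relies on State and Action helper types that are not listed here. The following is only a minimal sketch of what they could look like, inferred from how transit_func() and _move() use them (in particular, opposite actions must have negated enum values so that Action(action.value * -1) works, and State must be hashable and provide clone()); the actual definitions are in the linked source file:

from enum import Enum

class Action(Enum):
    # Assumption: opposite directions carry negated values so that
    # Action(action.value * -1) yields the reverse action.
    UP = 1
    DOWN = -1
    LEFT = 2
    RIGHT = -2

class State():
    def __init__(self, row=-1, column=-1):
        self.row = row
        self.column = column

    def clone(self):
        return State(self.row, self.column)

    # States are used as dict keys, so they must be hashable and comparable.
    def __hash__(self):
        return hash((self.row, self.column))

    def __eq__(self, other):
        return self.row == other.row and self.column == other.column

    def __repr__(self):
        return "<State: [{}, {}]>".format(self.row, self.column)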
The environment's state-transition function P(s',r|s,a) is implemented by Environment::transit_func(), shown below together with its helpers can_action_at() and _move():
def transit_func(self, state, action):
    """Prob(s',r|s,a) stored in one dict[(s',reward)]."""
    transition_probs = {}
    if not self.can_action_at(state):
        # Already on the terminal cell.
        return transition_probs

    opposite_direction = Action(action.value * -1)

    for a in self.actions:
        prob = 0
        if a == action:
            prob = self.move_prob
        elif a != opposite_direction:
            prob = (1 - self.move_prob) / 2

        next_state = self._move(state, a)
        if next_state.row == (self.row_length - 1) and 0 < next_state.column < (self.column_length - 1):
            reward = -100
            next_state = State(self.row_length - 1, 0)  # Return to the start grid when falling into a cliff grid.
        else:
            reward = -1
        if (next_state, reward) not in transition_probs:
            transition_probs[(next_state, reward)] = prob
        else:
            transition_probs[(next_state, reward)] += prob

    return transition_probs

def can_action_at(self, state):
    '''Assuming:
        grid[i][j] = 1: Terminate grid
        grid[i][j] =-1: Cliff grids
        grid[i][j] = 0: Other grids
    '''
    if self.grid[state.row][state.column] == 0:
        return True
    else:
        return False

def _move(self, state, action):
    """Predict the next state upon the combination of {state, action}
        {state, action} --> next_state
    Called in transit_func()
    """
    if not self.can_action_at(state):
        raise Exception("Can't move from here!")

    next_state = state.clone()

    # Execute an action (move).
    if action == Action.UP:
        next_state.row -= 1
    elif action == Action.DOWN:
        next_state.row += 1
    elif action == Action.LEFT:
        next_state.column -= 1
    elif action == Action.RIGHT:
        next_state.column += 1

    # Check whether a state is out of the grid.
    if not (0 <= next_state.row < self.row_length):
        next_state = state
    if not (0 <= next_state.column < self.column_length):
        next_state = state

    # Entering a cliff grid incurs the corresponding penalty and a reset to
    # the start grid; that is handled in the upper layer (transit_func).
    return next_state
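As a quick sanity check, transit_func() can be probed directly. The snippet below is a hypothetical usage sketch: the Environment constructor signature and the move_prob value are assumptions (move_prob=1.0 makes the moves deterministic, matching CliffWalking-v0):

# Hypothetical usage sketch; Environment(grid, move_prob) is assumed.
env = Environment(grid, move_prob=1.0)

start = State(3, 0)                       # bottom-left start cell
for (next_state, reward), prob in env.transit_func(start, Action.UP).items():
    if prob > 0:
        print(next_state, reward, prob)
# With deterministic moves, the only entry with non-zero probability should be
# next_state=State(2, 0), reward=-1, prob=1.0.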
The Planner class is a base class for planners; PolicyIterationPlanner, a subclass of Planner, implements the policy-iteration-based planner. Its core consists of PolicyIterationPlanner::policy_evaluation() and PolicyIterationPlanner::plan(). The policy iteration algorithm itself was introduced in the previous post (RL Notes: Dynamic Programming (2): Policy Iteration) and is not repeated here.
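The Planner base class and the planner's initialization are not listed in this article, although the methods below depend on several of their members (transitions_at(), dict_to_grid(), print_value_grid(), self.policy, self.log). The following is a minimal sketch of what they could look like, assuming a uniform random initial policy; the actual definitions are in the linked source file:

class Planner():
    """Base planner: holds the environment and a few shared helpers."""
    def __init__(self, env):
        self.env = env
        self.log = []    # snapshots of the value grid, one per iteration

    def initialize(self):
        self.log = []

    def transitions_at(self, state, action):
        # Unpack the dict returned by transit_func() into (prob, next_state, reward) triples.
        for (next_state, reward), prob in self.env.transit_func(state, action).items():
            yield prob, next_state, reward

    def dict_to_grid(self, state_value_dict):
        # Lay a {State: value} dict out as a 2D list with the grid's shape.
        grid = [[0] * self.env.column_length for _ in range(self.env.row_length)]
        for s in state_value_dict:
            grid[s.row][s.column] = state_value_dict[s]
        return grid


class PolicyIterationPlanner(Planner):
    def __init__(self, env):
        super().__init__(env)
        self.policy = {}
        self.iters = 0
        self.V_grid = None

    def initialize(self):
        super().initialize()
        # Start from a uniform random policy over all actions in every state.
        self.policy = {s: {a: 1 / len(self.env.actions) for a in self.env.actions}
                       for s in self.env.states}

    def print_value_grid(self):
        for row in self.V_grid:
            print(" ".join("{0:8.2f}".format(v) for v in row))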
PolicyIterationPlanner::policy_evaluation() implements policy evaluation, as shown below:
def policy_evaluation(self, gamma, threshold):
    V = {}
    for s in self.env.states:
        # Initialize each state's expected reward.
        V[s] = 0

    while True:
        delta = 0
        for s in V:
            expected_rewards = []
            for a in self.policy[s]:
                action_prob = self.policy[s][a]
                r = 0
                for prob, next_state, reward in self.transitions_at(s, a):
                    r += action_prob * prob * \
                         (reward + gamma * V[next_state])
                expected_rewards.append(r)
            value = sum(expected_rewards)
            delta = max(delta, abs(value - V[s]))
            V[s] = value
        if delta < threshold:
            break

    return V
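Each sweep of the inner loop applies the standard Bellman expectation backup under the current policy $\pi$ to every state, and the outer loop repeats until the largest single-state change (delta) falls below threshold:

$$V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma V(s')\bigr]$$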
PolicyIterationPlanner::plan() implements the complete policy iteration algorithm (the policy evaluation step calls policy_evaluation()). The code is shown below:
def plan(self, gamma=0.9, threshold=0.0001):
    """Implement the policy iteration algorithm
    gamma    : discount factor
    threshold: delta for the policy evaluation convergence test.
    """
    self.initialize()
    states = self.env.states
    actions = self.env.actions

    def take_max_action(action_value_dict):
        return max(action_value_dict, key=action_value_dict.get)

    while True:
        update_stable = True
        # Estimate expected rewards under the current policy.
        V = self.policy_evaluation(gamma, threshold)
        self.log.append(self.dict_to_grid(V))

        for s in states:
            # Get an action following the current policy.
            policy_action = take_max_action(self.policy[s])

            # Compare with other actions.
            action_rewards = {}
            for a in actions:
                r = 0
                for prob, next_state, reward in self.transitions_at(s, a):
                    r += prob * (reward + gamma * V[next_state])
                action_rewards[a] = r
            best_action = take_max_action(action_rewards)
            if policy_action != best_action:
                update_stable = False

            # Update policy (set best_action prob=1, otherwise=0 (greedy)).
            for a in self.policy[s]:
                prob = 1 if a == best_action else 0
                self.policy[s][a] = prob

        # Turn the dictionary into a grid.
        self.V_grid = self.dict_to_grid(V)
        self.iters = self.iters + 1
        print('PolicyIteration: iters = {0}'.format(self.iters))
        self.print_value_grid()
        print('******************************')

        if update_stable:
            # If the policy isn't updated, stop iterating.
            break
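To tie everything together, the planner can be run roughly as follows. This is a hypothetical driver sketch; the constructor arguments are assumptions, and the author's actual entry point is in the linked CliffWalking-v0.py:

if __name__ == "__main__":
    # Build the 4*12 CliffWalking grid described above.
    grid = [[0] * 12 for _ in range(4)]
    grid[3][11] = 1               # terminal (goal) cell
    for k in range(1, 11):
        grid[3][k] = -1           # cliff cells

    env = Environment(grid, move_prob=1.0)   # assumption: deterministic moves, as in CliffWalking-v0
    planner = PolicyIterationPlanner(env)
    planner.plan(gamma=0.9, threshold=0.0001)

Note that plan() already prints the value grid after every iteration via print_value_grid(), so no extra output code is needed here.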
The results of a run are shown below (the value in the bottom-right cell can be ignored, since the episode ends once the agent reaches it and no further actions are taken):
As can be seen, the implementation does arrive at the same optimal policy as the intuition suggested above.
The complete code is available at: reinforcement-learning/CliffWalking-v0.py
The master table of contents of this series of reinforcement learning study notes: 强化学习笔记总目录
[1] Cliff Walking - Gym Documentation (gymlibrary.dev)