Source code for emdp.gridworld.builder_tools

"""
Utilities to help build more complex grid worlds.
"""
import numpy as np
from typing import List, Tuple, Dict
from . import GridWorldMDP
from .helper_utilities import (build_simple_grid,
                               flatten_state)


class TransitionMatrixBuilder(object):
    """Builder object to incrementally build a grid-world transition matrix.

    The matrix is stored as ``P[s, a, s']`` with shape
    ``(state_space, action_space, state_space)``, where ``state_space`` is
    ``grid_size * grid_size`` plus one extra state when
    ``has_terminal_state`` is true.
    """

    def __init__(self, grid_size, action_space=4, has_terminal_state=True):
        """Create a builder holding an all-zero transition matrix.

        Args:
            grid_size (int): Side length of the (square) grid.
            action_space (int, optional): Number of actions. Defaults to 4.
            has_terminal_state (bool, optional): Whether one extra state is
                reserved on top of the ``grid_size * grid_size`` cells.
                Defaults to True.
        """
        self.has_terminal_state = has_terminal_state
        self.grid_size = grid_size
        self.action_space = action_space
        self.state_space = grid_size * grid_size + int(has_terminal_state)
        self._P = np.zeros((self.state_space, self.action_space, self.state_space))
        self.grid_added = False
        self.P_modified = False

    def add_grid(self, terminal_states: List[int] = None, p_success: float = 1):
        """Adds a grid so that you cant walk off the edges of the grid.

        The transition matrix is replaced by the result of
        ``build_simple_grid``; this must therefore happen before any other
        modification (such as adding walls).

        Args:
            terminal_states (List[int], optional): Terminal states.
                Defaults to ``[]``.
            p_success (float, optional): Forwarded to ``build_simple_grid``.
                Defaults to 1.

        Raises:
            ValueError: If ``has_terminal_state`` is set but no terminal
                states are given, if a grid was already added, or if the
                transition matrix was already modified.
        """
        if terminal_states is None:
            terminal_states = []

        # ``len`` (not truthiness) so array-like inputs are handled too.
        if self.has_terminal_state and len(terminal_states) == 0:
            raise ValueError('has_terminal_states is true, but no terminal states supplied.')
        if self.grid_added:
            raise ValueError('Grid has already been added')
        if self.P_modified:
            raise ValueError('transition matrix has already been modified. '
                             'Adding a grid now can lead to weird behaviour')

        self._P = build_simple_grid(size=self.grid_size,
                                    p_success=p_success,
                                    terminal_states=terminal_states)
        self.grid_added = True
        self.P_modified = True

    def add_wall_at(self, tuple_location):
        """Add a blockade at this position.

        Every transition that used to lead into the wall state is redirected
        to "stay where you are", and every action taken from the wall state
        leads back to the wall state with probability 1.

        :param tuple_location: (x,y) location of the wall
        :return: None
        """
        # NOTE(review): flatten_state is defined elsewhere; taking ``argmax``
        # of its result suggests it returns a one-hot state vector -- confirm.
        target_state = flatten_state(tuple_location, self.grid_size, self.state_space)
        target_state = target_state.argmax()

        # Find all (state, action) pairs with non-zero probability of
        # reaching ``target_state``.
        from_states, from_actions = np.where(self._P[:, :, target_state] != 0)

        # Fancy indexing yields a copy: one next-state distribution per pair.
        transition_probs_from = self._P[from_states, from_actions, :]
        rows = np.arange(len(from_states))

        # Move the probability mass that pointed at the wall onto the
        # "stay in the originating state" outcome.
        blocked_mass = transition_probs_from[rows, target_state]
        transition_probs_from[rows, target_state] = 0
        transition_probs_from[rows, from_states] += blocked_mass
        self._P[from_states, from_actions, :] = transition_probs_from

        # All actions from the wall state lead back to the wall state.
        self._P[target_state, :, :] = 0.0
        self._P[target_state, :, target_state] = 1.0

        # Renormalize each (state, action) distribution over next states.
        normalization = 1 / self._P.sum(2, keepdims=True)
        self._P = self._P * normalization

        assert np.allclose(self._P.sum(2), 1), 'Normalization did not occur correctly: {}'.format(self._P.sum(2))
        assert np.allclose(self._P[target_state, :, target_state], 1.0), 'All actions from wall should lead to wall!'

        # Record the edit on the same flag that ``add_grid`` checks.
        self.P_modified = True

    @property
    def P(self, nocopy=False):
        """Returns a new array with the transition matrix built so far.

        Because this is a property, plain attribute access (``builder.P``)
        always uses ``nocopy=False`` and therefore returns a copy.

        Args:
            nocopy (bool, optional): Defaults to False. When true the
                internal array itself is returned.

        Returns:
            np.array: the transition model matrix
        """
        if nocopy:
            return self._P
        return self._P.copy()

    def add_wall_between(self, start: Tuple[int, int], end: Tuple[int, int]):
        """Adds a wall between the starting and ending location (inclusive).

        Args:
            start (Tuple[int,int]): tuple (x,y) representing the starting
                position of the wall
            end (Tuple[int,int]): tuple (x,y) representing the ending
                position of the wall

        Raises:
            ValueError: If the two locations share neither coordinate.
        """
        if start[0] != end[0] and start[1] != end[1]:
            raise ValueError('Walls can only be drawn in straight lines. '
                             'Therefore, at least one of the x or y between '
                             'the states should match.')

        # ``direction`` is the coordinate index that varies along the wall.
        direction = 1 if start[0] == end[0] else 0
        fixed_axis = 1 - direction
        constant_idx = start[fixed_axis]

        # Order the endpoints so walls can be given in either direction.
        start_idx, end_idx = sorted((start[direction], end[direction]))

        for i in range(start_idx, end_idx + 1):
            my_location = [None, None]
            my_location[direction] = i
            my_location[fixed_axis] = constant_idx
            self.add_wall_at(tuple(my_location))
def create_reward_matrix(state_space, size, reward_spec: Dict[Tuple[int, int], float], action_space=4):
    r"""Abstraction to create reward matrices.

    Every action taken in a state listed in ``reward_spec`` receives that
    state's reward; all other entries are zero.

    Args:
        state_space(int): Size of the state space, :math:`|\mathcal{S}|`.
        size(int): size of the grid world (width or height).
        reward_spec(Dict[Tuple[int,int], float]): the reward specification,
            mapping a grid location to its reward value.
        action_space(int): the size of the action space

    Returns:
        np.ndarray: the reward matrix of shape ``(state_space, action_space)``.
    """
    R = np.zeros((state_space, action_space))
    for location, reward_value in reward_spec.items():
        # NOTE(review): flatten_state is defined elsewhere; ``argmax`` on its
        # result suggests it returns a one-hot state vector -- confirm.
        state_index = flatten_state(location, size, state_space).argmax()
        R[state_index, :] = reward_value
    return R
""" Simple builders for gridworlds """
def build_simple_grid_world_with_terminal_states(reward_spec, size, p_success=1, gamma=0.99, seed=2017,
                                                 start_state=0):
    """
    Build a size x size grid world whose rewarding locations are terminal.

    The keys of ``reward_spec`` are (x, y) positions and the values are the
    reward magnitudes. Those same positions are handed to
    ``build_simple_grid`` and ``GridWorldMDP`` as the terminal states, and
    every action taken in a rewarding state yields its reward.

    :param reward_spec: Reward specification (dict: (x, y) -> reward).
    :param size: Size of the gridworld (grid world will be size x size)
    :param p_success: The probability the action is successful.
    :param gamma: The discount factor.
    :param seed: Seed for the GridWorldMDP object.
    :param start_state: The index of the starting state.
    :return: The constructed GridWorldMDP.
    """
    transitions = build_simple_grid(size=size,
                                    terminal_states=reward_spec.keys(),
                                    p_success=p_success)
    n_states = transitions.shape[0]
    rewards = create_reward_matrix(n_states, size, reward_spec, action_space=4)

    # Deterministic start: all initial probability mass on ``start_state``.
    initial_distribution = np.zeros(n_states)
    initial_distribution[start_state] = 1

    return GridWorldMDP(transitions, rewards, gamma, initial_distribution,
                        terminal_states=reward_spec.keys(),
                        size=size,
                        seed=seed)
def build_simple_grid_world_without_terminal_states(reward_spec, size, p_success=1, gamma=0.99, seed=2017,
                                                    start_state=0):
    """
    Build a size x size grid world that has no terminal states.

    The keys of ``reward_spec`` are (x, y) positions and the values are the
    reward magnitudes; every action taken in a rewarding state yields its
    reward. An empty list of terminal states is passed to both
    ``build_simple_grid`` and ``GridWorldMDP``.

    :param reward_spec: Reward specification (dict: (x, y) -> reward).
    :param size: Size of the gridworld (grid world will be size x size)
    :param p_success: The probability the action is successful.
    :param gamma: The discount factor.
    :param seed: Seed for the GridWorldMDP object.
    :param start_state: The index of the starting state.
    :return: The constructed GridWorldMDP.
    """
    transitions = build_simple_grid(size=size,
                                    terminal_states=[],
                                    p_success=p_success)
    n_states = transitions.shape[0]
    rewards = create_reward_matrix(n_states, size, reward_spec, action_space=4)

    # Deterministic start: all initial probability mass on ``start_state``.
    initial_distribution = np.zeros(n_states)
    initial_distribution[start_state] = 1

    return GridWorldMDP(transitions, rewards, gamma, initial_distribution,
                        terminal_states=[],
                        size=size,
                        seed=seed)