import numpy as np
from ..actions import LEFT, RIGHT, UP, DOWN
from ..exceptions import InvalidActionError
from typing import List, Tuple
# Size of the action axis of the transition tensor built by build_simple_grid;
# the four actions used in this module are LEFT, RIGHT, UP and DOWN.
n_actions = 4
def is_P_valid_stochastic(P: np.ndarray) -> bool:
    r"""Return ``True`` if ``P`` is a valid stochastic transition model.

    :math:`P` is a valid stochastic transition model if, for every state
    :math:`s` and action :math:`a`,

    .. math::
        \sum_{s'\in\mathcal{S}} Pr(s'|s,a) = 1

    Only this normalisation is checked, using the default tolerances of
    :func:`numpy.allclose`; entries are not checked for non-negativity.

    Args:
        P (np.ndarray): transition model. The probabilities are summed over
            ``axis=2``, i.e. the array is indexed as ``P[s, a, s']``.

    Returns:
        bool: ``True`` if every ``P[s, a, :]`` sums to 1, ``False`` otherwise.
    """
    # Raw docstring above: the LaTeX backslashes (\sum, \in, \mathcal) are
    # invalid escape sequences in a normal string literal.
    return np.allclose(P.sum(axis=2), 1)
def flatten_state(state: Tuple[int, int], size: int,
                  state_space: int) -> np.ndarray:
    r"""Flatten a state given as a (row, col) pair into a one-hot vector.

    The position of the single ``1`` is the row-major index
    ``size * row + col``.

    Example:
        >>> flatten_state((1, 2), 3, 9)
        array([0, 0, 0, 0, 0, 1, 0, 0, 0], dtype=int32)

    Args:
        state (Tuple[int, int]): (row, col) pair.
        size (int): width (number of columns) of the grid world.
        state_space (int): size of the state space, i.e. :math:`|\mathcal{S}|`.
            This is the length of the returned vector.

    Returns:
        np.ndarray: one-hot representation of the state, with dtype ``int32``.

    Raises:
        IndexError: if ``size * row + col`` is out of bounds for a vector of
            length ``state_space``.
    """
    row, col = state[0], state[1]
    one_hot = np.zeros(state_space, dtype=np.int32)
    one_hot[size * row + col] = 1
    return one_hot
def unflatten_state(onehot: np.ndarray, size,
                    has_absorbing_state: bool) -> Tuple[int, int]:
    """Unflatten a one-hot vector into a (row, col) pair.

    Examples:
        >>> unflatten_state(np.array([0,0,0,1]), 2, False)
        (1, 1)
        >>> unflatten_state(np.array([0, 0, 0, 0, 0, 1, 0, 0, 0]), 3, False)
        (1, 2)

    Args:
        onehot (np.ndarray): one-hot representation of a state. After the
            optional absorbing entry is removed it must hold exactly
            ``size * size`` entries.
        size (int): side length of the (square) grid world.
        has_absorbing_state (bool): whether the last entry of ``onehot``
            belongs to an absorbing state; if so it is dropped before decoding.

    Returns:
        Tuple[int, int]: (row, col) pair as plain Python integers. If no grid
        entry is set (e.g. the vector encodes the absorbing state itself) the
        result is ``(0, 0)``.

    Raises:
        ValueError: if the grid part of ``onehot`` cannot be reshaped to
            ``(size, size)``.
    """
    if has_absorbing_state:
        onehot = onehot[:-1]
    # The reshape is kept so that a vector of the wrong length still fails.
    grid = onehot.reshape(size, size)
    # argmax without an axis works on the flattened (row-major) grid, so a
    # single divmod recovers the coordinates. Casting to int keeps the return
    # value a tuple of Python ints rather than NumPy scalars.
    row, col = divmod(int(grid.argmax()), size)
    return (row, col)
def get_state_after_executing_action(action, state, grid_size):
    """Return the flat state index reached by executing ``action`` in ``state``.

    Args:
        action: one of ``LEFT``, ``RIGHT``, ``UP`` or ``DOWN``.
        state: flat (row-major) index of the current cell.
        grid_size: side length of the grid; one row spans ``grid_size``
            consecutive indices.

    Returns:
        The neighbouring index in the direction of ``action``, or ``state``
        itself when ``check_can_take_action`` reports the move as impossible.

    Raises:
        InvalidActionError: raised by ``check_can_take_action`` when ``action``
            is not one of the four known actions.
    """
    # Blocked move: the agent stays where it is.
    if not check_can_take_action(action, state, grid_size):
        return state

    # Horizontal moves shift the index by one cell ...
    if action == LEFT:
        return state - 1
    if action == RIGHT:
        return state + 1
    # ... vertical moves shift it by one full row.
    if action == UP:
        return state - grid_size
    if action == DOWN:
        return state + grid_size
def check_can_take_action(action, state, grid_size):
    """Check whether ``action`` can be taken in ``state``.

    A move is impossible when it would leave the grid: ``DOWN`` from the last
    row, ``UP`` from the first row, ``RIGHT`` from the right-most column and
    ``LEFT`` from the left-most column.

    Args:
        action: one of ``LEFT``, ``RIGHT``, ``UP`` or ``DOWN``.
        state: flat (row-major) index of the current cell.
        grid_size: side length of the square grid.

    Returns:
        bool: ``False`` if ``state`` lies on the edge that blocks ``action``,
        ``True`` otherwise. An index that is not a cell of the grid is on no
        edge, so every valid action is reported as possible for it.

    Raises:
        InvalidActionError: if ``action`` is not one of the four known actions.
    """
    n_cells = grid_size * grid_size

    # Pick the set of cells from which the action is blocked. ``range``
    # objects are used instead of lists so nothing is materialised per call;
    # membership gives the same answer.
    if action == DOWN:
        blocked = range(n_cells - grid_size, n_cells)       # last row
    elif action == RIGHT:
        blocked = range(grid_size - 1, n_cells, grid_size)  # right edge
    elif action == UP:
        blocked = range(0, grid_size)                       # first row
    elif action == LEFT:
        blocked = range(0, n_cells, grid_size)              # left edge
    else:
        raise InvalidActionError('Cannot take action {} in a grid world of size {}x{}'.format(action, grid_size, grid_size))

    return state not in blocked
def get_possible_actions(state, grid_size):
    """Get all actions that can be taken at a given state.

    Args:
        state: flat (row-major) index of the current cell.
        grid_size: side length of the square grid.

    Returns:
        list: a new list holding the subset of ``[LEFT, RIGHT, UP, DOWN]``
        (in that order) that does not move the agent off the grid. An index
        that is not a cell of the grid lies on no edge, so all four actions
        are returned for it.
    """
    n_cells = grid_size * grid_size
    available_actions = [LEFT, RIGHT, UP, DOWN]

    # ``range`` membership replaces the per-call edge lists; the outcome is
    # the same but nothing of size O(grid_size ** 2) is allocated.
    if state in range(n_cells - grid_size, n_cells):        # last row
        available_actions.remove(DOWN)
    if state in range(0, grid_size):                        # first row
        available_actions.remove(UP)
    if state in range(grid_size - 1, n_cells, grid_size):   # right edge
        available_actions.remove(RIGHT)
    if state in range(0, n_cells, grid_size):               # left edge
        available_actions.remove(LEFT)
    return available_actions
# def flatten_state(state, n_states, grid_size):
# """Flatten state (x,y) into a one hot vector"""
# idx =
# one_hot = np.zeros(n_states)
# one_hot[idx] = 1
# return one_hot
def build_simple_grid(size=5, terminal_states: List = None, p_success=1):
    r"""Build the transition matrix of a simple square grid world.

    The agent can move *LEFT*, *RIGHT*, *UP* or *DOWN*. The chosen action
    succeeds with probability ``p_success``; the remaining probability
    ``1 - p_success`` is split evenly between the other moves that are
    possible in that cell. Moving into a wall does nothing (the agent stays in
    place).

    If ``terminal_states`` is non-empty, one extra absorbing state is appended
    as the last state: every terminal cell moves to it under every action, and
    it loops onto itself. The returned matrix then has shape
    :math:`(|S|+1)\times|A|\times(|S|+1)`.

    Examples:
        Builds a simple 5x5 grid world where there is a terminal state at (0, 4).
        The probability of successfully executing the action is 0.9.
        This function returns the transition matrix.

        >>> grid = build_simple_grid(size=5, terminal_states=[(0, 4)], p_success=0.9)
        >>> print(grid.shape)
        (26, 4, 26)

    Args:
        size (int, optional): side length of the grid world. Defaults to 5.
            :math:`|S| = size \times size`
        terminal_states (list, optional): locations of the terminal states, a
            list of pairs ``t`` converted to the flat index
            ``size * t[0] + t[1]``. Defaults to ``None``, which is treated as
            an empty list.
        p_success (float, optional): the probability that an action will be
            successful. Defaults to 1.

    Raises:
        InvalidActionError: if an action index in ``range(n_actions)`` is not
            one of ``LEFT``, ``RIGHT``, ``UP``, ``DOWN``.

    Returns:
        np.ndarray: the transition matrix ``P[s, a, s']`` of the grid world.
        The shape is :math:`\left(|S|+1,|A|,|S|+1\right)`,
        or :math:`\left(|S|,|A|,|S|\right)` if there is no terminal state.
    """
    if terminal_states is None:
        terminal_states = []

    p_fail = 1 - p_success
    n_states = size * size

    has_absorbing_state = len(terminal_states) > 0
    if has_absorbing_state:
        n_states += 1  # extra entry of the state vector for the absorbing state

    # Flat (row-major) indices of the terminal cells; a set gives O(1) lookup.
    terminal_indices = {int(size * cell[0] + cell[1]) for cell in terminal_states}

    def create_state_list_for_action(state_idx, action):
        """Return the row ``P[state_idx, action, :]``."""
        transition_probs = np.zeros(n_states)

        # Terminal cells always fall into the absorbing state, and the
        # absorbing state always loops onto itself, whatever the action.
        if state_idx in terminal_indices or (
                has_absorbing_state and state_idx == n_states - 1):
            transition_probs[-1] = 1
            return transition_probs

        if action not in [LEFT, RIGHT, UP, DOWN]:
            raise InvalidActionError('Invalid action {} in the 2D gridworld'.format(action))

        # TODO: distinguish between capability of slipping and taking wrong action vs failing to execute action.
        # Cell reached when the action succeeds; this is state_idx itself when
        # the move is blocked by a wall.
        intended = get_state_after_executing_action(action, state_idx, size)
        # Moves the agent may slip into instead: every other feasible action.
        other_actions = [a for a in get_possible_actions(state_idx, size)
                         if a != action]

        if not other_actions:
            # Nowhere to slip to (1x1 grid): keep the whole probability mass on
            # the intended outcome so that the row still sums to 1.
            transition_probs[intended] = 1
            return transition_probs

        transition_probs[intended] = p_success
        for other_action in other_actions:
            slipped = get_state_after_executing_action(other_action, state_idx, size)
            transition_probs[slipped] = p_fail / len(other_actions)
        return transition_probs

    P = np.zeros((n_states, n_actions, n_states))
    for s in range(n_states):
        for a in range(n_actions):
            P[s, a, :] = create_state_list_for_action(s, a)
    return P