Source code for emdp.common

import numpy as np
from . import utils
from .exceptions import InvalidActionError, EpisodeDoneError

class Env:
    """
    Abstract Environment wrapper.

    Holds the random number generator that concrete environments draw from.

    Args:
        seed(int): A seed for the random number generator.
    """

    def __init__(self, seed):
        # Delegate to set_seed so construction and re-seeding share one path.
        self.set_seed(seed)

    def set_seed(self, seed):
        """
        Replace ``self.rng`` with a fresh ``RandomState`` built from ``seed``.

        Args:
            seed(int): A seed for the random number generator.
        """
        generator = np.random.RandomState(seed)
        self.rng = generator
class MDP(Env):
    def __init__(self, P, R, gamma, p0, terminal_states, seed=1337, skip_check=False):
        """
        A simple MDP simulator.

        :param P: The transition matrix of size |S|x|A|x|S|
        :param R: The reward criterion |S|x|A|
        :param gamma: the discount factor.
        :param p0: the distribution over starting states |S| (must sum to 1.)
        :param terminal_states: A list of integers which indicate terminal states,
            used to end episodes. Note that in the transition matrix these should
            be absorbing states to ensure calculations are correct.
        :param seed: the random seed for simulations.
        :param skip_check: when True, skip all consistency assertions on
            P, R and p0.
        """
        super().__init__(seed)

        if not skip_check:
            assert np.allclose(P.sum(axis=2), 1), 'Transition matrix does not seem to be a stochastic matrix ' \
                                                  '(i.e. the sum over states for each action doesn not equal 1'

        self.P = P
        self.R = R
        self.state_space = P.shape[0]
        self.action_space = R.shape[1]

        if not skip_check:
            assert self.state_space == P.shape[2], '3rd Dimension of Transition Matrix is not of size |S|'
            assert self.action_space == P.shape[1], '2nd Dimension of Transition Matrix is not of size |A|'
            assert self.state_space == R.shape[0], '1st Dimesnion of Reward Matrix is not of size |S|'
            assert self.state_space == p0.shape[0], 'Distribution over initial states is not over |S|'

        self.gamma = gamma
        self.p0 = p0
        self.terminal_states = terminal_states
        self.current_state = None
        self.reset()

    def reset(self):
        """
        Start a new episode by sampling a start state from ``p0``.

        The sample is drawn from ``self.rng`` (not the global NumPy RNG) so
        that the ``seed`` given to the constructor / ``set_seed`` makes whole
        episodes reproducible.

        :return: the new current state, encoded by
            ``utils.convert_int_rep_to_onehot``.
        """
        integer_representation = self.rng.choice(np.arange(self.state_space), p=self.p0)
        self.current_state = utils.convert_int_rep_to_onehot(integer_representation, self.state_space)
        self.done = False
        return self.current_state

    def set_current_state_to(self, state):
        """
        Force the environment into a given state and clear the done flag.

        :param state: integer index of the state to move to.
        :return: the new current state, encoded by
            ``utils.convert_int_rep_to_onehot``.
        """
        self.current_state = utils.convert_int_rep_to_onehot(state, self.state_space)
        self.done = False
        return self.current_state

    def step(self, action):
        """
        Take one transition in the MDP.

        :param action: An integer (Python ``int`` or NumPy integer scalar)
            in ``[0, action_space)`` representing the action taken.
        :return: a tuple ``(next_state, reward, done, info)`` where ``info``
            is ``{'gamma': self.gamma}``.
        :raises EpisodeDoneError: if the episode has already terminated.
        :raises InvalidActionError: if ``action`` is not an integer or is
            outside ``[0, action_space)``.
        """
        if self.done:
            raise EpisodeDoneError('The episode has terminated. Use .reset() to restart the episode.')

        # Check the type first so a non-numeric action never reaches the range
        # comparison. Negative values are rejected explicitly: they would
        # otherwise index P and R from the end without any error.
        is_integer = isinstance(action, (int, np.integer))
        if not is_integer or not 0 <= action < self.action_space:
            raise InvalidActionError(
                'Invalid action {}. It must be an integer between 0 and {}'.format(action, self.action_space-1))

        # we end from this episode onwards.
        # this check is done after entering terminal state
        # because we can only give the reward after leaving
        # a terminal state.
        if self.current_state.argmax() in self.terminal_states:
            self.done = True

        # get the vector representing the next state probabilities:
        current_state_idx = utils.convert_onehot_to_int(self.current_state)
        next_state_probs = self.P[current_state_idx, action]

        # sample the next state
        sampled_next_state = self.rng.choice(np.arange(self.state_space), p=next_state_probs)

        # observe the reward
        reward = self.R[current_state_idx, action]
        self.current_state = utils.convert_int_rep_to_onehot(sampled_next_state, self.state_space)
        return self.current_state, reward, self.done, {'gamma': self.gamma}