Source code for maze_world.envs.maze

from copy import copy

import gymnasium as gym
import numpy as np
import pygame
from gymnasium import spaces


[docs]class MazeEnv(gym.Env): r"""The main Maze Environment class for implementing different maze environments The class encapsulates maze environments with arbitrary behind-the-scenes dynamics through the :meth:`step` and :meth:`reset` functions. :example: >>> import gymnasium as gym >>> def generate_maze_fn(): ... maze_map = np.array( ... [ ... [1, 1, 1, 1, 1, 1, 1], ... [1, 0, 0, 0, 0, 0, 1], ... [1, 0, 0, 0, 0, 0, 1], ... [1, 0, 0, 0, 0, 0, 1], ... [1, 1, 1, 1, 1, 1, 1], ... ] ... ) ... agent_loc = np.array([1, 1]) ... target_loc = np.array([3, 5]) ... return maze_map, agent_loc, target_loc >>> env = MazeEnv(generate_maze_fn, None, 5, 7) """ metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
[docs] def __init__( self, generate_maze_fn: callable, render_mode: str = None, maze_width: int = None, # columns maze_height: int = None, # rows ): """ :param generate_maze_fn: This function is called during every reset of the environment and is expected to return three items in following order: - maze-map: numpy array of map where "1" represents wall and "0" represents floor. - agent location: tuple (x,y) where x and y represent location of agent - target location: tuple (x,y) where x and y represent target location of the agent :param render_mode: specifies one of the following: - None (default): no render is computed. - “human”: The environment is continuously rendered in the current display or terminal, usually for human consumption. This rendering should occur during step() and render() doesn’t need to be called. Returns None. - “rgb_array”: Return a single frame representing the current state of the environment. A frame is a np.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image. - “ansi”: Return a strings (str) or StringIO.StringIO containing a terminal-style text representation for each time step. The text can include newlines and ANSI escape sequences (e.g. for colors). - “rgb_array_list” and “ansi_list”: List based version of render modes are possible (except Human) through the wrapper, gymnasium.wrappers.RenderCollection that is automatically applied during gymnasium.make(...,render_mode="rgb_array_list"). The frames collected are popped after render() is called or reset(). :param maze_width: The width of the maze :param maze_height: The height of the maze """ self.generate_maze_fn = generate_maze_fn self.maze_width = maze_width self.maze_height = maze_height # The size of the PyGame window self._window_pixel_size = 25 self._window_size = ( maze_width * self._window_pixel_size, maze_height * self._window_pixel_size, ) """ The following dictionary maps abstract actions from `self.action_space` to the direction we will walk in if that action is taken. I.e. 0 corresponds to "right", 1 to "up" etc. """ self._action_to_direction = { 0: np.array([0, 1]), # right 1: np.array([-1, 0]), # up 2: np.array([0, -1]), # left 3: np.array([1, 0]), # down } assert render_mode is None or render_mode in self.metadata["render_modes"] self.render_mode = render_mode self._canvas = None """ If human-rendering is used, `self._window` will be a reference to the window that we draw to. `self._clock` will be a clock that is used to ensure that the environment is rendered at the correct framerate in human-mode. They will remain `None` until human-mode is used for the first time. """ self._window = None self._clock = None
@property def action_space(self): """ Specifies available discrete action for the environment, where 0. "right" 1. "up" 2. "left" 3. "down" :return: Discrete action space object representing the possible actions. :rtype: gym.spaces.Discrete """ return spaces.Discrete(4) @property def observation_space(self): """ Defines the observation space of the 2D maze environment. The observation space consists of two elements: - 'agent': Represents the position of the agent in the maze. - 'target': Represents the position of the target in the maze. In the 2D maze: - 0 corresponds to an empty floor. - 1 corresponds to a wall. - 2 corresponds to the agent or the target. :return: Dictionary containing the observation space for the agent and the target. - 'agent': gym.spaces.Box object representing the agent's position. - 'target': gym.spaces.Box object representing the target's position. :rtype: gym.spaces.Dict """ return spaces.Dict( { "agent": spaces.Box( low=np.zeros((self.maze_height, self.maze_width)), high=np.ones((self.maze_height, self.maze_width)) * 2, shape=( self.maze_height, self.maze_width, ), dtype=int, ), "target": spaces.Box( low=np.zeros((self.maze_height, self.maze_width)), high=np.ones((self.maze_height, self.maze_width)) * 2, shape=( self.maze_height, self.maze_width, ), dtype=int, ), } ) def _get_obs(self): agent_maze = copy(self.maze_map) agent_maze[self._agent_location[0], self._agent_location[1]] = 2 target_maze = copy(self.maze_map) target_maze[self._target_location[0], self._target_location[1]] = 2 return {"agent": agent_maze, "target": target_maze} def _get_info(self): return { "distance": np.linalg.norm( self._agent_location - self._target_location, ord=1 ), "agent": self._agent_location, "target": self._target_location, }
[docs] def reset(self, seed: int = None, options=None): """ Resets the environment to its initial state and generates a new random maze configuration. :param seed: Seed for the random number generator. Defaults to None. :param options: Unused parameter. :return: - observation: Agent's observation of the initial environment state. - info (dict): Additional information about the environment. :rtype: tuple :raises ValueError: If the shape of the maze generated by generate_maze_fn() doesn't match the specified maze width and height. """ # We need the following line to seed self.np_random super().reset(seed=seed) # generate a maze: self._prev_agent_location = None self.maze_map, self._agent_location, self._target_location = ( self.generate_maze_fn() ) if not np.array_equal(self.maze_map.shape, [self.maze_height, self.maze_width]): raise ValueError( f"Shape of Generated Maze doesn't match with" f" specified maze width and height." f" Generate maze shape is {self.maze_map.shape}, " f"whereas specified maze width is {self.maze_width}" f" and height is {self.maze_height}" ) # return initial parameters observation = self._get_obs() info = self._get_info() if self.render_mode == "human": self._render_frame() self._canvas = None return observation, info
def _no_obstacle(self, location): return not (self.maze_map[location[0], location[1]] == 1) # check for walls
[docs] def step(self, action: int): """ Take a step in the environment. :param action: The action to take. :type action: int :return: - observation: Agent's current observation of the environment. - reward (float): Reward received after taking the step. - terminated (bool): Whether the episode has terminated or not. - truncated (bool): Whether the episode has been truncated due to max episode steps. - info (dict): Additional information about the step. :rtype: tuple """ terminated = False # Map the action (element of {0,1,2,3}) to the direction we walk in direction = self._action_to_direction[action] new_agent_location = self._agent_location + direction if self._no_obstacle(new_agent_location): self._prev_agent_location, self._agent_location = ( self._agent_location, new_agent_location, ) if np.array_equal( self._agent_location, self._target_location ): # goal location check reward = +1 # reward for reaching goal location terminated = True else: reward = -0.01 # step cost else: reward = -1 # high penalty for colliding with walls observation = self._get_obs() info = self._get_info() if self.render_mode == "human": self._render_frame() return observation, reward, terminated, False, info
[docs] def render(self): """ Compute the render frames as specified by `render_mode` during the initialization of the environment. """ if self.render_mode == "rgb_array": return self._render_frame()
def _render_frame(self): if self._window is None and self.render_mode == "human": pygame.init() pygame.display.init() pygame.display.set_caption(f"Maze - {self.maze_width} x {self.maze_height}") self._window = pygame.display.set_mode(self._window_size) if self._clock is None and self.render_mode == "human": self._clock = pygame.time.Clock() if self._canvas is None: self._canvas = pygame.Surface(self._window_size) self._canvas.fill((255, 255, 255)) # draw walls for x in range(self.maze_height): for y in range(self.maze_width): if self.maze_map[x, y] == 1: pygame.draw.rect( self._canvas, (169, 169, 169), pygame.Rect( np.array([y, x]) * self._window_pixel_size, (self._window_pixel_size, self._window_pixel_size), ), ) # draw the target pygame.draw.rect( self._canvas, (255, 0, 0), pygame.Rect( self._target_location[::-1] * self._window_pixel_size, (self._window_pixel_size, self._window_pixel_size), ), ) # Clean previous agent location if self._prev_agent_location is not None: pygame.draw.circle( self._canvas, (255, 255, 255), (self._prev_agent_location[::-1] + 0.5) * self._window_pixel_size, self._window_pixel_size / 4, ) # Draw new agent location pygame.draw.circle( self._canvas, (0, 0, 255), (self._agent_location[::-1] + 0.5) * self._window_pixel_size, self._window_pixel_size / 4, ) if self.render_mode == "human": # The following line copies our drawings from `canvas` to the visible window self._window.blit(self._canvas, self._canvas.get_rect()) pygame.event.pump() pygame.display.update() # We need to ensure that human-rendering occurs at the predefined framerate. # The following line will automatically add a delay to keep the framerate stable. self._clock.tick(self.metadata["render_fps"]) else: # rgb_array return np.transpose( np.array(pygame.surfarray.pixels3d(self._canvas)), axes=(1, 0, 2) )
[docs] def close(self): """ Closes the environment. This method shuts down the Pygame display if it was initialized. """ if self._window is not None: pygame.display.quit() pygame.quit()
ACTION_MEANING = { 0: "Left", 1: "Up", 2: "Right", 3: "Down", } OBSERVATION_MEANING = { 0: "empty", 1: "wall", 2: "agent", } OBJECT_ID = {v: k for k, v in OBSERVATION_MEANING.items()}