diff --git a/examples/intro.py b/examples/intro.py index b4bec6b..1336efa 100644 --- a/examples/intro.py +++ b/examples/intro.py @@ -1,6 +1,7 @@ """ Example of how to use the wildfire evacuation RL environment. """ + import gymnasium import numpy as np import pyrorl @@ -11,19 +12,38 @@ """ # Set up parameters num_rows, num_cols = 10, 10 - populated_areas = np.array([[1,2],[4,8], [6,4], [8, 7]]) - paths = np.array([[[1,0],[1,1]], [[2,2],[3,2],[4,2],[4,1],[4,0]], [[2,9],[2,8],[3,8]], [[5,8],[6,8],[6,9]], [[7,7], [6,7], [6,8], [6,9]], [[8,6], [8,5], [9,5]], [[8,5], [9,5], [7,5],[7,4]]], dtype=object) - paths_to_pops = {0:[[1,2]], 1:[[1,2]], 2: [[4,8]], 3:[[4,8]], 4:[[8, 7]], 5:[[8, 7]], 6:[[6,4]]} + populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]]) + paths = np.array( + [ + [[1, 0], [1, 1]], + [[2, 2], [3, 2], [4, 2], [4, 1], [4, 0]], + [[2, 9], [2, 8], [3, 8]], + [[5, 8], [6, 8], [6, 9]], + [[7, 7], [6, 7], [6, 8], [6, 9]], + [[8, 6], [8, 5], [9, 5]], + [[8, 5], [9, 5], [7, 5], [7, 4]], + ], + dtype=object, + ) + paths_to_pops = { + 0: [[1, 2]], + 1: [[1, 2]], + 2: [[4, 8]], + 3: [[4, 8]], + 4: [[8, 7]], + 5: [[8, 7]], + 6: [[6, 4]], + } # Create environment kwargs = { - 'num_rows': num_rows, - 'num_cols': num_cols, - 'populated_areas': populated_areas, - 'paths': paths, - 'paths_to_pops': paths_to_pops + "num_rows": num_rows, + "num_cols": num_cols, + "populated_areas": populated_areas, + "paths": paths, + "paths_to_pops": paths_to_pops, } - env = gymnasium.make('pyrorl/PyroRL-v0', **kwargs) + env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs) # Run a simple loop of the environment env.reset() diff --git a/examples/stable_baselines.py b/examples/stable_baselines.py index 32edf8f..b09729e 100644 --- a/examples/stable_baselines.py +++ b/examples/stable_baselines.py @@ -1,6 +1,7 @@ """ An example of how to use our environment with Stable Baselines 3 """ + import gymnasium import numpy as np from stable_baselines3 import DQN @@ -12,19 +13,38 @@ """ # Set up parameters num_rows, num_cols = 10, 10 - populated_areas = np.array([[1,2],[4,8], [6,4], [8, 7]]) - paths = np.array([[[1,0],[1,1]], [[2,2],[3,2],[4,2],[4,1],[4,0]], [[2,9],[2,8],[3,8]], [[5,8],[6,8],[6,9]], [[7,7], [6,7], [6,8], [6,9]], [[8,6], [8,5], [9,5]], [[8,5], [9,5], [7,5],[7,4]]], dtype=object) - paths_to_pops = {0:[[1,2]], 1:[[1,2]], 2: [[4,8]], 3:[[4,8]], 4:[[8, 7]], 5:[[8, 7]], 6:[[6,4]]} + populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]]) + paths = np.array( + [ + [[1, 0], [1, 1]], + [[2, 2], [3, 2], [4, 2], [4, 1], [4, 0]], + [[2, 9], [2, 8], [3, 8]], + [[5, 8], [6, 8], [6, 9]], + [[7, 7], [6, 7], [6, 8], [6, 9]], + [[8, 6], [8, 5], [9, 5]], + [[8, 5], [9, 5], [7, 5], [7, 4]], + ], + dtype=object, + ) + paths_to_pops = { + 0: [[1, 2]], + 1: [[1, 2]], + 2: [[4, 8]], + 3: [[4, 8]], + 4: [[8, 7]], + 5: [[8, 7]], + 6: [[6, 4]], + } # Create environment kwargs = { - 'num_rows': num_rows, - 'num_cols': num_cols, - 'populated_areas': populated_areas, - 'paths': paths, - 'paths_to_pops': paths_to_pops + "num_rows": num_rows, + "num_cols": num_cols, + "populated_areas": populated_areas, + "paths": paths, + "paths_to_pops": paths_to_pops, } - env = gymnasium.make('pyrorl/PyroRL-v0', **kwargs) + env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs) # Train a model and delete model = DQN("MlpPolicy", env, verbose=1) diff --git a/pyrorl/pyrorl/envs/environment/environment.py b/pyrorl/pyrorl/envs/environment/environment.py index 02e1caf..12dc3b1 100644 --- a/pyrorl/pyrorl/envs/environment/environment.py +++ b/pyrorl/pyrorl/envs/environment/environment.py @@ -1,6 +1,7 @@ """ Environment for Wildfire Spread """ + from collections import namedtuple import copy from importlib.resources import path @@ -23,6 +24,7 @@ EVACUATING_INDEX = 3 PATHS_INDEX = 4 + class FireWorld: """ We represent the world as a 5 by n by m tensor. n by m is the size of the grid world, @@ -30,17 +32,17 @@ class FireWorld: """ def __init__( - self, - num_rows, - num_cols, - populated_areas, - paths, - paths_to_pops, - num_fire_cells = 2, - custom_fire_locations = None, - wind_speed = None, - wind_angle = None, - ): + self, + num_rows, + num_cols, + populated_areas, + paths, + paths_to_pops, + num_fire_cells=2, + custom_fire_locations=None, + wind_speed=None, + wind_angle=None, + ): """ The constructor defines the state and action space, initializes the fires, and sets the paths and populated areas. @@ -53,12 +55,14 @@ def __init__( for key in paths_to_pops: for _ in range(len(paths_to_pops[key])): num_actions += 1 - self.actions = list(np.arange(num_actions + 1)) # extra action for doing nothing + self.actions = list( + np.arange(num_actions + 1) + ) # extra action for doing nothing # We want to remember which action index corresponds to which population center # and which path (because we just provide an array like [1,2,3,4,5,6,7]) which # would each be mapped to a given population area taking a given path - self.action_to_pop_and_path = { self.actions[-1] : None} + self.action_to_pop_and_path = {self.actions[-1]: None} index = 0 for path in paths_to_pops: @@ -67,25 +71,33 @@ def __init__( index += 1 # State for the evacuation of populated areas - self.evacuating_paths = {} # path_index : list of pop x,y indices that are evacuating [[x,y],[x,y],...] + self.evacuating_paths = ( + {} + ) # path_index : list of pop x,y indices that are evacuating [[x,y],[x,y],...] self.evacuating_timestamps = np.full((num_rows, num_cols), np.inf) # Initialize placement of fire cells if custom_fire_locations: - fire_rows = custom_fire_locations[:,0] - fire_cols = custom_fire_locations[:,1] + fire_rows = custom_fire_locations[:, 0] + fire_cols = custom_fire_locations[:, 1] self.state_space[FIRE_INDEX, fire_rows, fire_cols] = 1 else: for _ in range(num_fire_cells): - self.state_space[FIRE_INDEX, random.randint(0, num_rows - 1), random.randint(0, num_cols - 1)] = 1 + self.state_space[ + FIRE_INDEX, + random.randint(0, num_rows - 1), + random.randint(0, num_cols - 1), + ] = 1 # Initialize fuel levels # Note: make the fire spread parameters to constants? num_values = num_rows * num_cols - self.state_space[FUEL_INDEX] = np.random.normal(8.5,3,num_values).reshape((num_rows,num_cols)) + self.state_space[FUEL_INDEX] = np.random.normal(8.5, 3, num_values).reshape( + (num_rows, num_cols) + ) # Initialize populated areas - pop_rows, pop_cols = populated_areas[:,0], populated_areas[:,1] + pop_rows, pop_cols = populated_areas[:, 0], populated_areas[:, 1] self.state_space[POPULATED_INDEX, pop_rows, pop_cols] = 1 # Initialize paths @@ -93,8 +105,8 @@ def __init__( self.paths = [] for path in paths: path_array = np.array(path) - path_rows, path_cols = path_array[:,0], path_array[:,1] - self.state_space[PATHS_INDEX,path_rows,path_cols] += 1 + path_rows, path_cols = path_array[:, 0], path_array[:, 1] + self.state_space[PATHS_INDEX, path_rows, path_cols] += 1 # each path in self.paths is a list that records what the path is and whether or not the path still exists (i.e. has # not been destroyed by a fire) @@ -104,10 +116,12 @@ def __init__( # Set the timestep self.time_step = 0 - #Factor in wind speeds + # Factor in wind speeds if wind_speed is not None or wind_angle is not None: if wind_speed is None or wind_angle is None: - raise TypeError("When setting wind details, wind speed and wind angle must both be provided") + raise TypeError( + "When setting wind details, wind speed and wind angle must both be provided" + ) global fire_mask fire_mask = linear_wind_transform(wind_speed, wind_angle) @@ -116,15 +130,15 @@ def sample_fire_propogation(self): Sample the next state of the wildfire model. """ # Drops fuel level of enflamed cells - self.state_space[FUEL_INDEX,self.state_space[FIRE_INDEX] == 1] -= 1 - self.state_space[FUEL_INDEX,self.state_space[FUEL_INDEX] < 0] = 0 + self.state_space[FUEL_INDEX, self.state_space[FIRE_INDEX] == 1] -= 1 + self.state_space[FUEL_INDEX, self.state_space[FUEL_INDEX] < 0] = 0 # Extinguishes cells that have run out of fuel - self.state_space[FIRE_INDEX,self.state_space[FUEL_INDEX,:] <= 0] = 0 + self.state_space[FIRE_INDEX, self.state_space[FUEL_INDEX, :] <= 0] = 0 # Runs kernel of neighborhing cells where each row corresponds to the neighborhood of a cell torch_rep = torch.tensor(self.state_space[FIRE_INDEX]).unsqueeze(0) - y = torch.nn.Unfold((5,5), dilation = 1, padding = 2) + y = torch.nn.Unfold((5, 5), dilation=1, padding=2) z = y(torch_rep) # The relative importance of each neighboring cell is weighted @@ -132,7 +146,7 @@ def sample_fire_propogation(self): # Unenflamed cells are set to 1 to eliminate their role to the fire spread equation z[z == 0] = 1 - z = z.prod(dim = 0) + z = z.prod(dim=0) z = 1 - z.reshape(self.state_space[FIRE_INDEX].shape) # From the probability of an ignition in z, new fire locations are randomly generated @@ -140,7 +154,9 @@ def sample_fire_propogation(self): new_fire = (z > prob_mask).float() # These new fire locations are added to the state - self.state_space[FIRE_INDEX] = np.maximum(np.array(new_fire), self.state_space[FIRE_INDEX]) + self.state_space[FIRE_INDEX] = np.maximum( + np.array(new_fire), self.state_space[FIRE_INDEX] + ) # Note to self: make sure to update evacuation timestep state def update_paths_and_evactuations(self): @@ -152,28 +168,36 @@ def update_paths_and_evactuations(self): """ for i in range(len(self.paths)): # Decrement path counts and remove path - if self.paths[i][1] and np.sum(np.logical_and(self.state_space[FIRE_INDEX], self.paths[i][0])) > 0: + if ( + self.paths[i][1] + and np.sum( + np.logical_and(self.state_space[FIRE_INDEX], self.paths[i][0]) + ) + > 0 + ): self.state_space[PATHS_INDEX] -= self.paths[i][0] self.paths[i][1] = False # Stop evacuating an area if it was taking the removed path if i in self.evacuating_paths: pop_centers = np.array(self.evacuating_paths[i]) - pop_rows, pop_cols = pop_centers[:,0], pop_centers[:,1] + pop_rows, pop_cols = pop_centers[:, 0], pop_centers[:, 1] # Reset timestamp and evacuation index - self.evacuating_timestamps[pop_rows,pop_cols] = np.inf + self.evacuating_timestamps[pop_rows, pop_cols] = np.inf self.state_space[EVACUATING_INDEX, pop_rows, pop_cols] = 0 del self.evacuating_paths[i] - elif i in self.evacuating_paths: # we need to decrement the evacuating paths timestep + elif ( + i in self.evacuating_paths + ): # we need to decrement the evacuating paths timestep # for the below, this code works for if multiple population centers are taking the same path and # finish at the same time, but if we have it so that two population centers can't take the same # path it could probably be simplified pop_centers = np.array(self.evacuating_paths[i]) - pop_rows, pop_cols = pop_centers[:,0], pop_centers[:,1] - self.evacuating_timestamps[pop_rows,pop_cols] -= 1 + pop_rows, pop_cols = pop_centers[:, 0], pop_centers[:, 1] + self.evacuating_timestamps[pop_rows, pop_cols] -= 1 done_evacuating = np.where(self.evacuating_timestamps == 0) self.state_space[EVACUATING_INDEX, done_evacuating] = 0 self.state_space[POPULATED_INDEX, done_evacuating] = 0 @@ -191,7 +215,10 @@ def update_paths_and_evactuations(self): # this population center is done evacuating, so we can set its timestamp back to infinity # (this is important so that we don't try to remove this from self.evacuating paths twice - # was causing a bug) - update_row, update_col = done_evacuating[j,0], done_evacuating[j,1] + update_row, update_col = ( + done_evacuating[j, 0], + done_evacuating[j, 1], + ) self.evacuating_timestamps[update_row, update_col] = np.inf # no more population centers are using this path, so we delete it @@ -220,12 +247,14 @@ def accumulate_reward(self): evacuating = self.state_space[EVACUATING_INDEX][populated_areas] # Mark enflamed areas as no longer populated or evacuating - enflamed_populated_areas = np.where(self.state_space[FIRE_INDEX][populated_areas] == 1)[0] + enflamed_populated_areas = np.where( + self.state_space[FIRE_INDEX][populated_areas] == 1 + )[0] enflamed_rows = populated_areas[0][enflamed_populated_areas] enflamed_cols = populated_areas[1][enflamed_populated_areas] # Depopulate enflamed areas and remove evacuations - self.state_space[POPULATED_INDEX, enflamed_rows, enflamed_cols]= 0 + self.state_space[POPULATED_INDEX, enflamed_rows, enflamed_cols] = 0 self.state_space[EVACUATING_INDEX, enflamed_rows, enflamed_cols] = 0 # Update reward @@ -237,20 +266,27 @@ def set_action(self, action): Allow the agent to take an action within the action space. """ # Check that there is an action to take - if action in self.action_to_pop_and_path and self.action_to_pop_and_path[action] is not None and len(self.action_to_pop_and_path[action]) > 0: + if ( + action in self.action_to_pop_and_path + and self.action_to_pop_and_path[action] is not None + and len(self.action_to_pop_and_path[action]) > 0 + ): pop_cell, path_index = self.action_to_pop_and_path[action] pop_cell_row, pop_cell_col = pop_cell[0], pop_cell[1] # Ensure that the path chosen and populated cell haven't burned down and it's not already evacuating - if (self.paths[path_index][1] and self.state_space[POPULATED_INDEX,pop_cell_row, pop_cell_col] == 1 - and self.evacuating_timestamps[pop_cell_row, pop_cell_col] == np.inf): + if ( + self.paths[path_index][1] + and self.state_space[POPULATED_INDEX, pop_cell_row, pop_cell_col] == 1 + and self.evacuating_timestamps[pop_cell_row, pop_cell_col] == np.inf + ): # Add to evacuating paths and update state + timestamp if path_index in self.evacuating_paths: self.evacuating_paths[path_index].append(pop_cell) else: self.evacuating_paths[path_index] = [pop_cell] - self.state_space[EVACUATING_INDEX,pop_cell_row, pop_cell_col] = 1 + self.state_space[EVACUATING_INDEX, pop_cell_row, pop_cell_col] = 1 self.evacuating_timestamps[pop_cell_row, pop_cell_col] = 10 def get_state_utility(self): @@ -285,4 +321,4 @@ def get_terminated(self): """ Get the status of the simulation. """ - return ( self.time_step >= 100 ) + return self.time_step >= 100 diff --git a/pyrorl/pyrorl/envs/environment/environment_constant.py b/pyrorl/pyrorl/envs/environment/environment_constant.py index ee5fe74..39d8a9b 100644 --- a/pyrorl/pyrorl/envs/environment/environment_constant.py +++ b/pyrorl/pyrorl/envs/environment/environment_constant.py @@ -3,35 +3,47 @@ # Mask used for calculating the probability a cell alighting following the same propagation formula from existing research # Distance along axis from origin. Origin is referring to the cell we are presently trying to determine if becomes enflamed in the next timestep -distance_to_probability_of_enflaming_ratio = .094 -distance_matrix = torch.tensor([[2,1,0,1,2],[2,1,0,1,2],[2,1,0,1,2],[2,1,0,1,2],[2,1,0,1,2]]) +distance_to_probability_of_enflaming_ratio = 0.094 +distance_matrix = torch.tensor( + [ + [2, 1, 0, 1, 2], + [2, 1, 0, 1, 2], + [2, 1, 0, 1, 2], + [2, 1, 0, 1, 2], + [2, 1, 0, 1, 2], + ] +) # Squaring of values for later calculating square of L2 norm -temp = distance_matrix ** 2 +temp = distance_matrix**2 # Calculate the probability an enflamed neighboring cell does not enflame the cell located at the origin distance_matrix = 1 - 1 / (temp + temp.T) * distance_to_probability_of_enflaming_ratio # As there is zero distance between the origin and itself, we set this value to 1, so the contribution of the origin is ignored in the product -distance_matrix[2,2] = 1 +distance_matrix[2, 2] = 1 # Flatten probably mask so it can be efficiently used as a kernel -base_fire_mask = distance_matrix.reshape((25,1)) +base_fire_mask = distance_matrix.reshape((25, 1)) fire_mask = np.copy(base_fire_mask) # Wind components # The rate with which speed of wind converts to a percent change in the chance of a neighbor cell igniting the center cell speed_to_percent_ratio = 0.1 -axis_distance = np.array([5 * [-i] for i in range(-2,3)]) +axis_distance = np.array([5 * [-i] for i in range(-2, 3)]) # a 5x5 matrix where each element represents a vector pointing in the direction of the corresponding neihboring cell -neighbor_vectors = np.stack((-axis_distance.T, axis_distance), axis=2).reshape((-1,2)) -neighbor_vectors[12,:] = 1 +neighbor_vectors = np.stack((-axis_distance.T, axis_distance), axis=2).reshape((-1, 2)) +neighbor_vectors[12, :] = 1 # Normalizes these vectors to unit vectors -normalizer = np.linalg.norm(neighbor_vectors, axis = 1).reshape((neighbor_vectors.shape[0], 1)) +normalizer = np.linalg.norm(neighbor_vectors, axis=1).reshape( + (neighbor_vectors.shape[0], 1) +) neighbor_vectors = neighbor_vectors / normalizer.reshape((normalizer.size, 1)) -neighbor_vectors[12,:] = 0 +neighbor_vectors[12, :] = 0 # Computes a simple linear transformation of fire propogation probabilities scaled linearly by the speed of the wind and the dot product # between the wind direction and the direction to the neighboring cell # Probabilities are clamped around 0 and 1 -def linear_wind_transform(wind_speed : float, wind_angle : float): +def linear_wind_transform(wind_speed: float, wind_angle: float): wind_vector = np.array([[np.cos(wind_angle)], [np.sin(wind_angle)]]) - scaling_term = -(neighbor_vectors @ wind_vector) * speed_to_percent_ratio * wind_speed + 1 - return np.clip(torch.from_numpy(scaling_term) * base_fire_mask, a_min=0, a_max=1) \ No newline at end of file + scaling_term = ( + -(neighbor_vectors @ wind_vector) * speed_to_percent_ratio * wind_speed + 1 + ) + return np.clip(torch.from_numpy(scaling_term) * base_fire_mask, a_min=0, a_max=1) diff --git a/pyrorl/pyrorl/envs/pyrorl.py b/pyrorl/pyrorl/envs/pyrorl.py index 5793954..b074918 100644 --- a/pyrorl/pyrorl/envs/pyrorl.py +++ b/pyrorl/pyrorl/envs/pyrorl.py @@ -1,6 +1,7 @@ """ OpenAI Gym Environment Wrapper Class """ + from pyrorl.envs.environment.environment import * import gymnasium as gym from gymnasium import spaces @@ -18,6 +19,7 @@ PATH_COLOR = pygame.Color("#ffd166") GRASS_COLOR = pygame.Color("#06d6a0") + class WildfireEvacuationEnv(gym.Env): def __init__( self, @@ -26,8 +28,8 @@ def __init__( populated_areas, paths, paths_to_pops, - wind_speed = None, - wind_angle = None, + wind_speed=None, + wind_angle=None, ): """ Set up the basic environment and its parameters. @@ -45,7 +47,7 @@ def __init__( paths, paths_to_pops, wind_speed=wind_speed, - wind_angle=wind_angle + wind_angle=wind_angle, ) # Set up action space @@ -54,23 +56,31 @@ def __init__( # Set up observation space observations = self.fire_env.get_state() - self.observation_space = spaces.Box(low=0, high=200, shape = observations.shape, dtype=np.float64) + self.observation_space = spaces.Box( + low=0, high=200, shape=observations.shape, dtype=np.float64 + ) # Set up grid constants self.grid_width = WIDTH // num_rows self.grid_height = HEIGHT // num_cols # Create directory to store screenshots - if (os.path.exists("grid_screenshots") is False): + if os.path.exists("grid_screenshots") is False: os.mkdir("grid_screenshots") - def reset(self, seed = None, options = None): + def reset(self, seed=None, options=None): """ Reset the environment to its initial state. """ - self.fire_env = FireWorld(self.num_rows, self.num_cols, self.populated_areas, self.paths, self.paths_to_pops) + self.fire_env = FireWorld( + self.num_rows, + self.num_cols, + self.populated_areas, + self.paths, + self.paths_to_pops, + ) state_space = self.fire_env.get_state() - return state_space, { "": "" } + return state_space, {"": ""} def step(self, action): """ @@ -84,7 +94,7 @@ def step(self, action): observations = self.fire_env.get_state() rewards = self.fire_env.get_state_utility() terminated = self.fire_env.get_terminated() - return observations, rewards, terminated, False, { "" : "" } + return observations, rewards, terminated, False, {"": ""} def render_hf(self, screen, font): """ @@ -146,7 +156,9 @@ def render(self): for event in pygame.event.get(): if event.type == pygame.QUIT: timestep = self.fire_env.get_timestep() - pygame.image.save(screen, "grid_screenshots/" + str(timestep) + ".png") + pygame.image.save( + screen, "grid_screenshots/" + str(timestep) + ".png" + ) running = False # Iterate through all of the squares @@ -156,17 +168,22 @@ def render(self): # Set color of the square color = GRASS_COLOR - if (state_space[0][x][y] == 1): + if state_space[0][x][y] == 1: color = FIRE_COLOR - if (state_space[2][x][y] == 1): + if state_space[2][x][y] == 1: color = POPULATED_COLOR - if (state_space[3][x][y] > 0): + if state_space[3][x][y] > 0: color = EVACUATING_COLOR - if (state_space[4][x][y] > 0): + if state_space[4][x][y] > 0: color = PATH_COLOR # Draw the square - square_rect = pygame.Rect(50 + x * (self.grid_width + 2), 50 + y * (self.grid_height + 2), self.grid_width, self.grid_height) + square_rect = pygame.Rect( + 50 + x * (self.grid_width + 2), + 50 + y * (self.grid_height + 2), + self.grid_width, + self.grid_height, + ) pygame.draw.rect(screen, color, square_rect) # Render and then quit outside diff --git a/tests/environment_test.py b/tests/environment_test.py index 109edbc..fae73d6 100644 --- a/tests/environment_test.py +++ b/tests/environment_test.py @@ -1,11 +1,13 @@ """ Unit tests for each of the functions in environment.py """ + import sys from pyrorl.envs.environment.environment import * import numpy as np import random + def dummy_environment(): """ Set up environment for the grid world. @@ -13,9 +15,25 @@ def dummy_environment(): # Define hardcoded paramaters of the gridworld -- populated areas, paths, # and which areas can use whicn paths. # Note: discuss if these should be dynamically generated or others. - populated_areas = np.array([[1,2],[4,8], [6,4], [8, 7]]) - paths = [[[1,0],[1,1]], [[2,2],[3,2],[4,2],[4,1],[4,0]], [[2,9],[2,8],[3,8]], [[5,8],[6,8],[6,9]], [[7,7], [6,7], [6,8], [6,9]], [[8,6], [8,5], [9,5]], [[8,5], [9,5], [7,5],[7,4]]] - paths_to_pops = {0:[[1,2]], 1:[[1,2]], 2: [[4,8]], 3:[[4,8]], 4:[[8, 7]], 5:[[8, 7]], 6:[[6,4]]} + populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]]) + paths = [ + [[1, 0], [1, 1]], + [[2, 2], [3, 2], [4, 2], [4, 1], [4, 0]], + [[2, 9], [2, 8], [3, 8]], + [[5, 8], [6, 8], [6, 9]], + [[7, 7], [6, 7], [6, 8], [6, 9]], + [[8, 6], [8, 5], [9, 5]], + [[8, 5], [9, 5], [7, 5], [7, 4]], + ] + paths_to_pops = { + 0: [[1, 2]], + 1: [[1, 2]], + 2: [[4, 8]], + 3: [[4, 8]], + 4: [[8, 7]], + 5: [[8, 7]], + 6: [[6, 4]], + } # Initialize fire world test_world = FireWorld(10, 10, populated_areas, paths, paths_to_pops) @@ -26,33 +44,41 @@ def test_initialization(): """ Test to make sure that initializing the environment goes smoothly. """ - populated_areas = np.array([[2,2],[4,1]]) - paths = [[[2,0],[2,1]], [[1,0],[1,1],[2,1],[3,1]]] - paths_to_pops = {0:[2,2], 1:[4,1]} + populated_areas = np.array([[2, 2], [4, 1]]) + paths = [[[2, 0], [2, 1]], [[1, 0], [1, 1], [2, 1], [3, 1]]] + paths_to_pops = {0: [2, 2], 1: [4, 1]} test_world = FireWorld(5, 5, populated_areas, paths, paths_to_pops) - expected_population_array = np.array([ - [0,0,0,0,0], - [0,0,0,0,0], - [0,0,1,0,0], - [0,0,0,0,0], - [0,1,0,0,0] - ]) - expected_path_array = np.array([ - [0,0,0,0,0], - [1,1,0,0,0], - [1,2,0,0,0], - [0,1,0,0,0], - [0,0,0,0,0] - ]) - expected_returned_path_array = np.array([ - [0,0,0,0,0], - [1,1,0,0,0], - [1,1,0,0,0], - [0,1,0,0,0], - [0,0,0,0,0] - ]) - assert np.array_equal(test_world.state_space[POPULATED_INDEX], expected_population_array) + expected_population_array = np.array( + [ + [0, 0, 0, 0, 0], + [0, 0, 0, 0, 0], + [0, 0, 1, 0, 0], + [0, 0, 0, 0, 0], + [0, 1, 0, 0, 0], + ] + ) + expected_path_array = np.array( + [ + [0, 0, 0, 0, 0], + [1, 1, 0, 0, 0], + [1, 2, 0, 0, 0], + [0, 1, 0, 0, 0], + [0, 0, 0, 0, 0], + ] + ) + expected_returned_path_array = np.array( + [ + [0, 0, 0, 0, 0], + [1, 1, 0, 0, 0], + [1, 1, 0, 0, 0], + [0, 1, 0, 0, 0], + [0, 0, 0, 0, 0], + ] + ) + assert np.array_equal( + test_world.state_space[POPULATED_INDEX], expected_population_array + ) assert np.array_equal(test_world.state_space[PATHS_INDEX], expected_path_array) returned_state = test_world.get_state() assert np.array_equal(returned_state[PATHS_INDEX], expected_returned_path_array) @@ -79,11 +105,11 @@ def test_setup(): def test_remove_path_on_fire(): """ - Test to make sure that if a path goes on fire, they are removed from the state space for paths. + Test to make sure that if a path goes on fire, they are removed from the state space for paths. """ - populated_areas = np.array([[1,2]]) - paths = [[[1,0],[1,1]]] - paths_to_pops = {0:[[1,2]]} + populated_areas = np.array([[1, 2]]) + paths = [[[1, 0], [1, 1]]] + paths_to_pops = {0: [[1, 2]]} num_rows = 5 num_cols = 5 @@ -96,18 +122,20 @@ def test_remove_path_on_fire(): test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1 test_world.update_paths_and_evactuations() - assert np.array_equal(test_world.state_space[PATHS_INDEX], np.zeros((num_rows, num_cols))) + assert np.array_equal( + test_world.state_space[PATHS_INDEX], np.zeros((num_rows, num_cols)) + ) assert test_world.paths[0][1] == False def test_remove_multiple_paths_on_fire(): """ - Test to make sure that if multiple paths go on fire after a step, they are both removed from + Test to make sure that if multiple paths go on fire after a step, they are both removed from the paths state space. """ - populated_areas = np.array([[1,2], [3,3]]) - paths = [[[1,0],[1,1]], [[3,4]]] - paths_to_pops = {0:[[1,2]], 1:[[3,4]]} + populated_areas = np.array([[1, 2], [3, 3]]) + paths = [[[1, 0], [1, 1]], [[3, 4]]] + paths_to_pops = {0: [[1, 2]], 1: [[3, 4]]} num_rows = 5 num_cols = 5 @@ -120,10 +148,11 @@ def test_remove_multiple_paths_on_fire(): test_world.state_space[FIRE_INDEX, first_path_cell[0], first_path_cell[1]] = 1 second_path_cell = paths[1][0] test_world.state_space[FIRE_INDEX, second_path_cell[0], second_path_cell[1]] = 1 - test_world.update_paths_and_evactuations() - assert np.array_equal(test_world.state_space[PATHS_INDEX], np.zeros((num_rows, num_cols))) + assert np.array_equal( + test_world.state_space[PATHS_INDEX], np.zeros((num_rows, num_cols)) + ) assert test_world.paths[0][1] == False assert test_world.paths[1][1] == False @@ -132,9 +161,9 @@ def test_remove_path_on_fire_intersecting_paths(): """ Test for if two paths intersect only the one on fire disappears """ - populated_areas = np.array([[1,2], [2,1]]) - paths = [[[1,0],[1,1]], [[0,1], [1,1]]] - paths_to_pops = {0:[[1,2]], 1:[2,1]} + populated_areas = np.array([[1, 2], [2, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 1], [1, 1]]] + paths_to_pops = {0: [[1, 2]], 1: [2, 1]} num_rows = 5 num_cols = 5 @@ -147,20 +176,20 @@ def test_remove_path_on_fire_intersecting_paths(): test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1 test_world.update_paths_and_evactuations() - check_grid = np.zeros((num_rows,num_cols)) - check_grid[0,1] = 1 - check_grid[1,1] = 1 + check_grid = np.zeros((num_rows, num_cols)) + check_grid[0, 1] = 1 + check_grid[1, 1] = 1 assert np.array_equal(test_world.state_space[PATHS_INDEX], check_grid) def test_stop_evacuating(): """ - Test to make sure that if a populated area was taking a path that caught on fire, it stops - evacuating. + Test to make sure that if a populated area was taking a path that caught on fire, it stops + evacuating. """ - populated_areas = np.array([[1,2]]) - paths = [[[1,0],[1,1]]] - paths_to_pops = {0:[[1,2]]} + populated_areas = np.array([[1, 2]]) + paths = [[[1, 0], [1, 1]]] + paths_to_pops = {0: [[1, 2]]} num_rows = 5 num_cols = 5 @@ -180,7 +209,9 @@ def test_stop_evacuating(): test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1 test_world.update_paths_and_evactuations() - assert np.array_equal(test_world.state_space[EVACUATING_INDEX], np.zeros((num_rows, num_cols))) + assert np.array_equal( + test_world.state_space[EVACUATING_INDEX], np.zeros((num_rows, num_cols)) + ) assert 0 not in test_world.evacuating_paths @@ -188,24 +219,28 @@ def test_multiple_stop_evacuating(): """ Test to make sure that if two areas are taking the same path, they both stop evacuating. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]]] - paths_to_pops = {0:[[1,2], [0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]]] + paths_to_pops = {0: [[1, 2], [0, 1]]} num_rows = 5 num_cols = 5 # Initialize fire world test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - # turn off fires + # turn off fires test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols)) # Manually set popualted area to be evacuating first_populated_area = populated_areas[0] - test_world.state_space[EVACUATING_INDEX, first_populated_area[0], first_populated_area[1]] = 1 + test_world.state_space[ + EVACUATING_INDEX, first_populated_area[0], first_populated_area[1] + ] = 1 test_world.evacuating_paths[0] = [first_populated_area] second_populated_area = populated_areas[1] - test_world.state_space[EVACUATING_INDEX, second_populated_area[0], second_populated_area[1]] = 1 + test_world.state_space[ + EVACUATING_INDEX, second_populated_area[0], second_populated_area[1] + ] = 1 test_world.evacuating_paths[0].append(second_populated_area) # Set path populated areas are using to evacuate on fire @@ -213,25 +248,27 @@ def test_multiple_stop_evacuating(): test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1 test_world.update_paths_and_evactuations() - assert np.array_equal(test_world.state_space[EVACUATING_INDEX], np.zeros((num_rows, num_cols))) + assert np.array_equal( + test_world.state_space[EVACUATING_INDEX], np.zeros((num_rows, num_cols)) + ) assert 0 not in test_world.evacuating_paths def test_evacuation_decrement(): """ Test to make sure that if a path is evacuating its evacuating timestamp is decremented when a step - is taken. + is taken. """ - populated_areas = np.array([[1,2]]) - paths = [[[1,0],[1,1]]] - paths_to_pops = {0:[[1,2]]} + populated_areas = np.array([[1, 2]]) + paths = [[[1, 0], [1, 1]]] + paths_to_pops = {0: [[1, 2]]} num_rows = 5 num_cols = 5 # Initialize fire world test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - # turn off fires + # turn off fires test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols)) # set populated area evacuation timstamp @@ -246,19 +283,19 @@ def test_evacuation_decrement(): def test_multiple_evacuation_decrement(): """ - Test to make sure if two areas are evacuating on the same path, they both have their + Test to make sure if two areas are evacuating on the same path, they both have their timestamps decremented when a step is taken. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]]] - paths_to_pops = {0:[[1,2], [0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]]] + paths_to_pops = {0: [[1, 2], [0, 1]]} num_rows = 5 num_cols = 5 # Initialize fire world test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - # turn off fires + # turn off fires test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols)) # set populated areas evacuation timstamp @@ -277,19 +314,19 @@ def test_multiple_evacuation_decrement(): def test_finished_evacuating(): """ - Test to make sure that if a populated area finishes evacuating it is removed from the evacuating - populated areas state. + Test to make sure that if a populated area finishes evacuating it is removed from the evacuating + populated areas state. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]]] - paths_to_pops = {0:[[1,2], [0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]]] + paths_to_pops = {0: [[1, 2], [0, 1]]} num_rows = 5 num_cols = 5 # Initialize fire world test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - # turn off fires + # turn off fires test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols)) # set populated areas evacuation timstamp @@ -312,25 +349,25 @@ def test_set_actions(): """ Test to make sure self.actions is set up correctly. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 # Initialize fire world test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - - assert len(test_world.actions) == 4 #4 because action to do nothing adds 1 + + assert len(test_world.actions) == 4 # 4 because action to do nothing adds 1 def test_bad_action(): """ Test to make sure that if an action that doesn't exist is given, nothing happens """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -339,16 +376,16 @@ def test_bad_action(): old_state_space = np.copy(test_world.state_space) test_world.set_action(4) - assert(np.equal(test_world.state_space, old_state_space).all()) + assert np.equal(test_world.state_space, old_state_space).all() def test_do_nothing_action(): """ Test to make sure that if the "do nothing" action is given, nothing happens """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -357,16 +394,16 @@ def test_do_nothing_action(): old_state_space = np.copy(test_world.state_space) test_world.set_action(3) - assert(np.equal(test_world.state_space, old_state_space).all()) + assert np.equal(test_world.state_space, old_state_space).all() def test_burned_down_path(): """ Test to make sure that if the chosen path has burned down, nothing happens. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -384,18 +421,18 @@ def test_burned_down_path(): # Set action to take path on fire test_world.set_action(0) - assert(np.equal(old_evacuating_paths, test_world.evacuating_paths).all()) - assert(np.equal(old_state_space, test_world.state_space).all()) - assert(np.equal(old_evacuating_timestamps, test_world.evacuating_timestamps).all()) + assert np.equal(old_evacuating_paths, test_world.evacuating_paths).all() + assert np.equal(old_state_space, test_world.state_space).all() + assert np.equal(old_evacuating_timestamps, test_world.evacuating_timestamps).all() def test_burned_down_pop(): """ Test to make sure that if the chosen population center has burned down, nothing happens. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -413,27 +450,29 @@ def test_burned_down_pop(): # Set action for populated cell on fire test_world.set_action(0) - assert(np.equal(old_evacuating_paths, test_world.evacuating_paths).all()) - assert(np.equal(old_state_space, test_world.state_space).all()) - assert(np.equal(old_evacuating_timestamps, test_world.evacuating_timestamps).all()) + assert np.equal(old_evacuating_paths, test_world.evacuating_paths).all() + assert np.equal(old_state_space, test_world.state_space).all() + assert np.equal(old_evacuating_timestamps, test_world.evacuating_timestamps).all() def test_already_evacuating(): """ Test to make sure that if populated cell is already evacuating, nothing happens when it is told to evacuate again """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 # Initialize fire world test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - test_world.evacuating_timestamps[1, 2] = 9 # intentially make this lower than default to see if it's reset + test_world.evacuating_timestamps[1, 2] = ( + 9 # intentially make this lower than default to see if it's reset + ) test_world.state_space[EVACUATING_INDEX, 1, 2] = 1 - test_world.evacuating_paths[0] = [[1,2]] + test_world.evacuating_paths[0] = [[1, 2]] old_evacuating_paths = np.copy(test_world.evacuating_paths) old_state_space = np.copy(test_world.state_space) @@ -441,18 +480,18 @@ def test_already_evacuating(): test_world.set_action(0) - assert(np.equal(old_evacuating_paths, test_world.evacuating_paths).all()) - assert(np.equal(old_state_space, test_world.state_space).all()) - assert(np.equal(old_evacuating_timestamps, test_world.evacuating_timestamps).all()) + assert np.equal(old_evacuating_paths, test_world.evacuating_paths).all() + assert np.equal(old_state_space, test_world.state_space).all() + assert np.equal(old_evacuating_timestamps, test_world.evacuating_timestamps).all() def test_pop_taking_first_action(): """ Test to make sure that taking an action for the first time for a populated cell works """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -461,18 +500,18 @@ def test_pop_taking_first_action(): test_world.set_action(1) - assert(test_world.evacuating_paths[0] == [[0,1]]) - assert(test_world.state_space[EVACUATING_INDEX, 0, 1] == 1) - assert(test_world.evacuating_timestamps[0,1] == 10) + assert test_world.evacuating_paths[0] == [[0, 1]] + assert test_world.state_space[EVACUATING_INDEX, 0, 1] == 1 + assert test_world.evacuating_timestamps[0, 1] == 10 def test_pop_taking_first_action(): """ Test to make sure that taking an action for the first time for a populated cell works """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -481,17 +520,18 @@ def test_pop_taking_first_action(): test_world.set_action(1) - assert(test_world.evacuating_paths[0] == [[0,1]]) - assert(test_world.state_space[EVACUATING_INDEX, 0, 1] == 1) - assert(test_world.evacuating_timestamps[0,1] == 10) + assert test_world.evacuating_paths[0] == [[0, 1]] + assert test_world.state_space[EVACUATING_INDEX, 0, 1] == 1 + assert test_world.evacuating_timestamps[0, 1] == 10 + def test_multiple_pop_cells_same_path(): """ Test to make sure that taking an action works for one populated area if another populated area is already taking the same path. """ - populated_areas = np.array([[1,2], [0,1]]) - paths = [[[1,0],[1,1]],[[0,0]]] - paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]} + populated_areas = np.array([[1, 2], [0, 1]]) + paths = [[[1, 0], [1, 1]], [[0, 0]]] + paths_to_pops = {0: [[1, 2], [0, 1]], 1: [[0, 1]]} num_rows = 5 num_cols = 5 @@ -501,8 +541,8 @@ def test_multiple_pop_cells_same_path(): test_world.set_action(0) test_world.set_action(1) - assert(test_world.evacuating_paths[0] == [[1,2], [0,1]]) - assert(test_world.state_space[EVACUATING_INDEX, 0, 1] == 1) - assert(test_world.evacuating_timestamps[0,1] == 10) - assert(test_world.state_space[EVACUATING_INDEX, 1, 2] == 1) - assert(test_world.evacuating_timestamps[1,2] == 10) \ No newline at end of file + assert test_world.evacuating_paths[0] == [[1, 2], [0, 1]] + assert test_world.state_space[EVACUATING_INDEX, 0, 1] == 1 + assert test_world.evacuating_timestamps[0, 1] == 10 + assert test_world.state_space[EVACUATING_INDEX, 1, 2] == 1 + assert test_world.evacuating_timestamps[1, 2] == 10 diff --git a/tests/package_test.py b/tests/package_test.py index b5cb3e6..e8f617d 100644 --- a/tests/package_test.py +++ b/tests/package_test.py @@ -1,45 +1,70 @@ """ Testing the OpenAI Gym Package itself. """ + import gymnasium import numpy as np import pygame -import pytest +import pytest import pyrorl + def PGEvent(): def __init__(self): self.type = pygame.QUIT + def test_constructor(): """ Test the constructor to make sure all variables are accounted for. """ # Set up parameters num_rows, num_cols = 10, 10 - populated_areas = np.array([[1,2],[4,8], [6,4], [8, 7]]) - paths = np.array([[[1,0],[1,1]], [[2,2],[3,2],[4,2],[4,1],[4,0]], [[2,9],[2,8],[3,8]], [[5,8],[6,8],[6,9]], [[7,7], [6,7], [6,8], [6,9]], [[8,6], [8,5], [9,5]], [[8,5], [9,5], [7,5],[7,4]]], dtype=object) - paths_to_pops = {0:[[1,2]], 1:[[1,2]], 2: [[4,8]], 3:[[4,8]], 4:[[8, 7]], 5:[[8, 7]], 6:[[6,4]]} + populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]]) + paths = np.array( + [ + [[1, 0], [1, 1]], + [[2, 2], [3, 2], [4, 2], [4, 1], [4, 0]], + [[2, 9], [2, 8], [3, 8]], + [[5, 8], [6, 8], [6, 9]], + [[7, 7], [6, 7], [6, 8], [6, 9]], + [[8, 6], [8, 5], [9, 5]], + [[8, 5], [9, 5], [7, 5], [7, 4]], + ], + dtype=object, + ) + paths_to_pops = { + 0: [[1, 2]], + 1: [[1, 2]], + 2: [[4, 8]], + 3: [[4, 8]], + 4: [[8, 7]], + 5: [[8, 7]], + 6: [[6, 4]], + } # Create environment kwargs = { - 'num_rows': num_rows, - 'num_cols': num_cols, - 'populated_areas': populated_areas, - 'paths': paths, - 'paths_to_pops': paths_to_pops + "num_rows": num_rows, + "num_cols": num_cols, + "populated_areas": populated_areas, + "paths": paths, + "paths_to_pops": paths_to_pops, } - env = gymnasium.make('pyrorl/PyroRL-v0', **kwargs) - + env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs) + # Make basic checks for the constructor - assert(env.num_rows == num_rows) - assert(env.num_cols == num_cols) + assert env.num_rows == num_rows + assert env.num_cols == num_cols np.testing.assert_array_equal(env.populated_areas, populated_areas) np.testing.assert_array_equal(env.paths, paths) # Special check for paths to populated areas for key in paths_to_pops: - np.testing.assert_array_equal(np.array(env.paths_to_pops[key]), np.array(paths_to_pops[key])) + np.testing.assert_array_equal( + np.array(env.paths_to_pops[key]), np.array(paths_to_pops[key]) + ) + def test_reset(): """ @@ -47,19 +72,38 @@ def test_reset(): """ # Set up parameters num_rows, num_cols = 10, 10 - populated_areas = np.array([[1,2],[4,8], [6,4], [8, 7]]) - paths = np.array([[[1,0],[1,1]], [[2,2],[3,2],[4,2],[4,1],[4,0]], [[2,9],[2,8],[3,8]], [[5,8],[6,8],[6,9]], [[7,7], [6,7], [6,8], [6,9]], [[8,6], [8,5], [9,5]], [[8,5], [9,5], [7,5],[7,4]]], dtype=object) - paths_to_pops = {0:[[1,2]], 1:[[1,2]], 2: [[4,8]], 3:[[4,8]], 4:[[8, 7]], 5:[[8, 7]], 6:[[6,4]]} + populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]]) + paths = np.array( + [ + [[1, 0], [1, 1]], + [[2, 2], [3, 2], [4, 2], [4, 1], [4, 0]], + [[2, 9], [2, 8], [3, 8]], + [[5, 8], [6, 8], [6, 9]], + [[7, 7], [6, 7], [6, 8], [6, 9]], + [[8, 6], [8, 5], [9, 5]], + [[8, 5], [9, 5], [7, 5], [7, 4]], + ], + dtype=object, + ) + paths_to_pops = { + 0: [[1, 2]], + 1: [[1, 2]], + 2: [[4, 8]], + 3: [[4, 8]], + 4: [[8, 7]], + 5: [[8, 7]], + 6: [[6, 4]], + } # Create environment kwargs = { - 'num_rows': num_rows, - 'num_cols': num_cols, - 'populated_areas': populated_areas, - 'paths': paths, - 'paths_to_pops': paths_to_pops + "num_rows": num_rows, + "num_cols": num_cols, + "populated_areas": populated_areas, + "paths": paths, + "paths_to_pops": paths_to_pops, } - env = gymnasium.make('pyrorl/PyroRL-v0', **kwargs) + env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs) # Run a simple loop of the environment env.reset() @@ -68,17 +112,20 @@ def test_reset(): # Take action and observation action = env.action_space.sample() observation, reward, terminated, truncated, info = env.step(action) - + # Check that reset makes it all the same env.reset() - assert(env.num_rows == num_rows) - assert(env.num_cols == num_cols) + assert env.num_rows == num_rows + assert env.num_cols == num_cols np.testing.assert_array_equal(env.populated_areas, populated_areas) np.testing.assert_array_equal(env.paths, paths) # Special check for paths to populated areas for key in paths_to_pops: - np.testing.assert_array_equal(np.array(env.paths_to_pops[key]), np.array(paths_to_pops[key])) + np.testing.assert_array_equal( + np.array(env.paths_to_pops[key]), np.array(paths_to_pops[key]) + ) + def test_reset(mocker): """ @@ -86,30 +133,46 @@ def test_reset(mocker): """ # Set up parameters num_rows, num_cols = 10, 10 - populated_areas = np.array([[1,2],[4,8], [6,4], [8, 7]]) - paths = np.array([[[1,0],[1,1]], [[2,2],[3,2],[4,2],[4,1],[4,0]], [[2,9],[2,8],[3,8]], [[5,8],[6,8],[6,9]], [[7,7], [6,7], [6,8], [6,9]], [[8,6], [8,5], [9,5]], [[8,5], [9,5], [7,5],[7,4]]], dtype=object) - paths_to_pops = {0:[[1,2]], 1:[[1,2]], 2: [[4,8]], 3:[[4,8]], 4:[[8, 7]], 5:[[8, 7]], 6:[[6,4]]} + populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]]) + paths = np.array( + [ + [[1, 0], [1, 1]], + [[2, 2], [3, 2], [4, 2], [4, 1], [4, 0]], + [[2, 9], [2, 8], [3, 8]], + [[5, 8], [6, 8], [6, 9]], + [[7, 7], [6, 7], [6, 8], [6, 9]], + [[8, 6], [8, 5], [9, 5]], + [[8, 5], [9, 5], [7, 5], [7, 4]], + ], + dtype=object, + ) + paths_to_pops = { + 0: [[1, 2]], + 1: [[1, 2]], + 2: [[4, 8]], + 3: [[4, 8]], + 4: [[8, 7]], + 5: [[8, 7]], + 6: [[6, 4]], + } # Create environment kwargs = { - 'num_rows': num_rows, - 'num_cols': num_cols, - 'populated_areas': populated_areas, - 'paths': paths, - 'paths_to_pops': paths_to_pops + "num_rows": num_rows, + "num_cols": num_cols, + "populated_areas": populated_areas, + "paths": paths, + "paths_to_pops": paths_to_pops, } - env = gymnasium.make('pyrorl/PyroRL-v0', **kwargs) + env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs) # Render and make simple test check env.reset() - display_mock = mocker.patch('pygame.display') - draw_mock = mocker.patch('pygame.draw') - image_mock = mocker.patch('pygame.image') - event_mock = mocker.patch('pygame.event.get') + display_mock = mocker.patch("pygame.display") + draw_mock = mocker.patch("pygame.draw") + image_mock = mocker.patch("pygame.image") + event_mock = mocker.patch("pygame.event.get") event_mock.return_value = [pygame.event.Event(pygame.QUIT)] env.render() - #pygame.quit() + # pygame.quit() pygame.display.set_mode.assert_called_once_with([600, 725]) - - -