From 3d6ffbd97e6c5a690cc233f0ecd96b64ed571166 Mon Sep 17 00:00:00 2001 From: joey-obrien Date: Wed, 3 Apr 2024 15:40:54 -0700 Subject: [PATCH] completed map code with testing --- examples/map_gen_example.py | 58 +++++ pyrorl/pyrorl/envs/environment/environment.py | 2 +- pyrorl/pyrorl/envs/map_helpers/__init__.py | 0 .../envs/map_helpers/create_map_info.py | 211 +++++++++++------- tests/map_gen_test.py | 96 ++++++++ 5 files changed, 280 insertions(+), 87 deletions(-) create mode 100644 examples/map_gen_example.py create mode 100644 pyrorl/pyrorl/envs/map_helpers/__init__.py create mode 100644 tests/map_gen_test.py diff --git a/examples/map_gen_example.py b/examples/map_gen_example.py new file mode 100644 index 0000000..0721ff5 --- /dev/null +++ b/examples/map_gen_example.py @@ -0,0 +1,58 @@ +""" +Example of how to use the wildfire evacuation RL environment. +""" + +import gymnasium +import numpy as np +import pyrorl +import shutil +from pyrorl.envs.map_helpers.create_map_info import * + + +if __name__ == "__main__": + """ + Run basic environment. + """ + # Set up parameters + num_rows, num_cols = 20, 20 + num_populated_areas = 5 + + # example of generating map (other parameters are set to their default values) + populated_areas, paths, paths_to_pops = generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True, steps_lower_bound = 2, steps_upper_bound = 4, percent_go_straight = 50, num_paths_mean = 3, num_paths_stdev = 1) + + # showing how to load in map just created for good measure, would otherwise provide the + # desired map pth to load_map_info + map_info_root = os.path.join(os.getcwd(), MAP_DIRECTORY) + current_map_directory = max(os.listdir(map_info_root), key=lambda f: os.path.getctime(os.path.join(map_info_root, f))) + map_info_path = os.path.join(map_info_root, current_map_directory) + num_rows, num_cols, populated_areas, paths, paths_to_pops, num_populated_areas = load_map_info(map_info_path) + + # destroy the saved map info created for this example + shutil.rmtree(map_info_path) + if len(os.listdir(map_info_root)) == 0: + shutil.rmtree(map_info_root) + + # Create environment + kwargs = { + "num_rows": num_rows, + "num_cols": num_cols, + "populated_areas": populated_areas, + "paths": paths, + "paths_to_pops": paths_to_pops, + } + env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs) + + # Run a simple loop of the environment + env.reset() + for _ in range(10): + + # Take action and observation + action = env.action_space.sample() + observation, reward, terminated, truncated, info = env.step(action) + + # Render environment and print reward + env.render() + print("Reward: " + str(reward)) + + # Generate the gif + env.generate_gif() \ No newline at end of file diff --git a/pyrorl/pyrorl/envs/environment/environment.py b/pyrorl/pyrorl/envs/environment/environment.py index 5e310b9..7bf0617 100644 --- a/pyrorl/pyrorl/envs/environment/environment.py +++ b/pyrorl/pyrorl/envs/environment/environment.py @@ -164,7 +164,7 @@ def __init__( self.paths: List[List[Any]] = [] for path in paths: path_array = np.array(path) - path_rows, path_cols = path_array[:, 0], path_array[:, 1] + path_rows, path_cols = path_array[:, 0].astype(int), path_array[:, 1].astype(int) self.state_space[PATHS_INDEX, path_rows, path_cols] += 1 # Each path in self.paths is a list that records what the path is and diff --git a/pyrorl/pyrorl/envs/map_helpers/__init__.py b/pyrorl/pyrorl/envs/map_helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyrorl/pyrorl/envs/map_helpers/create_map_info.py b/pyrorl/pyrorl/envs/map_helpers/create_map_info.py index d489f46..0c1f0a2 100644 --- a/pyrorl/pyrorl/envs/map_helpers/create_map_info.py +++ b/pyrorl/pyrorl/envs/map_helpers/create_map_info.py @@ -2,116 +2,154 @@ import numpy as np import pickle as pkl import os -from pyrorl.envs.environment.environment import FireWorld +from datetime import datetime -# randomly generate populated areas -def generate_pop_locations(num_rows, num_cols, num_populated_areas): - populated_areas = set() - for _ in range(num_populated_areas): - pop_row = random.randint(0, num_rows - 1) - pop_col = random.randint(0, num_cols - 1) - while (pop_row, pop_col) in populated_areas: - pop_row = random.randint(0, num_rows - 1) - pop_col = random.randint(0, num_cols - 1) - populated_areas.add((pop_row, pop_col)) - populated_areas = np.array(list(populated_areas)) - return populated_areas - -def create_orientations(): - orientations = {} - orientations["north"] = { +DIRECTIONS = {0: "straight", 1: "right", 2: "left"} +ORIENTATONS = { + "north": { "left": [[0, -1], "west"], "right": [[0, 1], "east"], "straight": [[-1, 0], "north"], - } - orientations["south"] = { + }, + "south": { "left": [[0, 1], "east"], "right": [[0, -1], "west"], "straight": [[1, 0], "south"], - } - orientations["east"] = { + }, + "east": { "left": [[-1, 0], "north"], "right": [[1, 0], "south"], "straight": [[0, 1], "east"], - } - orientations["west"] = { + }, + "west": { "left": [[1, 0], "south"], "right": [[-1, 0], "north"], "straight": [[0, -1], "west"], } - return orientations +} +MAP_DIRECTORY = "pyrorl_map_info" + + +def generate_pop_locations(num_rows, num_cols, num_populated_areas): + """ + Randomly generate populated areas. + """ + populated_areas = set() + for _ in range(num_populated_areas): + pop_row = random.randint(1, num_rows - 2) + pop_col = random.randint(1, num_cols - 2) + # make sure that n + while (pop_row, pop_col) in populated_areas: + pop_row = random.randint(1, num_rows - 2) + pop_col = random.randint(1, num_cols - 2) + populated_areas.add((pop_row, pop_col)) + populated_areas = np.array(list(populated_areas)) + return populated_areas -def save_map_info(num_rows, num_cols, percent_map_populated, populated_areas, paths, paths_to_pops): + +def save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops): + # the map information is saved in the user's current working directory user_working_directory = os.getcwd() - maps_info_directory = os.path.join(user_working_directory, "pyrorl_map_info") + maps_info_directory = os.path.join(user_working_directory, MAP_DIRECTORY) if not os.path.exists(maps_info_directory): os.makedirs(maps_info_directory) - current_map_info = str(num_rows)+ "_rows_" + str(num_cols) + "_cols_" + str(percent_map_populated) + "_percent_map_populated" - generation = 0 - current_map_directory = os.path.join(maps_info_directory, current_map_info + "_generation" + str(generation)) - while os.path.exists(current_map_directory): - generation += 1 - current_map_directory = os.path.join(maps_info_directory, current_map_info + "_generation" + str(generation)) + + # make a new subdirectory for the current map's information + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_map_directory = os.path.join(maps_info_directory, timestamp) os.makedirs(current_map_directory) - populated_areas_filename = os.path.join(current_map_directory, "popualted_areas_array.pkl") + + # put the number of rows, number of columns, and number of populated areas + # in text file for user to reference data + map_info_filename = os.path.join(current_map_directory, "map_info.txt") + with open(map_info_filename, "w") as f: + row_info = "num_rows: " + str(num_rows) + "\n" + f.write(row_info) + col_info = "num_cols: " + str(num_cols) + "\n" + f.write(col_info) + percent_pop_info = "num_populated_areas: " + str(num_populated_areas) + f.write(percent_pop_info) + + # saved the populated areas array + populated_areas_filename = os.path.join(current_map_directory, "populated_areas_array.pkl") with open(populated_areas_filename, 'wb') as f: pkl.dump(populated_areas, f) + + # save the paths array paths_filename = os.path.join(current_map_directory, "paths_array.pkl") with open(paths_filename, 'wb') as f: pkl.dump(paths, f) + + # save the paths to pops array paths_to_pops_filename = os.path.join(current_map_directory, "paths_to_pops_array.pkl") with open(paths_to_pops_filename, 'wb') as f: pkl.dump(paths_to_pops, f) - map_size_and_percent_popualted_list = [num_rows, num_cols, percent_map_populated] - map_size_and_percent_popualted_list_filename = os.path.join(current_map_directory, "map_size_and_percent_popualted_list.pkl") - with open(map_size_and_percent_popualted_list_filename, 'wb') as f: - pkl.dump(map_size_and_percent_popualted_list, f) + + # save the number of rows, number of columns, and number of populated areas + map_size_and_percent_populated_list = [num_rows, num_cols, num_populated_areas] + map_size_and_percent_populated_list_filename = os.path.join(current_map_directory, "map_size_and_percent_populated_list.pkl") + with open(map_size_and_percent_populated_list_filename, 'wb') as f: + pkl.dump(map_size_and_percent_populated_list, f) def load_map_info(map_directory_path): - populated_areas_filename = os.path.join(map_directory_path, "popualted_areas_array.pkl") + # load the populated areas array + populated_areas_filename = os.path.join(map_directory_path, "populated_areas_array.pkl") with open(populated_areas_filename, 'rb') as f: populated_areas = pkl.load(f) + + # load the paths array paths_filename = os.path.join(map_directory_path, "paths_array.pkl") with open(paths_filename, 'rb') as f: paths = pkl.load(f) + + # load the paths to pops array paths_to_pops_filename = os.path.join(map_directory_path, "paths_to_pops_array.pkl") with open(paths_to_pops_filename, 'rb') as f: paths_to_pops = pkl.load(f) - map_size_and_percent_popualted_list_filename = os.path.join(map_directory_path, "map_size_and_percent_popualted_list.pkl") - with open(map_size_and_percent_popualted_list_filename, 'rb') as f: - map_size_and_percent_popualted_list = pkl.load(f) - num_rows = map_size_and_percent_popualted_list[0] - num_cols = map_size_and_percent_popualted_list[1] - percent_map_populated = map_size_and_percent_popualted_list[2] - return num_rows, num_cols, populated_areas, paths, paths_to_pops, percent_map_populated - - -def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True): - if percent_map_populated > 100: + + # load the number of rows, number of columns, and number of populated areas + map_size_and_percent_populated_list_filename = os.path.join(map_directory_path, "map_size_and_percent_populated_list.pkl") + with open(map_size_and_percent_populated_list_filename, 'rb') as f: + map_size_and_percent_populated_list = pkl.load(f) + num_rows = map_size_and_percent_populated_list[0] + num_cols = map_size_and_percent_populated_list[1] + num_populated_areas = map_size_and_percent_populated_list[2] + return num_rows, num_cols, populated_areas, paths, paths_to_pops, num_populated_areas + + +def generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True, steps_lower_bound = 2, steps_upper_bound = 4, percent_go_straight = 50, num_paths_mean = 3, num_paths_stdev = 1): + if num_populated_areas > (num_rows*num_cols - (2 * num_rows + 2 * num_cols)): raise Exception("Cannot have more than 100 percent of the map be populated!") - - orientations = create_orientations() - directions = {0: "left", 1: "right", 2: "straight"} + if num_rows <= 0: + raise Exception("Number of rows must be a positive value!") + if num_cols <= 0: + raise Exception("Number of columns must be a positive value!") + if percent_go_straight > 99: + raise Exception("Cannot have the percent chance of going straight be greater than 99!") + if num_paths_mean < 1: + raise Exception("The mean for the number of paths cannot be less than 1!") + if steps_lower_bound > steps_upper_bound: + raise Exception("The lower bound for the number of steps cannot be greater than the upper bound!") + if steps_lower_bound < 1 or steps_upper_bound < 1: + raise Exception("The bounds for the number of steps cannot be less than 1!") paths_to_pops = {} - num_populated_areas = int(num_rows * num_cols * percent_map_populated * 0.01) populated_areas = generate_pop_locations( num_rows, num_cols, num_populated_areas ) # the number of paths for each populated area is chosen from normal distribution - num_paths_array = np.random.normal(3, 1, num_populated_areas).astype(int) + num_paths_array = np.random.normal(num_paths_mean, num_paths_stdev, num_populated_areas).astype(int) # each populated area must have at least one path - while 0 in num_paths_array: - num_paths_array = np.random.normal(3, 1, num_populated_areas).astype(int) + num_paths_array[num_paths_array < 1] = 1 paths = [] path_num = 0 for i in range(len(populated_areas)): pop_row, pop_col = populated_areas[i] - - num_pop_paths_created = 0 # for cases where a path couldn't be made + # for cases where a path couldn't be made + num_pop_paths_created = 0 while num_pop_paths_created < num_paths_array[i]: current_path = [] @@ -131,37 +169,42 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True # we want to make sure that the current path will not intersect with itself direction_chosen = False - num_steps = random.randint(2, 4) + num_steps = random.randint(steps_lower_bound, steps_upper_bound) while not direction_chosen: - # have a bias toward going straight - direction_index = random.randint(0, 1) - if direction_index == 2: + # choose whether to go straight, left, or right based + # on percent_go_straight -> if we don't go straight, + # we go left or right with equal probability + direction_index = 0 + percent_value = random.randint(0, 100) + if percent_value > percent_go_straight: direction_index = random.randint(1, 2) - direction = directions[direction_index] + direction = DIRECTIONS[direction_index] - if orientation == "north" and direction == "left": - if cur_row - num_steps < x_min: + if orientation == "north" and direction != "straight": + if cur_row == x_min: direction_chosen = True - elif orientation == "south" and direction == "right": - if cur_row + num_steps > x_max: + elif orientation == "south" and direction != "straight": + if cur_row == x_max: direction_chosen = True - elif orientation == "east" and direction == "left": - if cur_col + num_steps > y_max: + elif orientation == "east" and direction != "straight": + if cur_col == y_max: direction_chosen = True - elif orientation == "west" and direction == "right": - if cur_col - num_steps < y_min: + elif orientation == "west" and direction != "straight": + if cur_col == y_min: direction_chosen = True else: direction_chosen = True - row_update = orientations[orientation][direction][0][0] - col_update = orientations[orientation][direction][0][1] + row_update = ORIENTATONS[orientation][direction][0][0] + col_update = ORIENTATONS[orientation][direction][0][1] for _ in range(num_steps): cur_row += row_update cur_col += col_update + # update bounds if necessary + # (so that paths do not intersect with themselves) if cur_row > x_max: x_max = cur_row if cur_row < x_min: @@ -171,7 +214,8 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True if cur_col < y_min: y_min = cur_col - # the population center is on the edge of the map, so we don't want to add a path in this direction + # the population center is on the edge of the map, + # so we don't want to add a path in this direction if ( cur_row == -1 or cur_row == num_cols @@ -184,11 +228,14 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True current_path.append([cur_row, cur_col]) if ( cur_row == 0 - or cur_row == num_cols - 1 + or cur_row == num_rows - 1 or cur_col == 0 - or cur_col == num_rows - 1 + or cur_col == num_cols - 1 ): + # we want unique paths done = True + if current_path in paths: + break paths.append(current_path) paths_to_pops[path_num] = [[pop_row, pop_col]] path_num += 1 @@ -196,15 +243,7 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True break # update orientation - orientation = orientations[orientation][direction][1] + orientation = ORIENTATONS[orientation][direction][1] if save_map: - save_map_info(num_rows, num_cols, percent_map_populated, populated_areas, paths, paths_to_pops) + save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops) return populated_areas, np.array(paths, dtype=object), paths_to_pops - -# populated_areas, paths, paths_to_pops = generate_map_info(10, 10, 5) - -num_rows, num_cols, populated_areas, paths, paths_to_pops, percent_map_populated = load_map_info("./pyrorl_map_info/10_rows_10_cols_5_percent_map_populated_generation0") - -example_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) - -print(example_world.state_space[4]) diff --git a/tests/map_gen_test.py b/tests/map_gen_test.py new file mode 100644 index 0000000..5904678 --- /dev/null +++ b/tests/map_gen_test.py @@ -0,0 +1,96 @@ +""" +Unit tests for each of the functions in create_map_info.py +""" +import numpy as np +from pyrorl.envs.map_helpers.create_map_info import * +from pyrorl.envs.environment.environment import * +import shutil +import pytest + +def test_path_structure(): + """ + Make sure that paths are continuous and reach the end of the map. + """ + num_rows = 200 + num_cols = 200 + num_populated_areas = 20 + _, paths, _ = generate_map_info(num_rows, num_cols, num_populated_areas, save_map=False) + for path in paths: + previous_cell = None + for cell in path: + if previous_cell is None: + previous_cell = cell + continue + if (previous_cell[0] != cell[0] - 1 and previous_cell[0] != cell[0] + 1 + and previous_cell[1] != cell[1] - 1 and previous_cell[1] != cell[1] + 1): + raise Exception("Path is not continuous!") + previous_cell = cell + if (previous_cell[0] != 0 and previous_cell[1] != 0 + and previous_cell[0] != num_rows - 1 and previous_cell[1] != num_cols -1): + raise Exception("Path does not reach end of map!") + +def test_paths_not_fold(): + """ + Make sure that paths do not fold in on themselves. + """ + num_rows = 1000 + num_cols = 1000 + num_populated_areas = 1 + for _ in range(100): + populated_areas, paths, paths_to_pops = generate_map_info(num_rows, num_cols, num_populated_areas, save_map=False, num_paths_mean = 1, num_paths_stdev = 0) + test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) + test_world.state_space[PATHS_INDEX][test_world.state_space[PATHS_INDEX] == 1] = 0 + if np.sum(test_world.state_space[PATHS_INDEX]) > 0: + raise Exception("Paths should not fold in on themselves!") + +def test_path_each_area(): + """ + Make sure that each populated area has at least one path. + """ + for i in range(5000): + num_rows = 50 + num_cols = 50 + num_populated_areas = 10 + _, _, paths_to_pops = generate_map_info(num_rows, num_cols, num_populated_areas, save_map=False) + pops_seen = set() + for path in paths_to_pops: + for pop_cell in paths_to_pops[path]: + pops_seen.add(tuple(pop_cell)) + if len(pops_seen) != num_populated_areas: + raise Exception("Every populated area does not have at least one path!") + +def test_each_path_has_pop(): + """ + Make sure that each path has at least one populated area. + """ + num_rows = 50 + num_cols = 50 + num_populated_areas = 10 + _, _, paths_to_pops = generate_map_info(num_rows, num_cols, num_populated_areas, save_map=False) + for path in paths_to_pops: + if len(paths_to_pops[path]) == 0: + raise Exception("Every path does not have a populated area!") + +def test_map_loading_and_saving(): + """ + Make sure that loading and saving a map works properly. + """ + num_rows = 10 + num_cols = 10 + num_populated_areas = 10 + populated_areas, paths, paths_to_pops = generate_map_info(num_rows, num_cols, num_populated_areas) + original_fireworld = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops) + map_info_root = os.path.join(os.getcwd(), MAP_DIRECTORY) + current_map_directory = max(os.listdir(map_info_root), key=lambda f: os.path.getctime(os.path.join(map_info_root, f))) + map_info_path = os.path.join(map_info_root, current_map_directory) + loaded_num_rows, loaded_num_cols, loaded_populated_areas, loaded_paths, loaded_paths_to_pops, loaded_num_populated_areas = load_map_info(map_info_path) + shutil.rmtree(map_info_path) + if len(os.listdir(map_info_root)) == 0: + shutil.rmtree(map_info_root) + loaded_fireworld = FireWorld(loaded_num_rows, loaded_num_cols, loaded_populated_areas, loaded_paths, loaded_paths_to_pops) + + assert num_rows == loaded_num_rows + assert num_cols == loaded_num_cols + assert num_populated_areas == loaded_num_populated_areas + assert np.equal(original_fireworld.state_space[POPULATED_INDEX], loaded_fireworld.state_space[POPULATED_INDEX]).all() + assert np.equal(original_fireworld.state_space[PATHS_INDEX], loaded_fireworld.state_space[PATHS_INDEX]).all() \ No newline at end of file