Skip to content

Commit

Permalink
completed map code with testing
Browse files Browse the repository at this point in the history
  • Loading branch information
joey-obrien committed Apr 3, 2024
1 parent 1eb3cf1 commit 3d6ffbd
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 87 deletions.
58 changes: 58 additions & 0 deletions examples/map_gen_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
Example of how to use the wildfire evacuation RL environment.
"""

import gymnasium
import numpy as np
import pyrorl
import shutil
from pyrorl.envs.map_helpers.create_map_info import *


if __name__ == "__main__":
"""
Run basic environment.
"""
# Set up parameters
num_rows, num_cols = 20, 20
num_populated_areas = 5

# example of generating map (other parameters are set to their default values)
populated_areas, paths, paths_to_pops = generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True, steps_lower_bound = 2, steps_upper_bound = 4, percent_go_straight = 50, num_paths_mean = 3, num_paths_stdev = 1)

# showing how to load in map just created for good measure, would otherwise provide the
# desired map pth to load_map_info
map_info_root = os.path.join(os.getcwd(), MAP_DIRECTORY)
current_map_directory = max(os.listdir(map_info_root), key=lambda f: os.path.getctime(os.path.join(map_info_root, f)))
map_info_path = os.path.join(map_info_root, current_map_directory)
num_rows, num_cols, populated_areas, paths, paths_to_pops, num_populated_areas = load_map_info(map_info_path)

# destroy the saved map info created for this example
shutil.rmtree(map_info_path)
if len(os.listdir(map_info_root)) == 0:
shutil.rmtree(map_info_root)

# Create environment
kwargs = {
"num_rows": num_rows,
"num_cols": num_cols,
"populated_areas": populated_areas,
"paths": paths,
"paths_to_pops": paths_to_pops,
}
env = gymnasium.make("pyrorl/PyroRL-v0", **kwargs)

# Run a simple loop of the environment
env.reset()
for _ in range(10):

# Take action and observation
action = env.action_space.sample()
observation, reward, terminated, truncated, info = env.step(action)

# Render environment and print reward
env.render()
print("Reward: " + str(reward))

# Generate the gif
env.generate_gif()
2 changes: 1 addition & 1 deletion pyrorl/pyrorl/envs/environment/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def __init__(
self.paths: List[List[Any]] = []
for path in paths:
path_array = np.array(path)
path_rows, path_cols = path_array[:, 0], path_array[:, 1]
path_rows, path_cols = path_array[:, 0].astype(int), path_array[:, 1].astype(int)
self.state_space[PATHS_INDEX, path_rows, path_cols] += 1

# Each path in self.paths is a list that records what the path is and
Expand Down
Empty file.
211 changes: 125 additions & 86 deletions pyrorl/pyrorl/envs/map_helpers/create_map_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,116 +2,154 @@
import numpy as np
import pickle as pkl
import os
from pyrorl.envs.environment.environment import FireWorld
from datetime import datetime

# randomly generate populated areas
def generate_pop_locations(num_rows, num_cols, num_populated_areas):
populated_areas = set()
for _ in range(num_populated_areas):
pop_row = random.randint(0, num_rows - 1)
pop_col = random.randint(0, num_cols - 1)
while (pop_row, pop_col) in populated_areas:
pop_row = random.randint(0, num_rows - 1)
pop_col = random.randint(0, num_cols - 1)
populated_areas.add((pop_row, pop_col))
populated_areas = np.array(list(populated_areas))
return populated_areas

def create_orientations():
orientations = {}
orientations["north"] = {
DIRECTIONS = {0: "straight", 1: "right", 2: "left"}
ORIENTATONS = {
"north": {
"left": [[0, -1], "west"],
"right": [[0, 1], "east"],
"straight": [[-1, 0], "north"],
}
orientations["south"] = {
},
"south": {
"left": [[0, 1], "east"],
"right": [[0, -1], "west"],
"straight": [[1, 0], "south"],
}
orientations["east"] = {
},
"east": {
"left": [[-1, 0], "north"],
"right": [[1, 0], "south"],
"straight": [[0, 1], "east"],
}
orientations["west"] = {
},
"west": {
"left": [[1, 0], "south"],
"right": [[-1, 0], "north"],
"straight": [[0, -1], "west"],
}
return orientations
}
MAP_DIRECTORY = "pyrorl_map_info"


def generate_pop_locations(num_rows, num_cols, num_populated_areas):
"""
Randomly generate populated areas.
"""
populated_areas = set()
for _ in range(num_populated_areas):
pop_row = random.randint(1, num_rows - 2)
pop_col = random.randint(1, num_cols - 2)
# make sure that n
while (pop_row, pop_col) in populated_areas:
pop_row = random.randint(1, num_rows - 2)
pop_col = random.randint(1, num_cols - 2)
populated_areas.add((pop_row, pop_col))
populated_areas = np.array(list(populated_areas))
return populated_areas

def save_map_info(num_rows, num_cols, percent_map_populated, populated_areas, paths, paths_to_pops):

def save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops):
# the map information is saved in the user's current working directory
user_working_directory = os.getcwd()
maps_info_directory = os.path.join(user_working_directory, "pyrorl_map_info")
maps_info_directory = os.path.join(user_working_directory, MAP_DIRECTORY)
if not os.path.exists(maps_info_directory):
os.makedirs(maps_info_directory)
current_map_info = str(num_rows)+ "_rows_" + str(num_cols) + "_cols_" + str(percent_map_populated) + "_percent_map_populated"
generation = 0
current_map_directory = os.path.join(maps_info_directory, current_map_info + "_generation" + str(generation))
while os.path.exists(current_map_directory):
generation += 1
current_map_directory = os.path.join(maps_info_directory, current_map_info + "_generation" + str(generation))

# make a new subdirectory for the current map's information
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_map_directory = os.path.join(maps_info_directory, timestamp)
os.makedirs(current_map_directory)
populated_areas_filename = os.path.join(current_map_directory, "popualted_areas_array.pkl")

# put the number of rows, number of columns, and number of populated areas
# in text file for user to reference data
map_info_filename = os.path.join(current_map_directory, "map_info.txt")
with open(map_info_filename, "w") as f:
row_info = "num_rows: " + str(num_rows) + "\n"
f.write(row_info)
col_info = "num_cols: " + str(num_cols) + "\n"
f.write(col_info)
percent_pop_info = "num_populated_areas: " + str(num_populated_areas)
f.write(percent_pop_info)

# saved the populated areas array
populated_areas_filename = os.path.join(current_map_directory, "populated_areas_array.pkl")
with open(populated_areas_filename, 'wb') as f:
pkl.dump(populated_areas, f)

# save the paths array
paths_filename = os.path.join(current_map_directory, "paths_array.pkl")
with open(paths_filename, 'wb') as f:
pkl.dump(paths, f)

# save the paths to pops array
paths_to_pops_filename = os.path.join(current_map_directory, "paths_to_pops_array.pkl")
with open(paths_to_pops_filename, 'wb') as f:
pkl.dump(paths_to_pops, f)
map_size_and_percent_popualted_list = [num_rows, num_cols, percent_map_populated]
map_size_and_percent_popualted_list_filename = os.path.join(current_map_directory, "map_size_and_percent_popualted_list.pkl")
with open(map_size_and_percent_popualted_list_filename, 'wb') as f:
pkl.dump(map_size_and_percent_popualted_list, f)

# save the number of rows, number of columns, and number of populated areas
map_size_and_percent_populated_list = [num_rows, num_cols, num_populated_areas]
map_size_and_percent_populated_list_filename = os.path.join(current_map_directory, "map_size_and_percent_populated_list.pkl")
with open(map_size_and_percent_populated_list_filename, 'wb') as f:
pkl.dump(map_size_and_percent_populated_list, f)

def load_map_info(map_directory_path):
populated_areas_filename = os.path.join(map_directory_path, "popualted_areas_array.pkl")
# load the populated areas array
populated_areas_filename = os.path.join(map_directory_path, "populated_areas_array.pkl")
with open(populated_areas_filename, 'rb') as f:
populated_areas = pkl.load(f)

# load the paths array
paths_filename = os.path.join(map_directory_path, "paths_array.pkl")
with open(paths_filename, 'rb') as f:
paths = pkl.load(f)

# load the paths to pops array
paths_to_pops_filename = os.path.join(map_directory_path, "paths_to_pops_array.pkl")
with open(paths_to_pops_filename, 'rb') as f:
paths_to_pops = pkl.load(f)
map_size_and_percent_popualted_list_filename = os.path.join(map_directory_path, "map_size_and_percent_popualted_list.pkl")
with open(map_size_and_percent_popualted_list_filename, 'rb') as f:
map_size_and_percent_popualted_list = pkl.load(f)
num_rows = map_size_and_percent_popualted_list[0]
num_cols = map_size_and_percent_popualted_list[1]
percent_map_populated = map_size_and_percent_popualted_list[2]
return num_rows, num_cols, populated_areas, paths, paths_to_pops, percent_map_populated


def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True):
if percent_map_populated > 100:

# load the number of rows, number of columns, and number of populated areas
map_size_and_percent_populated_list_filename = os.path.join(map_directory_path, "map_size_and_percent_populated_list.pkl")
with open(map_size_and_percent_populated_list_filename, 'rb') as f:
map_size_and_percent_populated_list = pkl.load(f)
num_rows = map_size_and_percent_populated_list[0]
num_cols = map_size_and_percent_populated_list[1]
num_populated_areas = map_size_and_percent_populated_list[2]
return num_rows, num_cols, populated_areas, paths, paths_to_pops, num_populated_areas


def generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True, steps_lower_bound = 2, steps_upper_bound = 4, percent_go_straight = 50, num_paths_mean = 3, num_paths_stdev = 1):
if num_populated_areas > (num_rows*num_cols - (2 * num_rows + 2 * num_cols)):
raise Exception("Cannot have more than 100 percent of the map be populated!")

orientations = create_orientations()
directions = {0: "left", 1: "right", 2: "straight"}
if num_rows <= 0:
raise Exception("Number of rows must be a positive value!")
if num_cols <= 0:
raise Exception("Number of columns must be a positive value!")
if percent_go_straight > 99:
raise Exception("Cannot have the percent chance of going straight be greater than 99!")
if num_paths_mean < 1:
raise Exception("The mean for the number of paths cannot be less than 1!")
if steps_lower_bound > steps_upper_bound:
raise Exception("The lower bound for the number of steps cannot be greater than the upper bound!")
if steps_lower_bound < 1 or steps_upper_bound < 1:
raise Exception("The bounds for the number of steps cannot be less than 1!")

paths_to_pops = {}
num_populated_areas = int(num_rows * num_cols * percent_map_populated * 0.01)
populated_areas = generate_pop_locations(
num_rows, num_cols, num_populated_areas
)

# the number of paths for each populated area is chosen from normal distribution
num_paths_array = np.random.normal(3, 1, num_populated_areas).astype(int)
num_paths_array = np.random.normal(num_paths_mean, num_paths_stdev, num_populated_areas).astype(int)
# each populated area must have at least one path
while 0 in num_paths_array:
num_paths_array = np.random.normal(3, 1, num_populated_areas).astype(int)
num_paths_array[num_paths_array < 1] = 1

paths = []
path_num = 0

for i in range(len(populated_areas)):
pop_row, pop_col = populated_areas[i]

num_pop_paths_created = 0 # for cases where a path couldn't be made
# for cases where a path couldn't be made
num_pop_paths_created = 0
while num_pop_paths_created < num_paths_array[i]:
current_path = []

Expand All @@ -131,37 +169,42 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True

# we want to make sure that the current path will not intersect with itself
direction_chosen = False
num_steps = random.randint(2, 4)
num_steps = random.randint(steps_lower_bound, steps_upper_bound)

while not direction_chosen:
# have a bias toward going straight
direction_index = random.randint(0, 1)
if direction_index == 2:
# choose whether to go straight, left, or right based
# on percent_go_straight -> if we don't go straight,
# we go left or right with equal probability
direction_index = 0
percent_value = random.randint(0, 100)
if percent_value > percent_go_straight:
direction_index = random.randint(1, 2)
direction = directions[direction_index]
direction = DIRECTIONS[direction_index]

if orientation == "north" and direction == "left":
if cur_row - num_steps < x_min:
if orientation == "north" and direction != "straight":
if cur_row == x_min:
direction_chosen = True
elif orientation == "south" and direction == "right":
if cur_row + num_steps > x_max:
elif orientation == "south" and direction != "straight":
if cur_row == x_max:
direction_chosen = True
elif orientation == "east" and direction == "left":
if cur_col + num_steps > y_max:
elif orientation == "east" and direction != "straight":
if cur_col == y_max:
direction_chosen = True
elif orientation == "west" and direction == "right":
if cur_col - num_steps < y_min:
elif orientation == "west" and direction != "straight":
if cur_col == y_min:
direction_chosen = True
else:
direction_chosen = True

row_update = orientations[orientation][direction][0][0]
col_update = orientations[orientation][direction][0][1]
row_update = ORIENTATONS[orientation][direction][0][0]
col_update = ORIENTATONS[orientation][direction][0][1]

for _ in range(num_steps):
cur_row += row_update
cur_col += col_update

# update bounds if necessary
# (so that paths do not intersect with themselves)
if cur_row > x_max:
x_max = cur_row
if cur_row < x_min:
Expand All @@ -171,7 +214,8 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True
if cur_col < y_min:
y_min = cur_col

# the population center is on the edge of the map, so we don't want to add a path in this direction
# the population center is on the edge of the map,
# so we don't want to add a path in this direction
if (
cur_row == -1
or cur_row == num_cols
Expand All @@ -184,27 +228,22 @@ def generate_map_info(num_rows, num_cols, percent_map_populated, save_map = True
current_path.append([cur_row, cur_col])
if (
cur_row == 0
or cur_row == num_cols - 1
or cur_row == num_rows - 1
or cur_col == 0
or cur_col == num_rows - 1
or cur_col == num_cols - 1
):
# we want unique paths
done = True
if current_path in paths:
break
paths.append(current_path)
paths_to_pops[path_num] = [[pop_row, pop_col]]
path_num += 1
num_pop_paths_created += 1
break

# update orientation
orientation = orientations[orientation][direction][1]
orientation = ORIENTATONS[orientation][direction][1]
if save_map:
save_map_info(num_rows, num_cols, percent_map_populated, populated_areas, paths, paths_to_pops)
save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops)
return populated_areas, np.array(paths, dtype=object), paths_to_pops

# populated_areas, paths, paths_to_pops = generate_map_info(10, 10, 5)

num_rows, num_cols, populated_areas, paths, paths_to_pops, percent_map_populated = load_map_info("./pyrorl_map_info/10_rows_10_cols_5_percent_map_populated_generation0")

example_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

print(example_world.state_space[4])
Loading

0 comments on commit 3d6ffbd

Please sign in to comment.