Skip to content

Commit

Permalink
cleaning up code for deep scan
Browse files Browse the repository at this point in the history
  • Loading branch information
joey-obrien committed Apr 7, 2024
1 parent 7b52ec5 commit eacd1e0
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 52 deletions.
4 changes: 2 additions & 2 deletions examples/map_gen_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
Run basic environment.
"""
# Set up parameters
num_rows, num_cols = 20, 20
num_rows, num_cols = 200, 100
num_populated_areas = 5

# example of generating map (other parameters are set to their default values)
Expand Down Expand Up @@ -63,7 +63,7 @@

# Run a simple loop of the environment
env.reset()
for _ in range(10):
for _ in range(20):

# Take action and observation
action = env.action_space.sample()
Expand Down
109 changes: 75 additions & 34 deletions pyrorl/pyrorl/envs/map_helpers/create_map_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
"left": [[0, 1], "east"],
"right": [[0, -1], "west"],
"straight": [[1, 0], "south"],
},
},
"east": {
"left": [[-1, 0], "north"],
"right": [[1, 0], "south"],
"straight": [[0, 1], "east"],
},
},
"west": {
"left": [[1, 0], "south"],
"right": [[-1, 0], "north"],
"straight": [[0, -1], "west"],
}
},
}
MAP_DIRECTORY = "pyrorl_map_info"

Expand All @@ -47,7 +47,9 @@ def generate_pop_locations(num_rows, num_cols, num_populated_areas):
return populated_areas


def save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops):
def save_map_info(
num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops
):
# the map information is saved in the user's current working directory
user_working_directory = os.getcwd()
maps_info_directory = os.path.join(user_working_directory, MAP_DIRECTORY)
Expand All @@ -71,75 +73,107 @@ def save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, path
f.write(percent_pop_info)

# saved the populated areas array
populated_areas_filename = os.path.join(current_map_directory, "populated_areas_array.pkl")
with open(populated_areas_filename, 'wb') as f:
populated_areas_filename = os.path.join(
current_map_directory, "populated_areas_array.pkl"
)
with open(populated_areas_filename, "wb") as f:
pkl.dump(populated_areas, f)

# save the paths array
paths_filename = os.path.join(current_map_directory, "paths_array.pkl")
with open(paths_filename, 'wb') as f:
with open(paths_filename, "wb") as f:
pkl.dump(paths, f)

# save the paths to pops array
paths_to_pops_filename = os.path.join(current_map_directory, "paths_to_pops_array.pkl")
with open(paths_to_pops_filename, 'wb') as f:
paths_to_pops_filename = os.path.join(
current_map_directory, "paths_to_pops_array.pkl"
)
with open(paths_to_pops_filename, "wb") as f:
pkl.dump(paths_to_pops, f)

# save the number of rows, number of columns, and number of populated areas
map_size_and_percent_populated_list = [num_rows, num_cols, num_populated_areas]
map_size_and_percent_populated_list_filename = os.path.join(current_map_directory, "map_size_and_percent_populated_list.pkl")
with open(map_size_and_percent_populated_list_filename, 'wb') as f:
map_size_and_percent_populated_list_filename = os.path.join(
current_map_directory, "map_size_and_percent_populated_list.pkl"
)
with open(map_size_and_percent_populated_list_filename, "wb") as f:
pkl.dump(map_size_and_percent_populated_list, f)


def load_map_info(map_directory_path):
# load the populated areas array
populated_areas_filename = os.path.join(map_directory_path, "populated_areas_array.pkl")
with open(populated_areas_filename, 'rb') as f:
populated_areas_filename = os.path.join(
map_directory_path, "populated_areas_array.pkl"
)
with open(populated_areas_filename, "rb") as f:
populated_areas = pkl.load(f)

# load the paths array
paths_filename = os.path.join(map_directory_path, "paths_array.pkl")
with open(paths_filename, 'rb') as f:
with open(paths_filename, "rb") as f:
paths = pkl.load(f)

# load the paths to pops array
paths_to_pops_filename = os.path.join(map_directory_path, "paths_to_pops_array.pkl")
with open(paths_to_pops_filename, 'rb') as f:
with open(paths_to_pops_filename, "rb") as f:
paths_to_pops = pkl.load(f)

# load the number of rows, number of columns, and number of populated areas
map_size_and_percent_populated_list_filename = os.path.join(map_directory_path, "map_size_and_percent_populated_list.pkl")
with open(map_size_and_percent_populated_list_filename, 'rb') as f:
map_size_and_percent_populated_list_filename = os.path.join(
map_directory_path, "map_size_and_percent_populated_list.pkl"
)
with open(map_size_and_percent_populated_list_filename, "rb") as f:
map_size_and_percent_populated_list = pkl.load(f)
num_rows = map_size_and_percent_populated_list[0]
num_cols = map_size_and_percent_populated_list[1]
num_populated_areas = map_size_and_percent_populated_list[2]
return num_rows, num_cols, populated_areas, paths, paths_to_pops, num_populated_areas
return (
num_rows,
num_cols,
populated_areas,
paths,
paths_to_pops,
num_populated_areas,
)


def generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True, steps_lower_bound = 2, steps_upper_bound = 4, percent_go_straight = 50, num_paths_mean = 3, num_paths_stdev = 1):
if num_populated_areas > (num_rows*num_cols - (2 * num_rows + 2 * num_cols)):
def generate_map_info(
num_rows,
num_cols,
num_populated_areas,
save_map=True,
steps_lower_bound=2,
steps_upper_bound=4,
percent_go_straight=50,
num_paths_mean=3,
num_paths_stdev=1,
):
if num_populated_areas > (num_rows * num_cols - (2 * num_rows + 2 * num_cols)):
raise Exception("Cannot have more than 100 percent of the map be populated!")
if num_rows <= 0:
raise Exception("Number of rows must be a positive value!")
if num_cols <= 0:
raise Exception("Number of columns must be a positive value!")
if percent_go_straight > 99:
raise Exception("Cannot have the percent chance of going straight be greater than 99!")
raise Exception(
"Cannot have the percent chance of going straight be greater than 99!"
)
if num_paths_mean < 1:
raise Exception("The mean for the number of paths cannot be less than 1!")
if steps_lower_bound > steps_upper_bound:
raise Exception("The lower bound for the number of steps cannot be greater than the upper bound!")
raise Exception(
"The lower bound for the number of steps cannot be greater than the upper bound!"
)
if steps_lower_bound < 1 or steps_upper_bound < 1:
raise Exception("The bounds for the number of steps cannot be less than 1!")

paths_to_pops = {}
populated_areas = generate_pop_locations(
num_rows, num_cols, num_populated_areas
)
populated_areas = generate_pop_locations(num_rows, num_cols, num_populated_areas)

# the number of paths for each populated area is chosen from normal distribution
num_paths_array = np.random.normal(num_paths_mean, num_paths_stdev, num_populated_areas).astype(int)
num_paths_array = np.random.normal(
num_paths_mean, num_paths_stdev, num_populated_areas
).astype(int)
# each populated area must have at least one path
num_paths_array[num_paths_array < 1] = 1

Expand All @@ -148,7 +182,7 @@ def generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True,

for i in range(len(populated_areas)):
pop_row, pop_col = populated_areas[i]
# for cases where a path couldn't be made
# for cases where a path couldn't be made
num_pop_paths_created = 0
while num_pop_paths_created < num_paths_array[i]:
current_path = []
Expand Down Expand Up @@ -214,7 +248,7 @@ def generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True,
if cur_col < y_min:
y_min = cur_col

# the population center is on the edge of the map,
# the population center is on the edge of the map,
# so we don't want to add a path in this direction
if (
cur_row == -1
Expand Down Expand Up @@ -245,5 +279,12 @@ def generate_map_info(num_rows, num_cols, num_populated_areas, save_map = True,
# update orientation
orientation = ORIENTATONS[orientation][direction][1]
if save_map:
save_map_info(num_rows, num_cols, num_populated_areas, populated_areas, paths, paths_to_pops)
save_map_info(
num_rows,
num_cols,
num_populated_areas,
populated_areas,
paths,
paths_to_pops,
)
return populated_areas, np.array(paths, dtype=object), paths_to_pops
28 changes: 13 additions & 15 deletions tests/environment_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from pyrorl.envs.environment.environment import (
FireWorld,
FIRE_INDEX,
FUEL_INDEX,
POPULATED_INDEX,
EVACUATING_INDEX,
PATHS_INDEX,
Expand All @@ -21,7 +20,6 @@ def dummy_environment():
"""
# Define hardcoded paramaters of the gridworld -- populated areas, paths,
# and which areas can use whicn paths.
# Note: discuss if these should be dynamically generated or others.
populated_areas = np.array([[1, 2], [4, 8], [6, 4], [8, 7]])
paths = np.array(
[
Expand Down Expand Up @@ -115,7 +113,7 @@ def test_setup():
Set up grid world and initiate loop of action-taking + updating.
"""
test_world = dummy_environment()
for i in range(25):
for _ in range(25):
# Take a random action
all_actions = test_world.get_actions()
action = random.randint(0, len(all_actions) - 1)
Expand Down Expand Up @@ -366,7 +364,7 @@ def test_invalid_fire_locations():

def test_remove_path_on_fire():
"""
Test to make sure that if a path goes on fire,
Test to make sure that if a path goes on fire,
they are removed from the state space for paths.
"""
populated_areas = np.array([[1, 2]])
Expand Down Expand Up @@ -458,7 +456,7 @@ def test_stop_evacuating():
# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
# Turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# Manually set popualted area to be evacuating
Expand Down Expand Up @@ -490,7 +488,7 @@ def test_multiple_stop_evacuating():
# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
# Turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# Manually set popualted area to be evacuating
Expand Down Expand Up @@ -530,10 +528,10 @@ def test_evacuation_decrement():
# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
# Turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# set populated area evacuation timstamp
# Set populated area evacuation timstamp
pop_area = populated_areas[0]
test_world.evacuating_timestamps[pop_area[0], pop_area[1]] = 10
test_world.evacuating_paths[0] = [pop_area]
Expand All @@ -557,10 +555,10 @@ def test_multiple_evacuation_decrement():
# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
# Turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# set populated areas evacuation timstamp
# Set populated areas evacuation timstamp
first_pop_area = populated_areas[0]
test_world.evacuating_timestamps[first_pop_area[0], first_pop_area[1]] = 10
test_world.evacuating_paths[0] = [first_pop_area]
Expand Down Expand Up @@ -588,15 +586,15 @@ def test_finished_evacuating():
# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
# Turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# set populated areas evacuation timstamp
# Set populated areas evacuation timstamp
pop_area = list(populated_areas[0])
test_world.evacuating_timestamps[pop_area[0], pop_area[1]] = 1
test_world.evacuating_paths[0] = [pop_area]

# set populated area to be evacuating
# Set populated area to be evacuating
test_world.state_space[EVACUATING_INDEX, pop_area[0], pop_area[1]] = 1

test_world.update_paths_and_evactuations()
Expand Down Expand Up @@ -732,7 +730,7 @@ def test_already_evacuating():
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

test_world.evacuating_timestamps[1, 2] = (
9 # intentially make this lower than default to see if it's reset
9 # Intentially make this lower than default to see if it's reset
)
test_world.state_space[EVACUATING_INDEX, 1, 2] = 1
test_world.evacuating_paths[0] = [[1, 2]]
Expand Down Expand Up @@ -770,7 +768,7 @@ def test_pop_taking_first_action():

def test_multiple_pop_cells_same_path():
"""
Test to make sure that taking an action works for one populated area
Test to make sure that taking an action works for one populated area
if another populated area is already taking the same path.
"""
populated_areas = np.array([[1, 2], [0, 1]])
Expand Down
2 changes: 1 addition & 1 deletion tests/map_gen_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_path_structure():

def test_paths_not_fold():
"""
Make sure that paths do not fold in on themselves.
Make sure that paths do not intersect with themselves.
"""
num_rows = 1000
num_cols = 1000
Expand Down

0 comments on commit eacd1e0

Please sign in to comment.