Skip to content

Commit

Permalink
reverting code back (will do rebase as a group later)
Browse files Browse the repository at this point in the history
  • Loading branch information
joey-obrien committed Dec 15, 2023
1 parent 128782a commit aa6ff4e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 264 deletions.
247 changes: 6 additions & 241 deletions tests/environment_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ def dummy_environment():
test_world = FireWorld(10, 10, populated_areas, paths, paths_to_pops)
return test_world






def test_initialization():
populated_areas = np.array([[2,2],[4,1]])
paths = [[[2,0],[2,1]], [[1,0],[1,1],[2,1],[3,1]]]
Expand Down Expand Up @@ -68,244 +73,4 @@ def test_setup():
# Advance the gridworld and get the reward
test_world.advance_to_next_timestep()
reward = test_world.get_state_utility()
print("Reward: " + str(reward) + "\n")

"""
Test to make sure that if a path goes on fire, they are removed from the state space for paths.
"""
def test_remove_path_on_fire():
populated_areas = np.array([[1,2]])
paths = [[[1,0],[1,1]]]
paths_to_pops = {0:[[1,2]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# Manually set path on fire
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))
path_cell = paths[0][0]
test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1

test_world.update_paths_and_evactuations()
assert np.array_equal(test_world.state_space[PATHS_INDEX], np.zeros((num_rows, num_cols)))
assert test_world.paths[0][1] == False


"""
Test to make sure that if multiple paths go on fire after a step, they are both removed from
the paths state space.
"""
def test_remove_multiple_paths_on_fire():
populated_areas = np.array([[1,2], [3,3]])
paths = [[[1,0],[1,1]], [[3,4]]]
paths_to_pops = {0:[[1,2]], 1:[[3,4]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# Manually set paths on fire
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))
first_path_cell = paths[0][0]
test_world.state_space[FIRE_INDEX, first_path_cell[0], first_path_cell[1]] = 1
second_path_cell = paths[1][0]
test_world.state_space[FIRE_INDEX, second_path_cell[0], second_path_cell[1]] = 1


test_world.update_paths_and_evactuations()
assert np.array_equal(test_world.state_space[PATHS_INDEX], np.zeros((num_rows, num_cols)))
assert test_world.paths[0][1] == False
assert test_world.paths[1][1] == False

"""
Test for if two paths intersect only the one on fire disappears
"""
def test_remove_path_on_fire_intersecting_paths():
populated_areas = np.array([[1,2], [2,1]])
paths = [[[1,0],[1,1]], [[0,1], [1,1]]]
paths_to_pops = {0:[[1,2]], 1:[2,1]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# Manually set path on fire
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))
path_cell = paths[0][0]
test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1

test_world.update_paths_and_evactuations()
check_grid = np.zeros((num_rows,num_cols))
check_grid[0,1] = 1
check_grid[1,1] = 1
assert np.array_equal(test_world.state_space[PATHS_INDEX], check_grid)


"""
Test to make sure that if a populated area was taking a path that caught on fire, it stops
evacuating.
"""
def test_stop_evacuating():
populated_areas = np.array([[1,2]])
paths = [[[1,0],[1,1]]]
paths_to_pops = {0:[[1,2]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# Manually set popualted area to be evacuating
populated_area = populated_areas[0]
test_world.state_space[EVACUATING_INDEX, populated_area[0], populated_area[1]] = 1
test_world.evacuating_paths[0] = [populated_area]

# Set path populated area is using to evacuate on fire
path_cell = paths[0][0]
test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1

test_world.update_paths_and_evactuations()
assert np.array_equal(test_world.state_space[EVACUATING_INDEX], np.zeros((num_rows, num_cols)))
assert 0 not in test_world.evacuating_paths

"""
Test to make sure that if two areas are taking the same path, they both stop evacuating.
"""
def test_multiple_stop_evacuating():
populated_areas = np.array([[1,2], [0,1]])
paths = [[[1,0],[1,1]]]
paths_to_pops = {0:[[1,2], [0,1]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# Manually set popualted area to be evacuating
first_populated_area = populated_areas[0]
test_world.state_space[EVACUATING_INDEX, first_populated_area[0], first_populated_area[1]] = 1
test_world.evacuating_paths[0] = [first_populated_area]
second_populated_area = populated_areas[1]
test_world.state_space[EVACUATING_INDEX, second_populated_area[0], second_populated_area[1]] = 1
test_world.evacuating_paths[0].append(second_populated_area)

# Set path populated areas are using to evacuate on fire
path_cell = paths[0][0]
test_world.state_space[FIRE_INDEX, path_cell[0], path_cell[1]] = 1

test_world.update_paths_and_evactuations()
assert np.array_equal(test_world.state_space[EVACUATING_INDEX], np.zeros((num_rows, num_cols)))
assert 0 not in test_world.evacuating_paths

"""
Test to make sure that if a path is evacuating its evacuating timestamp is decremented when a step
is taken.
"""
def test_evacuation_decrement():
populated_areas = np.array([[1,2]])
paths = [[[1,0],[1,1]]]
paths_to_pops = {0:[[1,2]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# set populated area evacuation timstamp
pop_area = populated_areas[0]
test_world.evacuating_timestamps[pop_area[0], pop_area[1]] = 10
test_world.evacuating_paths[0] = [pop_area]

test_world.update_paths_and_evactuations()

assert test_world.evacuating_timestamps[pop_area[0], pop_area[1]] == 9

"""
Test to make sure if two areas are evacuating on the same path, they both have their
timestamps decremented when a step is taken.
"""
def test_multiple_evacuation_decrement():
populated_areas = np.array([[1,2], [0,1]])
paths = [[[1,0],[1,1]]]
paths_to_pops = {0:[[1,2], [0,1]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# set populated areas evacuation timstamp
first_pop_area = populated_areas[0]
test_world.evacuating_timestamps[first_pop_area[0], first_pop_area[1]] = 10
test_world.evacuating_paths[0] = [first_pop_area]
second_pop_area = populated_areas[1]
test_world.evacuating_timestamps[second_pop_area[0], second_pop_area[1]] = 10
test_world.evacuating_paths[0].append(second_pop_area)

test_world.update_paths_and_evactuations()

assert test_world.evacuating_timestamps[first_pop_area[0], first_pop_area[1]] == 9
assert test_world.evacuating_timestamps[first_pop_area[0], first_pop_area[1]] == 9

"""
Test to make sure that if a populated area finishes evacuating it is removed from the evacuating
populated areas state.
"""
def test_finished_evacuating():
populated_areas = np.array([[1,2], [0,1]])
paths = [[[1,0],[1,1]]]
paths_to_pops = {0:[[1,2], [0,1]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

# turn off fires
test_world.state_space[FIRE_INDEX] = np.zeros((num_rows, num_cols))

# set populated areas evacuation timstamp
pop_area = list(populated_areas[0])
test_world.evacuating_timestamps[pop_area[0], pop_area[1]] = 1
test_world.evacuating_paths[0] = [pop_area]

# set populated area to be evacuating
test_world.state_space[EVACUATING_INDEX, pop_area[0], pop_area[1]] = 1

test_world.update_paths_and_evactuations()

assert test_world.state_space[EVACUATING_INDEX, pop_area[0], pop_area[1]] == 0
assert test_world.state_space[POPULATED_INDEX, pop_area[0], pop_area[1]] == 0
assert 0 not in test_world.evacuating_paths
assert test_world.evacuating_timestamps[pop_area[0], pop_area[1]] == np.inf

"""
Test to make sure self.actions is set up correctly.
"""
def test_set_actions():
populated_areas = np.array([[1,2], [0,1]])
paths = [[[1,0],[1,1]],[[0,0]]]
paths_to_pops = {0:[[1,2], [0,1]], 1:[[0,1]]}
num_rows = 5
num_cols = 5

# Initialize fire world
test_world = FireWorld(num_rows, num_cols, populated_areas, paths, paths_to_pops)

assert len(test_world.actions) == 4 #4 because action to do nothing adds 1
print("Reward: " + str(reward) + "\n")
34 changes: 11 additions & 23 deletions wildfire_evac/wildfire_evac/envs/environment/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import random
from scipy.stats import bernoulli
import torch

from .environment_constant import fire_mask

"""
Expand All @@ -29,22 +30,14 @@ class FireWorld:
"""

def __init__(self, num_rows, num_cols, populated_areas, paths, paths_to_pops, num_fire_cells = 2, custom_fire_locations = None):
# if they don't pass in populated areas and don't say they want auto-generated paths raise an exception, don't do an assert


"""
The constructor defines the state and action space, initializes the fires,
and sets the paths and populated areas.
"""
# Define the state and action space
self.reward = 0
self.state_space = np.zeros([5, num_rows, num_cols])

num_actions = 0
for key in paths_to_pops:
for _ in range(len(paths_to_pops[key])):
num_actions += 1
self.actions = list(np.arange(num_actions + 1)) # extra action for doing nothing
self.actions = list(np.arange(len(paths) + 1)) # extra action for doing nothing

# Associate paths with populated areas and actions
# Note: there seems to be an error that keeps popping up where this dictionary is not
Expand All @@ -54,13 +47,9 @@ def __init__(self, num_rows, num_cols, populated_areas, paths, paths_to_pops, nu
# We want to remember which action index corresponds to which population center
# and which path (because we just provide an array like [1,2,3,4,5,6,7]) which
# would each be mapped to a given population area taking a given path

self.action_to_pop_and_path = { self.actions[-1] : None}
action_val = 0
for key in self.paths_to_pops:
for pop_area in paths_to_pops[key]:
self.action_to_pop_and_path[key] = (pop_area, action_val)
action_val += 1
self.action_to_pop_and_path[key] = (paths_to_pops[key], key) # action index: list of pop x,y index and path index [[x,y],path_index]

# State for the evacuation of populated areas
self.evacuating_paths = {} # path_index : list of pop x,y indices that are evacuating [[x,y],[x,y],...]
Expand Down Expand Up @@ -88,7 +77,7 @@ def __init__(self, num_rows, num_cols, populated_areas, paths, paths_to_pops, nu
# Note: right now paths is different from self.paths but we can change this later if needed
self.paths = []
for path in paths:
path_array = np.array(path).astype(int)
path_array = np.array(path)
path_rows, path_cols = path_array[:,0], path_array[:,1]
self.state_space[PATHS_INDEX,path_rows,path_cols] += 1

Expand Down Expand Up @@ -139,6 +128,7 @@ def update_paths_and_evactuations(self):
2. Also stops evacuating any areas that were taking a burned down path
3. Also decrements the evacuation timestamps
"""
self.state_space[FIRE_INDEX][1,1] = 1
for i in range(len(self.paths)):
# Decrement path counts and remove path
if self.paths[i][1] and np.sum(np.logical_and(self.state_space[FIRE_INDEX], self.paths[i][0])) > 0:
Expand All @@ -147,8 +137,7 @@ def update_paths_and_evactuations(self):

# Stop evacuating an area if it was taking the removed path
if i in self.evacuating_paths:
# pop_centers = np.array(self.evacuating_paths[i])[0]
pop_centers = np.array(self.evacuating_paths[i])
pop_centers = np.array(self.evacuating_paths[i])[0]
pop_rows, pop_cols = pop_centers[:,0], pop_centers[:,1]

# Reset timestamp and evacuation index
Expand All @@ -161,7 +150,7 @@ def update_paths_and_evactuations(self):
# for the below, this code works for if multiple population centers are taking the same path and
# finish at the same time, but if we have it so that two population centers can't take the same
# path it could probably be simplified
pop_centers = np.array(self.evacuating_paths[i])
pop_centers = np.array(self.evacuating_paths[i])[0]
pop_rows, pop_cols = pop_centers[:,0], pop_centers[:,1]
self.evacuating_timestamps[pop_rows,pop_cols] -= 1
done_evacuating = np.where(self.evacuating_timestamps == 0)
Expand All @@ -176,7 +165,7 @@ def update_paths_and_evactuations(self):
done_evacuating = np.array([done_evacuating[0], done_evacuating[1]])
done_evacuating = np.transpose(done_evacuating)
for j in range(done_evacuating.shape[0]):
self.evacuating_paths[i].remove(list(done_evacuating[j]))
self.evacuating_paths[i][0].remove(list(done_evacuating[j]))

# this population center is done evacuating, so we can set its timestamp back to infinity
# (this is important so that we don't try to remove this from self.evacuating paths twice -
Expand All @@ -185,10 +174,10 @@ def update_paths_and_evactuations(self):
self.evacuating_timestamps[update_row, update_col] = np.inf

# no more population centers are using this path, so we delete it
if len(self.evacuating_paths[i]) == 0:
if len(self.evacuating_paths[i][0]) == 0:
del self.evacuating_paths[i]

def advance_to_next_timestep(self, manual_fire = None):
def advance_to_next_timestep(self):
"""
Take three steps:
1. Advance fire forward one timestep
Expand Down Expand Up @@ -272,9 +261,8 @@ def get_state(self):
returned_state[PATHS_INDEX] = np.clip(returned_state[PATHS_INDEX], 0, 1)
return returned_state


def get_terminated(self):
"""
Get the status of the simulation.
"""
return ( self.time_step >= 100 )
return ( self.time_step >= 100 )

0 comments on commit aa6ff4e

Please sign in to comment.