mdp.py
import state
import objects
import itertools
import board
import copy
import time

class MDP:
    def __init__(self, startingState, reward, livingReward, iterations=None):
        # Variables that change the AI behavior
        self.rewardDiscount = reward
        self.livingReward = livingReward
        self.iterations = iterations if iterations is not None else 10
        self.intendedActionProbability = 0.8  # intended action succeeds 80% of the time
        self.unintendedActionProbability = 0.2  # unintended action occurs 20% of the time, split evenly among the other actions available in a state
        self.currentState = startingState
        self.states = list()  # contains all the valid states of the model
        self.policyTable = dict()  # dictionary of (state : action) pairs, updated by value iteration and q-learning
        self.currentStateValues = dict()  # dictionary of (state : float) pairs used by value iteration; holds the V_k values
        self.nextStates = dict()  # keyed by a (state, action) tuple; returns the state that (s, a) leads to
        self.transitionFunctions = dict()  # keyed by an (s, a) tuple; returns a list of (action, probability) tuples
        self.rewardFunctions = dict()  # keyed by an (s, a, s') tuple; returns a reward value
        self.initializeStates()  # initialize all the states that exist in the MDP
        self.initializeNextStateTableAndRewardTable()  # initialize the next-state table and R(s, a, s') for all states and actions
        self.initializeTransitionFunctions()  # initialize T(s, a, s') for all states and actions
        start = time.time()
        self.valueIteration()
        end = time.time()
        print(end - start)

    def initializeStates(self):
        # Iterates over the board and creates every valid state that exists in the MDP.
        # This is computationally expensive.
        # Positions are stored as (x, y) coordinate tuples; we gather ALL positions
        # the player and each key can occupy.
        playerPos = list()
        # get all valid player positions
        for x in range(0, len(objects.board.tiles)):
            for y in range(0, len(objects.board.tiles[0])):
                if objects.board.tiles[x][y].isWall():
                    continue
                # this tile is a valid player position, so add it to the list
                playerPos.append((x, y))
        # get all valid combinations of key positions
        keyList = self.getKeyPositionCombinations()
        # populate the list of all possible states
        stateList = list(itertools.product(playerPos, keyList))
        for newState in stateList:
            stateObj = state.State(newState[0], newState[1])
            self.states.append(stateObj)
            self.currentStateValues[stateObj] = 0  # initialize the state's value to 0 in the dictionary
        self.currentStateValues[None] = 0  # None is used as the exit state
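        # Note: the number of states created above is len(playerPos) * len(keyList),
        # every non-wall tile paired with every valid key-position combination,
        # so the state count grows multiplicatively with each additional key.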

    def initializeNextStateTableAndRewardTable(self):
        # iterate over all states and cache the next state and reward for every action
        for currentState in self.states:
            for action in self.getActionVector(currentState, objects.board):
                nextState, reward = self.getNextStateAndReward(currentState, action)
                self.nextStates[(currentState, action)] = nextState
                self.rewardFunctions[(currentState, action, nextState)] = reward

    def initializeTransitionFunctions(self):
        # iterate over all the states
        for newState in self.states:
            actionVector = self.getActionVector(newState, objects.board)
            if len(actionVector) == 1:
                # only one action is available, so it must succeed with probability 1
                self.transitionFunctions[(newState, actionVector[0])] = [(actionVector[0], 1.0)]
            else:
                # give the intended action 80% probability and split the remaining
                # 20% evenly among the other actions possible in this state
                for action in actionVector:
                    mainAction = (action, self.intendedActionProbability)
                    remainingProbability = self.unintendedActionProbability / (len(actionVector) - 1)
                    transitionProbabilityList = [mainAction]
                    for newAction in actionVector:
                        if newAction == action:
                            continue
                        transitionProbabilityList.append((newAction, remainingProbability))
                    self.transitionFunctions[(newState, action)] = transitionProbabilityList
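        # For example, if the valid actions in a state are ["Right", "Down", "Stay"]
        # and the intended action is "Right", the stored distribution is
        # [("Right", 0.8), ("Down", 0.1), ("Stay", 0.1)].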

    def getActionVector(self, currentState, board):
        # helper function that returns a vector of the valid actions from the given state.
        # NOTE: playerPos is an (x, y) tuple, so playerPos[0] is x and playerPos[1] is y
        x = currentState.playerPos[0]
        y = currentState.playerPos[1]
        validMoves = ["Left", "Right", "Up", "Down", "Stay"]
        if board.tiles[x][y].isExit() and all(val is None for val in currentState.keyPositions):
            # player is on the exit and has all the keys, so exiting is the only right move
            return ["Exit"]
        # rule out actions based on walls and board edges, matching the implementation in key.py
        if x == 0 or board.tiles[x - 1][y].isWall():
            validMoves.remove("Left")
        if x == board.width - 1 or board.tiles[x + 1][y].isWall():
            validMoves.remove("Right")
        if y == 0 or board.tiles[x][y - 1].isWall():
            validMoves.remove("Up")
        if y == board.height - 1 or board.tiles[x][y + 1].isWall():
            validMoves.remove("Down")
        return validMoves
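        # For example, a player in the top-left corner of an open board cannot
        # move "Left" or "Up", so this returns ["Right", "Down", "Stay"].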

    def getNextStateAndReward(self, originalState, action):
        # calculates the next state and reward for taking `action` in originalState.
        # the result is returned as a (nextState, rewardValue) tuple
        if action == "Exit":
            # if we can exit, return 100 reward
            return (None, 100)
        if action == "Stay":
            # if we stay, return the state we were just in
            return (originalState, self.livingReward)
        totalReward = 0  # track the reward we end up getting for taking this action
        xDirection = 0
        yDirection = 0
        # create temporary movement variables so the player isn't actually affected
        playerX = originalState.playerPos[0]
        playerY = originalState.playerPos[1]
        tempBoard = copy.deepcopy(objects.board)  # create a copy of the board
        nextState = None
        resultingKeyPositionList = copy.deepcopy(originalState.keyPositions)  # updated if keys are acquired while taking this action
        # assign this state's keys to the board copy
        for key in tempBoard.keys:
            tempBoard.tiles[key.x][key.y].key = False
        for keyPos in originalState.keyPositions:
            if keyPos is None:
                continue
            tempBoard.tiles[keyPos[0]][keyPos[1]].key = True
        # note that the x, y origin is at the top-left, which is why "Up" is -1 in the y direction
        xDirection = self.getXDirection(action)
        yDirection = self.getYDirection(action)
        while ((yDirection != 0 or xDirection != 0)
               and (playerY + yDirection >= 0 and playerY + yDirection < objects.board.height)
               and (playerX + xDirection >= 0 and playerX + xDirection < objects.board.width)
               and not objects.board.tiles[playerX + xDirection][playerY + yDirection].isWall()):
            playerX += xDirection
            playerY += yDirection
            if tempBoard.tiles[playerX][playerY].hasKey():
                totalReward += 10
                # find the key that was acquired and set it to None in the key position list
                for index, keyPos in enumerate(resultingKeyPositionList):
                    if keyPos is None:
                        continue
                    if keyPos[0] == playerX and keyPos[1] == playerY:
                        resultingKeyPositionList[index] = None
            if tempBoard.tiles[playerX][playerY].isLava():
                totalReward -= 1000
            if objects.board.tiles[playerX][playerY].isWormhole():
                wormhole = objects.board.tiles[playerX][playerY]
                playerX = wormhole.exit.exitX
                playerY = wormhole.exit.exitY
                xDirection = self.getXDirection(wormhole.translateDirection(action))
                yDirection = self.getYDirection(wormhole.translateDirection(action))
                continue
        totalReward += self.livingReward
        nextState = state.State((playerX, playerY), resultingKeyPositionList)
        return (nextState, totalReward)
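        # Movement is a slide: the player keeps going in the chosen direction until
        # blocked by a wall or the board edge. For example, with livingReward = -1,
        # sliding "Right" across three open tiles and collecting one key yields
        # totalReward = 10 + (-1) = 9, since the living reward is charged once per
        # action rather than once per tile.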

    def getXDirection(self, newDirection):
        if newDirection == "Right":
            return 1
        elif newDirection == "Left":
            return -1
        return 0

    def getYDirection(self, newDirection):
        if newDirection == "Down":
            return 1
        elif newDirection == "Up":
            return -1
        return 0

    def valueIteration(self):
        # perform value iteration: sweep over all states self.iterations times,
        # updating each state's value from the previous sweep's values
        for _ in range(0, self.iterations):
            # grab our kth dictionary of values; a shallow copy is enough since the values are floats
            previousStateValues = dict(self.currentStateValues)
            for currentState in self.states:
                actionVector = self.getActionVector(currentState, objects.board)
                actionValues = dict()  # (action : value) pairs; we take the argmax after computing all values
                # iterate over each action and fill in the actionValues dictionary
                for action in actionVector:
                    # calculate the sum over s' of T(s, a, s') * (R(s, a, s') + rewardDiscount * V_k(s'))
                    actionValues[action] = 0
                    # T(s, a, s') gives 0.8 for the intended action; the other possible actions share the remaining 20%
                    for newAction, probability in self.transitionFunctions[(currentState, action)]:
                        # e.g. if the intended action is Right, Right occurs with 0.8 and up, down, left, and stay each occur with 0.05
                        nextState = self.nextStates[(currentState, newAction)]
                        reward = self.rewardFunctions[(currentState, newAction, nextState)]
                        actionValues[action] += probability * (reward + (self.rewardDiscount * previousStateValues[nextState]))
                self.currentStateValues[currentState] = max(actionValues.values())  # update the state value to the one just calculated
                self.policyTable[currentState] = max(actionValues, key=actionValues.get)  # update the policy of currentState to the best action found
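        # Each sweep above applies the Bellman optimality update
        #     V_{k+1}(s) = max_a sum_{s'} T(s, a, s') * (R(s, a, s') + gamma * V_k(s'))
        # with gamma = self.rewardDiscount, recording the maximizing action as the policy.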

    def getKeyPositionCombinations(self):
        # get the candidate positions for each key; every key has min and max x, y bounds that must be considered
        keyPos = [list() for _ in range(0, len(objects.board.keys))]
        for counter, key in enumerate(objects.board.keys):
            for x in range(key.xmin, key.xmax + 1):
                for y in range(key.ymin, key.ymax + 1):
                    if objects.board.tiles[x][y].isWall():
                        # wall tiles are not valid key positions
                        continue
                    keyPos[counter].append((x, y))  # add a position for this specific key
            keyPos[counter].append(None)  # None represents this key having been acquired
        # take the cartesian product of all key positions to get every possible key position combination
        keyCombos = list(set(itertools.product(*keyPos)))
        # find entries that place two keys on the same tile; keys cannot be on top of one another
        removeList = list()
        for i in range(0, len(keyCombos)):
            for j in range(0, len(objects.board.keys)):
                for k in range(0, len(objects.board.keys)):
                    if j == k:
                        continue
                    if keyCombos[i][j] == keyCombos[i][k] and keyCombos[i][j] is not None:
                        removeList.append(keyCombos[i])
                        break
        return [list(positions) for positions in keyCombos if positions not in removeList]
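        # For example, with two keys that can each sit on tiles A or B (or be
        # acquired, i.e. None), the product yields 9 combinations and the filter
        # above removes (A, A) and (B, B), leaving 7 valid combinations.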

    def updateCurrentState(self, player, keys):
        keyVector = list()
        for key in keys:
            # if the key has been acquired, append None so the state keeps track properly
            if key.acquired:
                keyVector.append(None)
            else:
                keyVector.append((key.x, key.y))
        self.currentState = state.State((player.x, player.y), keyVector)

    def getCurrentStateActionFromPolicy(self):
        # returns the best action for the current state of the MDP
        return self.policyTable[self.currentState]
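
# A minimal usage sketch (hypothetical: the starting position and reward
# settings below are placeholders, and the project's real entry point may
# construct the MDP differently). Assumes objects.board is populated at
# import time, as the methods above already rely on.
if __name__ == "__main__":
    startKeys = [(key.x, key.y) for key in objects.board.keys]  # no keys acquired yet
    startState = state.State((0, 0), startKeys)  # (0, 0) assumed to be an open tile
    mdp = MDP(startState, reward=0.9, livingReward=-1, iterations=10)
    print(mdp.getCurrentStateActionFromPolicy())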