-
Notifications
You must be signed in to change notification settings - Fork 0
/
pacmanMdp.py
278 lines (209 loc) · 9.71 KB
/
pacmanMdp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# pacmanMdp.py
# IA UC3M 2017
# IA UC3M 2016
# -----------------------
##
from game import GameStateData
from game import Game
from game import Actions
from util import nearestPoint
import util, layout
import sys, types, time, random, os
import mdp
import pickle
from featureExtractors import *
class PacmanMdp(mdp.MarkovDecisionProcess):
    """
    Pac-Man MDP learned from observed game transitions.

    Full game states ("low level" state maps) are reduced by a feature
    extractor to tuples of selected features ("high level" states).  For
    every high level state the object keeps one example low level state
    (used to query legal actions and win/lose status) and the scores
    observed for it (used to compute rewards).  The transition table
    itself is left to be implemented between the "YOUR CODE" markers.
    """

    def __init__(self, extractor='FullStateExtractor', transitionTableFile=None):
        """
        Create an empty MDP.

        extractor: name of a feature extractor class, resolved with
            util.lookup against this module's globals and instantiated
            with no arguments.
        transitionTableFile: path used by saveTransitionTable and
            loadTransitionTable, or None to disable persistence.
        """
        # We set this to true to print the MDP in the first call of the ValueIteration
        self.printMdpTable = True

        self.transitionTableFile = transitionTableFile

        # Feature extractor
        self.featExtractor = util.lookup(extractor, globals())()

        # Uncomment to get the full vector of state features
        # self.stateFeatures=['posX','posY','IncFoodX','IncFoodY','IncGhostX','IncGhostY','foodX','foodY','wallsW','wallsH','GhostDist','ClosestGhostX','ClosestGhostY','ClosestGhostDist','GhostPos']

        # Default State features
        self.stateFeatures = ['posX', 'posY', 'IncFoodX', 'IncFoodY', 'IncGhostX', 'IncGhostY']

        # Transition function (data structure required for the transition function)
        #*** YOUR CODE STARTS HERE ***"
        # This variable MUST be used to reference your transition table so
        # it can be saved and loaded from file
        self.transitionTable = None
        #"*** YOUR CODE FINISHES HERE ***"

        # Dictionary with examples of a Low state for each High state: it serves to get possible actions
        # and to check terminal states (though it is not required if the high level representation
        # capture them)
        self.states = util.Counter()

        # Reward for each state at the high level representation.
        # Each entry is [number of observations, [observed scores]].
        self.reward = util.Counter()

    def isTerminalFeatures(self, featuresTuple, checkIndices=(0, 2)):
        """
        Tell whether a high level feature tuple describes a terminal state.

        The tuple is considered terminal when any feature located at one of
        the positions in checkIndices is None.  With the default indices the
        caller is expected to have placed Pac-Man's X position at index 0 and
        the closest food's X position at index 2.

        Returns True if at least one checked feature is None, False otherwise.
        """
        return any(
            value is None
            for index, value in enumerate(featuresTuple)
            if index in checkIndices
        )

    def stateToHigh(self, stateMap):
        """
        Returns the high level representation of a state.

        The full state map is passed to the feature extractor, and the
        resulting features are filtered down to those named in
        self.stateFeatures, in that order.  Selected features that the
        extractor did not produce (for instance if ghosts are not present)
        are skipped.

        Note that isTerminalFeatures expects particular features at
        particular positions of the returned tuple.

        Returns two tuples: the feature values and the matching feature names.

        Features described by the original documentation:
            'posX','posY': Position of Pac-man
            'foodX','foodY': Relative position of closest food
            'wallsW','wallsH': Width and Height of the map (including walls)
            'IncGhostX','IncGhostY': Relative position of closest ghost (Not counting walls)
            'GhostDist': Distance of closest ghost (calculated not counting walls)
            'ClosestGhostX','ClosestGhostY': Absolute position of closest ghost (Counting walls)
            'ClosestGhostDist': Distance of closest ghost (True distance, counting walls)
            'GhostPos': Positions of all ghosts (tuple of tuples)
        """
        # Gets all features
        fullState_values, fullState_names = self.featExtractor.getFeatures(stateMap)

        # Skip features not present in the complete list even if selected
        state_names = tuple(n for n in self.stateFeatures if n in fullState_names)
        state = tuple(fullState_values[fullState_names.index(n)] for n in state_names)
        return state, state_names

    def addStateLow(self, stateH, stateMap):
        """
        Records a (high level state, low level state) observation.

        The first low level state seen for stateH is kept as its example in
        self.states.  Every call appends stateMap's score to the reward
        record of stateH and increments its observation count.
        """
        # Direct membership test on the mapping; avoids building a key list.
        if stateH not in self.states:
            self.states[stateH] = stateMap
            self.reward[stateH] = [1, [stateMap.getScore()]]
        else:
            self.reward[stateH][0] += 1
            self.reward[stateH][1].append(stateMap.getScore())

    def updateTransitionFunction(self, initialMap, action, nextMap):
        """
        Updates the transition function with a new case stateL, action, nextStateL.

        The states received as parameters have a low level representation.
        The transition function should be stored over the high level
        (simplified) representation.
        """
        # Change the representation to the simplified one
        # Some of these features will not be present in the state if there are no ghosts
        state, state_names = self.stateToHigh(initialMap)
        nextstate, nextstate_names = self.stateToHigh(nextMap)

        # Set the start state in the first call
        if len(self.states) == 0:
            self.setStartState(state)

        # Add the received states to self.states
        self.addStateLow(state, initialMap)
        self.addStateLow(nextstate, nextMap)

        #"*** YOUR CODE STARTS HERE ***"
        util.raiseNotDefined()
        #"*** YOUR CODE FINISHES HERE ***"

    def getPossibleActions(self, state):
        """
        Returns the list of valid actions for the high level 'state'.

        The actions are the legal actions of agent index 0 in the stored
        low level example of the state.  An unknown state yields [].
        """
        if state not in self.states:
            return []
        return self.states[state].getLegalActions(0)

    def getStates(self):
        """
        Return all known high level states.
        """
        return self.states.keys()

    def isKnownState(self, state):
        """
        True if the state is in the dict of states.
        """
        return state in self.states

    def getAverageReward(self, state):
        """
        Return the average score of the known low level states represented
        by a high level state.
        """
        count = self.reward[state][0]
        scores = self.reward[state][1]
        # float() forces true division; with integer scores Python 2 would
        # otherwise floor the average.
        return float(sum(scores)) / count

    def getReward(self, state, action, nextState):
        """
        Get reward for state, action, nextState transition.

        The reward is the difference between the average score of nextState
        and the average score of state; 'action' is not used.
        """
        return self.getAverageReward(nextState) - self.getAverageReward(state)

    def setStartState(self, state):
        """
        Set the start state.
        """
        self.startState = state

    def getStartState(self):
        """
        Get the start state.
        """
        return self.startState

    def isTerminal(self, state):
        """
        Pacman terminal states.

        A known state is terminal when its stored low level example is a win
        or a loss.  An unknown state is judged from its feature tuple alone
        through isTerminalFeatures.
        """
        if state not in self.states:
            return self.isTerminalFeatures(state)
        example = self.states[state]
        return example.isLose() or example.isWin()

    def printMdp(self):
        """
        Shows the transition function of the MDP: one line per known state
        and possible action, with the corresponding transitions.
        """
        for state in sorted(self.states.keys()):
            for action in self.getPossibleActions(state):
                transitions = self.getTransitionStatesAndProbabilities(state, action)
                # Single-argument print gives the same output under Python 2 and 3.
                print("%s %s %s" % (state, action, transitions))

    def setTransitionTableFile(self, filename):
        """
        Set the path used to save and load the transition table.
        """
        self.transitionTableFile = filename

    def getTransitionTable(self):
        """
        Return the transition table object.
        """
        return self.transitionTable

    def saveTransitionTable(self):
        """
        Saves all the objects constructed during training (states, rewards
        and transition table) to self.transitionTableFile using pickle.

        Does nothing when no file has been configured.
        """
        if self.transitionTableFile is None:
            return

        trainInfo = {'states': self.states,
                     'reward': self.reward,
                     'transitionTable': self.transitionTable}
        # The with-block guarantees the file is flushed and closed.
        with open(self.transitionTableFile, 'wb') as tableFile:
            pickle.dump(trainInfo, tableFile)
        print("%s %s" % (" MDP transition table saved to file ", self.transitionTableFile))

    def loadTransitionTable(self):
        """
        Loads states, rewards and transition table from
        self.transitionTableFile if a file is configured and exists.

        A missing file is silently ignored, leaving the current (possibly
        empty) tables in place.
        """
        if self.transitionTableFile is None:
            return
        if not os.path.isfile(self.transitionTableFile):
            # print " WARNING: MDP transition table file not found. Creating a new table in " , self.transitionTableFile
            return

        # NOTE(review): pickle.load can execute arbitrary code; only load
        # transition table files that come from a trusted source.
        with open(self.transitionTableFile, 'rb') as tableFile:
            trainInfo = pickle.load(tableFile)
        self.states = trainInfo['states']
        self.reward = trainInfo['reward']
        self.transitionTable = trainInfo['transitionTable']
        print("%s %s" % (" MDP transition table loaded from file ", self.transitionTableFile))

    def getTransitionStatesAndProbabilities(self, state, action):
        """
        Returns list of (nextState, prob) pairs
        representing the states reachable
        from 'state' by taking 'action' along
        with their transition probabilities.

        On the first call the whole transition table is printed.  Raises
        ValueError if 'action' is not possible in 'state'.  Terminal states
        have no successors and yield [].
        """
        if self.printMdpTable:
            # Clear the flag before printing: printMdp calls back into this
            # method and must not trigger the dump again.
            self.printMdpTable = False
            print("")
            print(" Executing MDP with transition table:")
            self.printMdp()
            print(" End of MDP transition table")
            print("")

        if action not in self.getPossibleActions(state):
            # String exceptions are not valid Python; raise a real exception.
            raise ValueError("Illegal action!")

        if self.isTerminal(state):
            return []

        successors = []

        #"*** YOUR CODE STARTS HERE ***"
        util.raiseNotDefined()
        #"*** YOUR CODE FINISHES HERE ***"

        return successors