# helper.py
import random
from collections import defaultdict
from copy import deepcopy

import numpy as np

class TabularQLearner:
    """Tabular Q-learning with an epsilon-greedy behaviour policy."""

    def __init__(self,
                 gamma=0.99,
                 max_iter=1000,
                 c=16,
                 alpha=0.5,
                 verbose=10):
        self.gamma = gamma        # discount factor
        self.c = c                # controls the epsilon decay schedule
        self.alpha = alpha        # learning rate for the Q-value update
        self.epsilon = 1.         # start fully exploratory
        self.max_iter = max_iter
        self.verbose = verbose    # epsilon is decayed every `verbose` steps

    def fit(self, env):
        # Collect the environment's terminal states and their rewards from
        # its positive- and negative-reward bookkeeping.
        terminal_states = deepcopy(env.pos_reward_states)
        terminal_states.extend(env.neg_reward_states)
        terminal_reward_map = deepcopy(env.pos_reward_vals)
        terminal_reward_map.update(env.neg_reward_vals)
        self._fit_helper(
            env.states,
            env.init_state,
            terminal_states,
            terminal_reward_map,
            env.state_action_state_probs,
            env.actions_available,
            env.get_reward,
        )

    def get_next_state(self,
                       cur_state,
                       action_to_take,
                       state_action_state_probs):
        # Sample the successor state according to the environment's
        # transition probabilities for (cur_state, action_to_take).
        states = [
            state for state, _ in state_action_state_probs[cur_state][action_to_take]
        ]
        probs = [
            prob for _, prob in state_action_state_probs[cur_state][action_to_take]
        ]
        new_state_i = np.random.choice(len(states), p=probs)
        return states[new_state_i]

    def fetch_max_q(self, state):
        # Highest Q-value reachable from `state`.
        return max(self.Q[state].values())

    def fetch_a_greedily(self, state):
        # Action with the highest Q-value in `state`.
        return max(self.Q[state], key=self.Q[state].get)

    def sample_action(self, state, possible_actions):
        # Epsilon-greedy: explore with probability epsilon, exploit otherwise.
        if random.random() < self.epsilon:
            return random.choice(possible_actions)
        return self.fetch_a_greedily(state)

    def update_epsilon(self, iteration):
        # Decay schedule: epsilon -> 0 as the iteration count grows.
        self.epsilon = self.c / (self.c + iteration)

    def init_Q(self, states, action_fetcher, terminal_reward_map):
        self.Q = defaultdict(dict)
        for state in states:
            if state in terminal_reward_map:
                # Terminal states carry their reward as a fixed Q-value
                # under a single (arbitrary) action.
                self.Q[state][action_fetcher(state)[0]] = terminal_reward_map[state]
                continue
            for action in action_fetcher(state):
                self.Q[state][action] = 0.

    def _fit_helper(self,
                    states,
                    init_state,
                    terminal_states,
                    terminal_reward_map,
                    state_action_state_probs,
                    action_fetcher,
                    reward_fetcher):
        # Episodes restart from a uniformly random non-terminal state.
        init_state_sampler = lambda: random.choice(
            list(set(states) - set(terminal_states)))
        self.init_Q(states, action_fetcher, terminal_reward_map)
        if init_state is None:
            init_state = init_state_sampler()
        cur_state = init_state
        for k in range(self.max_iter):
            possible_actions = action_fetcher(cur_state)
            action_to_take = self.sample_action(cur_state, possible_actions)
            new_state = self.get_next_state(cur_state,
                                            action_to_take,
                                            state_action_state_probs)
            if new_state in terminal_states:
                # Terminal transition: the target is the terminal reward
                # and the episode restarts from a fresh state.
                target = terminal_reward_map[new_state]
                new_state = init_state_sampler()
            else:
                # Standard Q-learning target: r + gamma * max_a' Q(s', a').
                cur_r = reward_fetcher(cur_state, action_to_take, new_state)
                discounted_q = self.gamma * self.fetch_max_q(new_state)
                target = cur_r + discounted_q
            # Soft update: Q <- (1 - alpha) * Q + alpha * target.
            cur_q_weight = (1 - self.alpha) * self.Q[cur_state][action_to_take]
            target_q_weight = self.alpha * target
            self.Q[cur_state][action_to_take] = cur_q_weight + target_q_weight
            cur_state = new_state
            # Epsilon is only decayed every `verbose` iterations.
            if k % self.verbose == 0:
                self.update_epsilon(k)
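

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module). fit() only
# assumes an `env` object exposing: states, init_state, pos_reward_states,
# neg_reward_states, pos_reward_vals, neg_reward_vals,
# state_action_state_probs, actions_available, and get_reward. `_ChainEnv`
# below is a hypothetical minimal stand-in satisfying that interface, not
# the repo's real environment class.
class _ChainEnv:
    """Three-state chain s0 -> s1 -> s2, where s2 is a rewarding terminal."""

    def __init__(self):
        self.states = ['s0', 's1', 's2']
        self.init_state = 's0'
        self.pos_reward_states = ['s2']
        self.neg_reward_states = []
        self.pos_reward_vals = {'s2': 1.0}
        self.neg_reward_vals = {}
        # state -> action -> [(next_state, probability), ...]
        self.state_action_state_probs = {
            's0': {'right': [('s1', 0.9), ('s0', 0.1)], 'stay': [('s0', 1.0)]},
            's1': {'right': [('s2', 0.9), ('s1', 0.1)], 'stay': [('s1', 1.0)]},
            's2': {'stay': [('s2', 1.0)]},
        }

    def actions_available(self, state):
        return list(self.state_action_state_probs[state].keys())

    def get_reward(self, cur_state, action, new_state):
        # Small per-step cost; terminal rewards come from the maps above.
        return -0.01


if __name__ == '__main__':
    learner = TabularQLearner(max_iter=5000)
    learner.fit(_ChainEnv())
    print(dict(learner.Q))  # learned Q-values per state/action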