forked from Bick95/PPO
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils_ppo.py
More file actions
234 lines (181 loc) · 9.73 KB
/
utils_ppo.py
File metadata and controls
234 lines (181 loc) · 9.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import torch
import numpy as np
from PIL import Image
import torch.nn.functional as F
from scheduler import Scheduler
from lr_scheduler import CustomLRScheduler
from torch.optim.lr_scheduler import ExponentialLR, LambdaLR
def add_batch_dimension(state):
"""
Add a batch dimension to the state.
Handles different state formats from gym environments.
Args:
state: The state from the environment, which could be:
- numpy array
- dict (for new gym versions)
- tuple or list
Returns:
numpy array with batch dimension added
"""
# Handle dict states (new gym versions)
if isinstance(state, dict):
if 'observation' in state:
state = state['observation']
elif len(state) > 0:
# Take the first value if observation is not present
state = next(iter(state.values()))
# Handle tuple or list states
if isinstance(state, (tuple, list)):
try:
state = np.array(state, dtype=np.float32)
except:
# If conversion fails, try to extract the first element
if len(state) > 0:
return add_batch_dimension(state[0])
else:
# Empty sequence, return a default state
return np.zeros((1, 4), dtype=np.float32)
# Ensure state is a numpy array
if not isinstance(state, np.ndarray):
try:
state = np.array(state, dtype=np.float32)
except:
print(f"Warning: Could not convert state of type {type(state)} to numpy array")
return np.zeros((1, 4), dtype=np.float32)
# Add batch dimension if not already present
if len(state.shape) == 1:
return np.expand_dims(state, axis=0)
else:
# If already has batch dimension or is multi-dimensional, return as is
return state
def simulation_is_stuck(last_state, state):
    """Return a scalar bool tensor that is True when two consecutive states
    are element-wise identical — taken as a sign the simulation is stuck in
    a semi-terminal state."""
    return (last_state == state).all()
def visualize_markov_state(state: np.ndarray or torch.tensor,
                           env_state_depth: int,
                           markov_length: int,
                           color_code: str = 'RGB',
                           confirm_message: str = "Confirm..."):
    """
    Display the environment frames stacked inside one Markov state side by
    side in a single window, then block until the user confirms.

    Args:
        state: Markov state, numpy array or tensor; channel-last layout with
               markov_length * env_state_depth channels, optionally with a
               leading batch dimension. (assumes (H, W, C) — TODO confirm)
        env_state_depth: channels of a single environment frame (e.g. 3 for RGB).
        markov_length: number of consecutive frames stacked in the state.
        color_code: PIL color mode used to render each frame.
        confirm_message: prompt shown while blocking on input().
    """
    if isinstance(state, torch.Tensor):
        # detach()/cpu() so grad-tracking and GPU tensors convert cleanly too
        state = state.detach().cpu().numpy()

    if len(state.shape) > 3:
        state = state.squeeze()  # Drop batch dimension

    # Slice the stacked state into its individual frames along the channel axis
    images = []
    for i in range(markov_length):
        extracted_env_state = state[:, :, i*env_state_depth : (i+1)*env_state_depth].squeeze()
        images.append(Image.fromarray(extracted_env_state.astype('uint8'), color_code))

    # Canvas is markov_length frames wide but only ONE frame tall: frames are
    # pasted in a single horizontal row at y=0. (The previous height of
    # images[0].height * markov_length left a large blank band below the row.)
    image = Image.new(color_code, (images[0].width * markov_length, images[0].height))

    # Paste the frames left to right in temporal order
    for i, frame in enumerate(images):
        image.paste(frame, (i * frame.width, 0))

    image.show()
    input(confirm_message)
def get_scheduler(parameter: float or dict, device: torch.device, train_iterations: int,
                  parameter_name: str = None, verbose: bool = False):
    """
    Build a Scheduler that anneals a scalar parameter (e.g. the clipping
    epsilon or a non-fixed standard deviation) from a specification.

    A plain float yields a constant scheduler. A dict describes the decay
    (initial_value, min_value, decay_type, decay_rate, decay_steps, verbose);
    a decay_type of 'trainable' means the parameter is learned rather than
    annealed, so None is returned and no scheduler is created.

    Raises:
        NotImplementedError: if parameter is neither float nor dict.
    """
    if isinstance(parameter, float):
        # Constant parameter: no decay over training
        return Scheduler(parameter, 'constant', device, value_name=parameter_name, verbose=verbose)

    if isinstance(parameter, dict):
        # Pull the annealing settings out of the spec, with fallbacks
        decay_type = parameter.get('decay_type')
        if decay_type is not None:
            decay_type = decay_type.lower()

        # Trainable parameters are optimized by the training loop, not annealed
        if decay_type == 'trainable':
            return None

        return Scheduler(
            initial_value=parameter.get('initial_value'),
            decay_type=decay_type,
            decay_rate=parameter.get('decay_rate'),
            decay_steps=parameter.get('decay_steps', train_iterations),
            device=device,
            min_value=parameter.get('min_value'),
            value_name=parameter_name,
            verbose=parameter.get('verbose', verbose)
        )

    raise NotImplementedError("parameter must be float or dict")
def get_non_linearity(nonlinearity):
    """
    Map a string specification of a non-linearity to the corresponding
    element-wise activation function.

    Args:
        nonlinearity: one of 'relu', 'sigmoid', 'tanh' (case-insensitive).

    Returns:
        The matching callable activation.

    Raises:
        NotImplementedError: for any other specification.
    """
    # Normalise once instead of calling .lower() per comparison; dispatch
    # through a table instead of an if/elif chain.
    # torch.sigmoid / torch.tanh replace the deprecated F.sigmoid / F.tanh.
    activations = {
        'relu': F.relu,
        'sigmoid': torch.sigmoid,
        'tanh': torch.tanh,
    }
    try:
        return activations[nonlinearity.lower()]
    except KeyError:
        raise NotImplementedError("Only relu or sigmoid or tanh admissible as non-linearities") from None
def get_optimizer(learning_rate: float or dict, model_parameters):
    """
    Construct an Adam optimizer for the given model parameters.

    A float learning_rate gives an optimizer with that constant rate; a dict
    uses its 'initial_value' entry (a separate LR scheduler is expected to
    decay the rate afterwards).

    Raises:
        NotImplementedError: if learning_rate is neither float nor dict.
    """
    if isinstance(learning_rate, dict):
        # Scheduled learning rate: start from the configured initial value
        lr = learning_rate['initial_value']
    elif isinstance(learning_rate, float):
        # Constant learning rate
        lr = learning_rate
    else:
        raise NotImplementedError("learning_rate must be (constant) float or dict.")
    return torch.optim.Adam(params=model_parameters, lr=lr)
def get_lr_scheduler(learning_rate: float or dict, optimizer, train_iterations: int,
                     value_name: str = 'Learning Rate to be decreased'):
    # For scheduling learning rates, readily available PyTorch schedulers are used. This function takes some
    # specification of how the scheduler is supposed to work and returns a correspondingly set up scheduler.
    # Returns None for a plain float (constant LR), a scheduler object for a dict spec,
    # and raises NotImplementedError for anything else.
    if isinstance(learning_rate, float):
        # Simple optimizer with constant learning rate for neural net, thus no scheduler needed
        return None
    elif isinstance(learning_rate, dict):
        # Whether learning rate scheduler shall print feedback or not
        verbose = learning_rate['verbose'] if 'verbose' in learning_rate.keys() else False
        decay_type = learning_rate['decay_type'].lower() if 'decay_type' in learning_rate.keys() else None
        initial_lr = learning_rate['initial_value'] if 'initial_value' in learning_rate.keys() else None
        # decay_steps falls back to the total number of training iterations when not given
        decay_steps = learning_rate['decay_steps'] if 'decay_steps' in learning_rate.keys() else train_iterations
        decay_rate = learning_rate['decay_rate'] if 'decay_rate' in learning_rate.keys() else None
        min_value = learning_rate['min_value'] if 'min_value' in learning_rate.keys() else None
        # Choose which scheduler to return - Dependent on the requested decay type
        # NOTE(review): this condition parses as `decay_steps or decay_rate or (min_value is not None)`,
        # and decay_steps defaults to train_iterations (truthy), so the branch is taken for
        # essentially every dict spec — the LambdaLR/ExponentialLR branches below are dead code.
        # The intent was presumably to test whether these keys were explicitly provided; confirm
        # against callers before changing.
        if decay_steps or decay_rate or min_value is not None:
            # If settings are provided that could not be incorporated into PyTorch's own LR-schedulers, use a custom one
            return CustomLRScheduler(optimizer=optimizer, initial_value=initial_lr,
                                     decay_type=decay_type, decay_steps=decay_steps,
                                     decay_rate=decay_rate, min_value=min_value, value_name=value_name, verbose=verbose)
        elif decay_type == 'linear':
            # Linear decay towards 0 over train_iterations scheduler steps
            lambda_lr = lambda epoch: (train_iterations - epoch) / train_iterations
            return LambdaLR(optimizer, lr_lambda=lambda_lr, verbose=verbose)
        elif decay_type == 'exponential':
            # Multiply the LR by decay_factor at every scheduler step (default 0.9)
            decay_factor = learning_rate['decay_factor'] if 'decay_factor' in learning_rate.keys() else 0.9
            return ExponentialLR(optimizer, gamma=decay_factor, verbose=verbose)
        elif decay_type == 'constant':
            # A constant LR must be requested via a plain float, not a dict spec
            raise NotImplementedError("Provide a float value as learning rate parameter when intending to keep learning rate constant.")
        else:
            raise NotImplementedError("Learning rate decay may only be linear or exponential.")
    else:
        raise NotImplementedError("learning_rate_pol must be (constant) float or dict.")
def is_provided(param):
    """Return True when a parameter specification was given, i.e. is not None."""
    return param is not None
def is_trainable(param):
    """Return True when the parameter spec requests training instead of
    annealing: a dict whose 'decay_type' entry equals 'trainable'."""
    return isinstance(param, dict) and param.get('decay_type') == 'trainable'
def nan_error(tensor):
    """Return a scalar bool tensor that is True when the tensor holds any NaN."""
    return torch.any(torch.isnan(tensor))
def print_nan_error_loss(loss, L_CLIP, L_V, action, log_prob, log_prob_old, state, state_val, L_ENTROPY=None):
    """
    Print a diagnostic dump when the loss turns out to be NaN.

    All arguments are echoed verbatim for debugging; L_ENTROPY is optional
    and printed as 'N/A' when not supplied.
    """
    print(
        "Loss happened to be nan. This indicates loss terms going out of bounds. Please check your hyperparameters once again.")
    print('Values were as follows:\n')
    print('Loss:', loss, '\nL_CLIP:', L_CLIP, '\nL_V:', L_V)
    # Compare against None explicitly: truthiness would print 'N/A' for a
    # legitimate entropy of 0 and raises for multi-element tensors
    print('L_ENTROPY:', L_ENTROPY if L_ENTROPY is not None else 'N/A')
    print('action:', action, '\nlog_prob:', log_prob, '\nlog_prob_old:', log_prob_old)
    print('state:', state, '\nstate_val:', state_val)