-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenv_wrapper.py
More file actions
161 lines (125 loc) · 5.21 KB
/
env_wrapper.py
File metadata and controls
161 lines (125 loc) · 5.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import safety_gymnasium as gym
import gymnasium
import torch
import os
import numpy as np
import pickle
"""
Modes:
dist: postpi
pgbroil: pgbroil
mean: b-rex mean
map: b-rex map
trex: t-rex
true: optimize w.r.t. the ground-truth reward
modes are named this way for historical reasons...
"""
class SafetyGoalFeatureWrapper(gymnasium.Wrapper):
    """
    Hand-designed reward features for Goal and Circle tasks.

    On every step the wrapper builds a feature vector from the wrapped
    environment's reward/cost (and, for Goal tasks, hazard positions),
    stores it in ``info["state_features"]`` and stores the product of the
    loaded reward weights with those features in ``info["rewards"]``.

    NOTE(review): ``self.task_id`` is never assigned by this class; it is
    presumably resolved from the wrapped environment through
    ``gymnasium.Wrapper`` attribute forwarding -- confirm against the
    installed gymnasium version.
    """

    def __init__(self, env, env_name, samples_filename=None, mode="dist", samples_type=("cp",)):
        """
        env: environment to wrap.
        env_name: environment name; a name containing "Vision" makes the
            wrapper expose only the 'vision' entry of the observation.
        samples_filename: pickle file name under
            ./reward_samples/<env_name without "Vision">/, or None.
        mode: one of "dist", "pgbroil", "mean", "map", "trex", "true"
            (see the mode list at the top of the file).
        samples_type: collection containing "cp" and/or "st"; selects which
            sample sets are used in "dist"/"pgbroil" mode. Only membership
            tests are performed on it, so any container works.
        """
        super().__init__(env)
        self.true_reward = self.get_true_reward()
        # self.task_id = env_name
        self.env_name = env_name
        if "Vision" in self.env_name:
            self.observation_space = self.observation_space['vision']
        self.samples_filename = samples_filename
        self.mode = mode

        # NOTE(review): when samples_filename is None and mode != "true",
        # reward_samples is never assigned and step() will fail when it
        # computes info["rewards"].
        if mode == "true":
            self.reward_samples = self.true_reward.reshape(1, -1)

        if samples_filename is not None:
            # load reward samples file
            samples_path = os.path.join("./reward_samples", env_name.replace("Vision", ""), samples_filename)
            # SECURITY: pickle.load executes arbitrary code embedded in the
            # file; only load sample files from a trusted source.
            with open(samples_path, "rb") as f:
                samples = pickle.load(f)

            if mode == "dist" or mode == "pgbroil":
                # for postpi or pgbroil, load the entire set of reward samples (candidate proposal set)
                reward_samples = []
                # we have two sets of reward samples, one reserved for candidate proposal
                # and one reserved for safety test. samples_type determines
                # which (or both) set to use.
                if "cp" in samples_type:
                    reward_samples.append(samples['samples'])
                if "st" in samples_type:
                    reward_samples.append(samples['test_samples'])
                if not reward_samples:
                    # np.concatenate([]) would also raise ValueError, but
                    # with a message that does not point at the real cause.
                    raise ValueError(
                        "samples_type must contain 'cp' and/or 'st', got %r" % (samples_type,)
                    )
                self.reward_samples = np.concatenate(reward_samples)
            elif mode == "mean":
                # load the mean reward
                self.reward_samples = samples['mean_r'].reshape(1, -1)
            elif mode == "map":
                # load the map reward
                self.reward_samples = samples['map_r'].reshape(1, -1)
            elif mode == "trex":
                # load the t-rex reward
                self.reward_samples = samples.reshape(1, -1)

    def get_true_reward(self):
        """
        Return the ground-truth reward weights R* as a float array, or
        None when the task is neither a Goal nor a Circle task.
        """
        if "Goal" in self.task_id:
            # [dense reward, in goal, in hazard1, ..., in hazard n]
            true_reward = [1, 1]
            true_reward += [-1] * self.env.task._obstacles[1].num
            return np.array(true_reward).astype(float)
        elif "Circle" in self.task_id:
            # [dense reward, in circle, out of boundary]
            # return np.array([1, 0, -1]).astype(float)
            # [dense reward, out of boundary]
            return np.array([1, -1]).astype(float)

    def reset(self, **kwargs):
        """
        Reset the wrapped environment and attach an all-zero feature vector
        to ``info["state_features"]`` for Goal/Circle tasks.

        Returns (obs, info); for "Vision" environments obs is the 'vision'
        entry of the wrapped observation.
        """
        obs, info = self.env.reset(**kwargs)
        # Size the zero vector from the ground-truth weights so it always
        # matches the per-step feature length (2 + number of hazards for
        # Goal, 2 for Circle) instead of a hard-coded hazard count.
        if self.true_reward is not None:
            info['state_features'] = np.zeros_like(self.true_reward)
        if "Vision" in self.env_name:
            return obs['vision'], info
        else:
            return obs, info

    def step(self, action):
        """
        Step the wrapped environment and annotate ``info``.

        Adds ``info["state_features"]`` (feature vector for this step) and
        ``info["rewards"]`` (reward weights applied to the features).
        Returns (obs, reward - cost, terminated, truncated, info), i.e. the
        wrapped environment's separate cost is folded into the reward.
        """
        obs, reward, cost, terminated, truncated, info = self.env.step(action)
        if "Vision" in self.env_name:
            obs = obs['vision']
        # construct state features
        features = self.state_features(reward, cost)
        info["state_features"] = features
        # if self.samples_filename is not None or self.mode == "true":
        info["rewards"] = self.get_rewards(features)
        return obs, reward - cost, terminated, truncated, info

    def get_rewards(self, features):
        """Return the matrix product of the reward samples with `features`."""
        return self.reward_samples @ features

    def get_n_rewards(self):
        """
        Return the number of reward samples: 1 for the single-reward modes,
        the number of loaded rows for "dist"/"pgbroil", None for any other
        mode.
        """
        if self.mode in ["mean", "true", "map", "trex"]:
            return 1
        elif self.mode == "dist" or self.mode == "pgbroil":
            return self.reward_samples.shape[0]

    def state_features(self, reward, cost):
        """
        Dispatch to the task-specific feature builder; returns None when the
        task is neither a Goal nor a Circle task.
        """
        if "Goal" in self.task_id:
            return self.goal_state_features(reward)
        elif "Circle" in self.task_id:
            return self.circle_state_features(reward, cost)

    def circle_state_features(self, reward, cost):
        """Return the Circle-task features as an array [reward, cost]."""
        # [dense reward, agent out of bound]
        return np.array([reward, cost])

    def goal_state_features(self, reward):
        """
        reward: actual reward at this time step,
        used for calculating dense reward component

        Returns an array
        [dense reward, agent in goal, agent in hazard 1, ..., agent in hazard n].
        """
        # The reward rounded to the nearest integer is taken as the
        # goal-reached indicator; the remainder is the dense component.
        goal_achieved = int(np.round(reward))
        features = [reward - goal_achieved, goal_achieved]
        # feature: agent in hazard (1 when within the hazard's size, else 0)
        hazards = self.env.task._obstacles[1]
        features.extend(
            int(hazards.agent.dist_xy(h_pos) <= hazards.size)
            for h_pos in hazards.pos
        )
        return np.array(features)