-
Notifications
You must be signed in to change notification settings - Fork 0
/
her_sac_gym_fetch_reach.py
108 lines (101 loc) · 4.4 KB
/
her_sac_gym_fetch_reach.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import gym
import rlkit.torch.pytorch_util as ptu
from rlkit.data_management.obs_dict_replay_buffer import ObsDictRelabelingBuffer
from rlkit.launchers.launcher_util import setup_logger
from rlkit.samplers.data_collector import GoalConditionedPathCollector
from rlkit.torch.her.her import HERTrainer
from rlkit.torch.networks import FlattenMlp
from rlkit.torch.sac.policies import MakeDeterministic, TanhGaussianPolicy
from rlkit.torch.sac.sac import SACTrainer
from rlkit.torch.torch_rl_algorithm import TorchBatchRLAlgorithm
def experiment(variant):
    """Train HER + SAC on the gym ``FetchReach-v1`` task.

    Creates separate evaluation and exploration ``FetchReach-v1``
    environments, a relabeling replay buffer, two Q-networks with matching
    target networks, and a tanh-Gaussian policy, then runs
    ``TorchBatchRLAlgorithm.train()``. Nothing is returned.

    Args:
        variant: Nested configuration mapping. These entries are read and
            forwarded as keyword arguments:
            ``replay_buffer_kwargs`` (to ``ObsDictRelabelingBuffer``),
            ``qf_kwargs`` (to every ``FlattenMlp`` Q-network),
            ``policy_kwargs`` (to ``TanhGaussianPolicy``),
            ``sac_trainer_kwargs`` (to ``SACTrainer``) and
            ``algo_kwargs`` (to ``TorchBatchRLAlgorithm``).

    Raises:
        KeyError: If one of the entries listed above is missing.
    """
    eval_env = gym.make('FetchReach-v1')
    expl_env = gym.make('FetchReach-v1')

    # Keys into the environment's dict observation space. They are defined
    # once and reused everywhere below so that the buffer, the path
    # collectors and the network sizing can never disagree.
    observation_key = 'observation'
    desired_goal_key = 'desired_goal'
    achieved_goal_key = desired_goal_key.replace("desired", "achieved")

    replay_buffer = ObsDictRelabelingBuffer(
        env=eval_env,
        observation_key=observation_key,
        desired_goal_key=desired_goal_key,
        achieved_goal_key=achieved_goal_key,
        **variant['replay_buffer_kwargs']
    )

    # Flat sizes of each component, taken from the bounds of the spaces.
    obs_spaces = eval_env.observation_space.spaces
    obs_dim = obs_spaces[observation_key].low.size
    action_dim = eval_env.action_space.low.size
    goal_dim = obs_spaces[desired_goal_key].low.size

    def make_q_network():
        # Scalar-output network sized for observation + action + goal.
        return FlattenMlp(
            input_size=obs_dim + action_dim + goal_dim,
            output_size=1,
            **variant['qf_kwargs']
        )

    # Construction order is kept (qf1, qf2, then the targets) so that any
    # random weight initialisation consumes the RNG in the same sequence.
    qf1 = make_q_network()
    qf2 = make_q_network()
    target_qf1 = make_q_network()
    target_qf2 = make_q_network()

    # The policy input is sized for observation + goal.
    policy = TanhGaussianPolicy(
        obs_dim=obs_dim + goal_dim,
        action_dim=action_dim,
        **variant['policy_kwargs']
    )
    # Evaluation uses the deterministic wrapper around the same policy;
    # exploration uses the policy itself.
    eval_policy = MakeDeterministic(policy)

    sac_trainer = SACTrainer(
        env=eval_env,
        policy=policy,
        qf1=qf1,
        qf2=qf2,
        target_qf1=target_qf1,
        target_qf2=target_qf2,
        **variant['sac_trainer_kwargs']
    )
    trainer = HERTrainer(sac_trainer)

    eval_path_collector = GoalConditionedPathCollector(
        eval_env,
        eval_policy,
        observation_key=observation_key,
        desired_goal_key=desired_goal_key,
    )
    expl_path_collector = GoalConditionedPathCollector(
        expl_env,
        policy,
        observation_key=observation_key,
        desired_goal_key=desired_goal_key,
    )

    algorithm = TorchBatchRLAlgorithm(
        trainer=trainer,
        exploration_env=expl_env,
        evaluation_env=eval_env,
        exploration_data_collector=expl_path_collector,
        evaluation_data_collector=eval_path_collector,
        replay_buffer=replay_buffer,
        **variant['algo_kwargs']
    )
    # Move the algorithm to the device selected in rlkit's pytorch_util
    # before training starts.
    algorithm.to(ptu.device)
    algorithm.train()
if __name__ == "__main__":
    # Run configuration. Each ``*_kwargs`` entry is forwarded by
    # experiment() to the component named in its key; 'algorithm' and
    # 'version' are not read by experiment() and only reach setup_logger
    # together with the rest of the variant.
    variant = {
        'algorithm': 'HER-SAC',
        'version': 'normal',
        'algo_kwargs': {
            'batch_size': 128,
            'num_epochs': 100,
            'num_eval_steps_per_epoch': 5000,
            'num_expl_steps_per_train_loop': 1000,
            'num_trains_per_train_loop': 1000,
            'min_num_steps_before_training': 1000,
            'max_path_length': 50,
        },
        'sac_trainer_kwargs': {
            'discount': 0.99,
            'soft_target_tau': 0.005,
            'target_update_period': 1,
            'policy_lr': 3e-4,
            'qf_lr': 3e-4,
            'reward_scale': 1,
            'use_automatic_entropy_tuning': True,
        },
        'replay_buffer_kwargs': {
            'max_size': int(1e6),
            # equal to k = 4 in HER paper
            'fraction_goals_rollout_goals': 0.2,
            'fraction_goals_env_goals': 0,
        },
        'qf_kwargs': {'hidden_sizes': [400, 300]},
        'policy_kwargs': {'hidden_sizes': [400, 300]},
    }

    # Logging is set up first so the variant is handed to the logger
    # before training begins.
    setup_logger('her-sac-fetch-experiment', variant=variant)
    experiment(variant)