utils.py
import numpy as np
import tensorflow as tf
import random
import scipy.signal
import scipy.optimize

dtype = tf.float32


def normalized_columns_initializer(std=1.0):
    """Initializer that rescales each column of a random normal matrix to norm `std`."""
    def _initializer(shape, dtype=None, partition_info=None):
        out = np.random.randn(*shape).astype(np.float32)
        out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
        return tf.constant(out)
    return _initializer


def linear(x, size, name, initializer=None, bias_init=0):
    """Fully connected layer: y = x W + b."""
    w = tf.get_variable(name + "/w", [x.get_shape()[1], size], initializer=initializer)
    b = tf.get_variable(name + "/b", [size], initializer=tf.constant_initializer(bias_init))
    return tf.matmul(x, w) + b


def create_net(obs, hidden_sizes, nonlinear, action_size):
    """Build an MLP mapping observations to action means; `nonlinear[i]` toggles tanh after layer i."""
    x = obs
    for i in range(len(hidden_sizes)):
        x = linear(x, hidden_sizes[i], "net/l{}".format(i), normalized_columns_initializer(0.01))
        if nonlinear[i]:
            x = tf.nn.tanh(x)
    action = linear(x, action_size, "net/mean")
    return action


def var_shape(x):
    out = [k.value for k in x.get_shape()]
    assert all(isinstance(a, int) for a in out), \
        "shape function assumes that shape is fully known"
    return out


def numel(x):
    """Total number of elements in a variable."""
    return int(np.prod(var_shape(x)))


class GetFlat(object):
    """Reads all variables in var_list into a single flat vector."""

    def __init__(self, session, var_list):
        self.session = session
        self.op = tf.concat([tf.reshape(v, [numel(v)]) for v in var_list], axis=0)

    def __call__(self):
        return self.op.eval(session=self.session)


class SetFromFlat(object):
    """Assigns a flat parameter vector back to the variables in var_list."""

    def __init__(self, session, var_list):
        self.session = session
        shapes = [var_shape(v) for v in var_list]
        total_size = sum(int(np.prod(shape)) for shape in shapes)
        self.theta = theta = tf.placeholder(dtype, [total_size])
        start = 0
        assigns = []
        for (shape, v) in zip(shapes, var_list):
            size = int(np.prod(shape))
            assigns.append(tf.assign(v, tf.reshape(theta[start:start + size], shape)))
            start += size
        self.op = tf.group(*assigns)

    def __call__(self, theta):
        self.session.run(self.op, feed_dict={self.theta: theta})
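

# A minimal usage sketch, added for illustration only (not part of the original
# file): it shows how create_net, GetFlat, and SetFromFlat fit together under
# TensorFlow 1.x graph mode. The observation size (4), action size (2), and
# hidden layer sizes are made-up values for the demo.
if __name__ == "__main__":
    obs_ph = tf.placeholder(dtype, [None, 4])
    action = create_net(obs_ph, hidden_sizes=[64, 64], nonlinear=[True, True], action_size=2)
    var_list = tf.trainable_variables()

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        get_flat = GetFlat(sess, var_list)
        set_from_flat = SetFromFlat(sess, var_list)

        theta = get_flat()          # read every network parameter into one flat vector
        set_from_flat(theta * 1.0)  # write a (possibly modified) flat vector back
        print("flattened parameter vector has", theta.size, "elements")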