-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathregressionrnn.py
153 lines (123 loc) · 4.93 KB
/
regressionrnn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#coding=utf-8
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
BATCH_START = 0
TIME_STEPS = 20
BATCH_SIZE = 50
INPUT_SIZE = 1
OUTPUT_SIZE = 1
CELL_SIZE = 10
LR = 0.006
def get_batch(): #目的:拿res序列来预测seq序列(RNN使用一段序列来预测另一段序列)
global BATCH_START , TIME_STEPS
#xs shape(50 batch , 20 steps)
xs = np.arange(BATCH_START , BATCH_START + TIME_STEPS * BATCH_SIZE).reshape((BATCH_SIZE , TIME_STEPS))/ (10 * np.pi)#为了数据呈现更好
seq = np.sin(xs)
res = np.cos(xs)
BATCH_START += TIME_STEPS
return [seq[: , : , np.newaxis] , res[: , : , np.newaxis] , xs]
class LSTMRNN(object):
def __init__(self , n_steps , input_size , output_size , cell_size , batch_size):
self.n_steps = n_steps
self.input_size = input_size
self.output_size = output_size
self.cell_size = cell_size
self.batch_size = batch_size
with tf.name_scope('inputs'):
self.xs = tf.placeholder(tf.float32 , [None , n_steps , input_size] , name='xs')
self.ys = tf.placeholder(tf.float32 , [None , n_steps , output_size] , name = 'ys')
with tf.variable_scope('in_hidden'):
self.add_input_layer()
with tf.variable_scope('LSTM_cell'):
self.add_cell()
with tf.variable_scope('out_hidden'):
self.add_output_layer()
with tf.name_scope('cost'):
self.compute_cost()
with tf.name_scope('train'):
self.train_op = tf.train.AdamOptimizer(LR).minimize(self.cost)
def add_input_layer(self , ): #都必须经历从3d变成2d,然后再变回3d的过程,因为和权重矩阵相乘必须是二维的。
l_in_x = tf.reshape(self.xs , [-1 , self.input_size] , name = '2_2D') #(batch*n_step , in_size)
#Ws (in_size , cell_size)
Ws_in = self._weight_variable([self.input_size , self.cell_size])
#bs (cell_size , )
bs_in = self._bias_variable([self.cell_size ,])
# l_in_y = (batch * n_step , cell_size)
with tf.name_scope('Wx_plus_b'):
l_in_y = tf.matmul(l_in_x , Ws_in) + bs_in
# reshape l_in_y ==> (batch , n_steps , cell_size)
self.l_in_y = tf.reshape(l_in_y , [-1 , self.n_steps, self.cell_size] , name='2_3D')
def add_cell(self):
lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(self.cell_size , forget_bias = 1.0 , state_is_tuple = True)
with tf.name_scope('initial_state'):
self.cell_init_state = lstm_cell.zero_state(self.batch_size , dtype=tf.float32)
self.cell_outputs , self.cell_final_state = tf.nn.dynamic_rnn(lstm_cell , self.l_in_y , initial_state = self.cell_init_state , time_major = False)
def add_output_layer(self):
#shape = (batch * steps , cell_size)
l_out_x = tf.reshape(self.cell_outputs , [-1 , self.cell_size] , name = '2_2D')
Ws_out = self._weight_variable([self.cell_size , self.output_size])
bs_out = self._bias_variable([self.output_size ,])
#shape = (batch , steps , output_size)
with tf.name_scope('Wx_plus_b'):
self.pred = tf.matmul(l_out_x , Ws_out) + bs_out
def compute_cost(self):
losses = tf.nn.seq2seq.sequence_loss_by_example(
[tf.reshape(self.pred , [-1] , name='reshape_pred')] ,
[tf.reshape(self.ys , [-1] , name = 'reshape_target')] ,
[tf.ones([self.batch_size * self.n_steps] , dtype = tf.float32)] ,
average_across_timesteps = True ,
softmax_loss_function = self.ms_error ,
name = 'losses'
)
with tf.name_scope('average_cost'):
self.cost = tf.div(
tf.reduce_sum(losses , name = 'losses_sum') ,
self.batch_size ,
name = 'average_cost'
)
tf.scalar_summary('cost' , self.cost)
def ms_error(self , y_pre , y_target):
return tf.square(tf.sub(y_pre , y_target))
def _weight_variable(self , shape , name = 'weights'):
initializer = tf.random_normal_initializer(mean = 0. , stddev = 1. , )
return tf.get_variable(shape = shape , initializer = initializer , name = name)
def _bias_variable(self , shape , name = 'biases'):
initializer = tf.constant_initializer(0.1)
return tf.get_variable(name = name , shape = shape , initializer = initializer)
if __name__ == '__main__':
model = LSTMRNN(TIME_STEPS , INPUT_SIZE , OUTPUT_SIZE , CELL_SIZE , BATCH_SIZE)
sess = tf.Session()
merged = tf.merge_all_summaries()
writer = tf.train.SummaryWriter("logs" , sess.graph)
sess.run(tf.initialize_all_variables())
plt.ion()
plt.show()
for i in range(200):
seq , res , xs = get_batch()
if i == 0:
feed_dict = {
model.xs : seq ,
model.ys : res ,
# create initial state
}
else:
feed_dict = {
model.xs : seq ,
model.ys : res ,
model.cell_init_state : state # use last state as the initial state for this run
}
_ , cost , state , pred = sess.run(
[model.train_op , model.cost , model.cell_final_state , model.pred] ,
feed_dict = feed_dict
)
# plotting
plt.plot(xs[0 , :] , res[0].flatten() , 'r' , xs[0 , :] , pred.flatten()[:TIME_STEPS] , 'b--')
plt.ylim((-1.2 , 1.2))
plt.draw()
plt.pause(0.3)
if i % 20 == 0:
print 'cost: ' , round(cost , 4)
## for tensorboard
#result = sess.run(merged , feed_dict)
#writer.add_summary(result , i)