-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpuffStim.py
More file actions
333 lines (262 loc) · 10.6 KB
/
puffStim.py
File metadata and controls
333 lines (262 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
'''
Joey Broussard
PNI
20200723
puffStim
Modification of eyeblink code for puff stim production
this is NOT implemented to be a slave
if on Raspberry, using
- serial to trigger a session/trial
- serial for event logging
- serial to stop a trial
- I2C in arduino code to control treadmill state
todo:
'''
import serial
import time
import os.path
from threading import Thread
import eventlet
eventlet.monkey_patch()
#Initialize picamera as a subprocess with pkill
#Serial port names and initial values of the settings used by the eyeblink object
serialStr = '/dev/ttyACM0' #uno
altSerialStr = '/dev/ttyACM1' #alternative uno port

#hardware options: which serial port/baud rate to open and whether picamera is flagged on
options = {
    'serial': {
        'port': serialStr,
        'baud': 115200, #57600
    },
    'picamera': 0,
}

#default session/trial parameters, shared (by reference) with the eyeblink object
trial = {
    'filePath': '',
    'fileName': '',
    'sessionNumber': 0,
    'sessionDur': 0, # (trialDur * numTrial)
    'trialNumber': 0,
    'trialDur': 5000,
    'numTrial': 3,
    'interTrialInterval': 5000, #ms, ITI
    'prePuffDur': 250, #ms, imaged time prior to stim presentation
    'justFinished': False,
    'puffNum': 100, #number of puffs to deliver
    'puffFreq': 0.5, #frequency of puff stimulation
}
#end options
#
class eyeblink():
    '''
    Serial front end for the arduino that runs the puff stimulation.

    The object opens the serial port named in the module level 'options' dict,
    sends text commands ('startSession', 'stopSession', 'settrial,key,val',
    'getState', 'version') and logs every line received during a session to a
    per-trial text file. Received lines are also forwarded to an optional
    socketio object (see bAttachSocket).

    Attributes:
        animalID: string used in file/folder names, 'default' means none
        trial: the module level 'trial' dict (shared, not copied)
        socketio: object with an emit() method or None
        ser: serial.Serial object or None when no port could be opened
        trialRunning: 1 while a session is running, otherwise 0
        savepath: root folder for saved files
        filePtr: open file for the current trial or None
        arduinoStateList: lines returned by 'getState' at the start of a session
    '''

    def __init__(self):
        self.animalID = 'default'
        self.trial = trial
        self.socketio = None

        #all state is set up before the reader thread is started
        self.trialRunning = 0
        #save all serial data to file, set in setsavepath
        self.savepath = '/media/usb/'
        self.filePtr = None
        self.arduinoStateList = None #grab from arduino at start of trial, write into each epoch file

        #try the configured port first, then fall back to the alternative port
        self.ser = None
        try:
            self.ser = serial.Serial(options['serial']['port'], options['serial']['baud'], timeout=0.25)
        except Exception:
            options['serial']['port'] = altSerialStr
            try:
                self.ser = serial.Serial(options['serial']['port'], options['serial']['baud'], timeout=0.25)
            except Exception:
                #keep running without a port, serial methods report the problem instead of crashing
                self.ser = None
                print("======================================================")
                print("ERROR: treadmill did not find serial port ' %s '" % (options['serial']['port'],))
                print("======================================================")

        if options['picamera']:
            print('treadmill is using raspberry pi camera')

        #serial is blocking. we need our trial to run in a separate thread so we do not block user interface
        thread = Thread(target=self.background_thread, args=())
        thread.daemon = True #as a daemon the thread will stop when *this stops
        thread.start()

        print('eyeblink.trial: %s' % (self.trial,))

    def _serialWrite(self, command):
        '''
        Write one command string to the arduino.
        Returns 1 if the command was written, 0 if no serial port is open.
        '''
        if self.ser is None:
            print("ERROR: eyeblink serial port is not open, did not write '" + command.rstrip() + "'")
            return 0
        self.ser.write(command)
        return 1

    def background_thread(self):
        '''Background thread to continuously read serial. Used during a trial.'''
        while True:
            #only read while a session is running and a port is actually open
            if self.trialRunning and self.ser is not None:
                line = self.ser.readline().rstrip()
                if len(line) > 0:
                    print(line)
                    self.NewSerialData(line)
            time.sleep(0.01)

    def bAttachSocket(self, socketio):
        '''Remember the socketio object that received serial data is emitted to.'''
        print('eyeblink.bAttachSocket() attaching socketio: %s' % (socketio,))
        self.socketio = socketio

    def NewSerialData(self, str):
        '''
        we have received new serial data. pass it back to socketio
        special case is when we receive stopTrial

        The line is appended to the open trial file (if any) and emitted as a
        'serialdata' event (if a socketio is attached). When the second comma
        separated field is 'stopTrial', trial['justFinished'] is set to True.
        Errors are reported on the console and never raised to the caller.
        '''
        #parameter name is kept for callers, use a local alias so the builtin is not hidden below
        line = str
        #we want 'millis,event,val', if serial data does not match this then do nothing
        try:
            if len(line) > 0:
                #save to file
                if self.filePtr:
                    self.filePtr.write(line + '\n')
                if self.socketio:
                    self.socketio.emit('serialdata', {'data': line})
                #stop trial
                parts = line.split(',')
                if len(parts) > 1 and parts[1] == 'stopTrial':
                    print('Set justFinished to True')
                    self.trial['justFinished'] = True
        except Exception as e:
            print("==============")
            print("ERROR: eyeblink.NewSerialData()")
            print(e)
            print("==============")

    def startSession(self):
        '''
        Start a new session: bump the session number, open the trial 0 file
        and send 'startSession' to the arduino.
        Returns 1 on success, 0 if a session is already running or no serial
        port is open.
        '''
        if self.trialRunning:
            print('warning: session is already running')
            return 0
        if self.ser is None:
            #without a port there is nothing to start, leave the counters untouched
            print('ERROR: eyeblink.startSession() serial port is not open')
            return 0

        self.trial['sessionNumber'] += 1
        self.trial['trialNumber'] = 0

        self.newtrialfile(0)

        if self.socketio:
            self.socketio.emit('serialdata', {'data': "=== Session " + str(self.trial['sessionNumber']) + " ==="})

        time.sleep(0.01)
        self.ser.write('startSession\n')
        self.trialRunning = 1

        print('eyeblink.startSession()')
        return 1

    def startTrial(self):
        '''
        Advance to the next trial of a running session and open its file.
        Returns 1 on success, 0 if no session is running.
        '''
        if not self.trialRunning:
            print('warning: startTrial() trial is not running')
            return 0

        self.trial['trialNumber'] += 1
        self.newtrialfile(self.trial['trialNumber'])

        if self.socketio:
            self.socketio.emit('serialdata', {'data': "=== Trial " + str(self.trial['trialNumber']) + " ==="})
        return 1

    def stopSession(self):
        '''Stop the session: stop the reader, close the trial file, send 'stopSession'.'''
        #stop the reader before the file is closed so it does not write to a closed file
        self.trialRunning = 0
        if self.filePtr:
            self.filePtr.close()
            self.filePtr = None

        self._serialWrite('stopSession\n')

        if self.socketio:
            self.socketio.emit('serialdata', {'data': "=== Stop Session " + str(self.trial['sessionNumber']) + " ==="})
        print('eyeblink.stopSession()')

    def newtrialfile(self, trialNumber):
        '''
        Open a new text file for this trial and write its two header lines.

        The file goes to <savepath>/<date>/<date>_<animalID>/ (the last folder
        is omitted when animalID is empty or 'default'). trial['filePath'] and
        trial['fileName'] are updated. When trialNumber is 0 the arduino state
        is read first; it is written into the header of every trial file.
        '''
        # open a file for this trial
        dateStr = time.strftime("%Y%m%d")
        timeStr = time.strftime("%H%M%S")
        datetimeStr = dateStr + '_' + timeStr

        sessionStr = ''
        sessionFolder = ''
        if self.animalID and not (self.animalID == 'default'):
            sessionStr = self.animalID + '_'
            sessionFolder = dateStr + '_' + self.animalID

        #os.path.join works whether or not savepath ends with a slash
        thisSavePath = os.path.join(self.savepath, dateStr, sessionFolder)
        if not os.path.exists(thisSavePath):
            os.makedirs(thisSavePath)

        sessionFileName = sessionStr + datetimeStr + '_s' + str(self.trial['sessionNumber']) + '_t' + str(self.trial['trialNumber']) + '.txt'
        sessionFilePath = os.path.join(thisSavePath, sessionFileName)

        self.trial['filePath'] = sessionFilePath
        self.trial['fileName'] = sessionFileName

        #
        #header line 1 is all arduino parameters
        if trialNumber == 0:
            self.arduinoStateList = self.GetArduinoState()

        #close the file of the previous trial, otherwise its handle is leaked
        if self.filePtr:
            self.filePtr.close()
        self.filePtr = open(sessionFilePath, 'w')

        header = [
            'session=' + str(self.trial['sessionNumber']),
            'trial=' + str(self.trial['trialNumber']),
            'date=' + dateStr,
            'time=' + timeStr,
        ]
        #arduinoStateList can be None or 0 when the state was never read
        header.extend(self.arduinoStateList or [])
        self.filePtr.write(';'.join(header) + ';\n')

        #
        #header line 2 is column names
        self.NewSerialData('millis,event,value')

        #
        #each call to self.NewSerialData() will write serial data to this file

    def settrial(self, key, val):
        '''
        set value for this trial
        send serial to set value on arduino

        val is stored in the trial dict as a string. Keys that are not already
        in the trial dict are rejected with a console message.
        '''
        #the 'trial is already running' guard is intentionally not applied here
        #so that trial params can be updated online
        val = str(val)
        print("=== eyeblink.settrial() key:'" + key + "' val:'" + val + "'")
        if key in self.trial:
            self.trial[key] = val
            serialCommand = str('settrial,' + key + ',' + val)
            print("\teyeblink.settrial() writing to serial '" + serialCommand + "'")
            self._serialWrite(serialCommand + '\n')
        else:
            print('\tERROR: eyeblink:settrial() did not find %s in trial dict' % (key,))

    def updatetrial(self):
        '''Recompute trial['sessionDur'] (stored as a string) as numTrial * trialDur.'''
        #values may be strings after settrial(), int() accepts both and grows as needed
        numTrial = int(self.trial['numTrial'])
        trialDur = int(self.trial['trialDur'])
        totalDur = numTrial * trialDur
        print('updatesession() set sessionDur= %s' % (totalDur,))
        self.trial['sessionDur'] = str(totalDur)

    def GetArduinoState(self):
        '''
        Send 'getState' and return the list of lines the arduino answers with.
        Returns 0 if a session is running.
        '''
        if self.trialRunning:
            print('warning: trial is already running')
            return 0

        if self.socketio:
            self.socketio.emit('serialdata', {'data': "=== Arduino State ==="})
        self._serialWrite('getState\n')
        stateList = self.emptySerial()
        if self.socketio:
            self.socketio.emit('serialdata', {'data': "=== Done ==="})
        return stateList

    def emptySerial(self):
        '''
        Read serial lines until a read comes back empty (the port is opened
        with a timeout) and return them as a list with trailing whitespace
        removed. Each line is also passed to NewSerialData().
        Returns 0 if a session is running, [] if no serial port is open.
        '''
        if self.trialRunning:
            print('warning: trial is already running')
            return 0
        if self.ser is None:
            return []

        theRet = []
        line = self.ser.readline()
        while line:
            line = line.rstrip()
            theRet.append(line)
            self.NewSerialData(line)
            line = self.ser.readline()
        return theRet

    def setserialport(self, newPort):
        '''
        Store newPort in options['serial']['port'] if that path exists.
        The already opened port is not changed by this call.
        Returns 1 if the port exists, 0 otherwise or if a session is running.
        '''
        if self.trialRunning:
            print('warning: trial is already running')
            return 0

        if os.path.exists(newPort):
            print('setserialport() port %s exists' % (newPort,))
            options['serial']['port'] = newPort
            return 1
        else:
            print('setserialport() port %s does not exist' % (newPort,))
            return 0

    def checkserialport(self):
        '''
        Check that the configured serial port path exists.
        Returns (1, port) or (0, port).
        NOTE: returns a plain 0 (not a tuple) if a session is running, kept
        as is so existing callers are not affected.
        '''
        if self.trialRunning:
            print('warning: trial is already running')
            return 0

        port = options['serial']['port']
        print('checking %s' % (port,))
        if os.path.exists(port):
            print('exists')
            return 1, port
        else:
            print('does not exist')
            return 0, port

    def checkarduinoversion(self):
        '''Send 'version' and pass the answer to NewSerialData(). Returns 0 if a session is running.'''
        if self.trialRunning:
            print('warning: trial is already running')
            return 0

        self._serialWrite('version\n')
        self.emptySerial()

    def setsavepath(self, str):
        '''Set the root folder that trial files are saved under.'''
        self.savepath = str

    def __del__(self):
        print('Eyeblink object terminated by destructor method')