forked from SjulsonLab/RPi4_behavior_boxes
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsoyoun_task.py
More file actions
327 lines (292 loc) · 13.4 KB
/
soyoun_task.py
File metadata and controls
327 lines (292 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# python3: soyoun_task.py
import importlib
from transitions import Machine
from transitions import State
from transitions.extensions.states import add_state_features, Timeout
import pysistence, collections
from icecream import ic
import logging
import time
from datetime import datetime
import os
from gpiozero import PWMLED, LED, Button
from colorama import Fore, Style
import logging.config
from time import sleep
import random
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": True,
}
)
# all modules above this line will have logging disabled
import behavbox
# adding timing capability to the state machine
@add_state_features(Timeout)
class TimedStateMachine(Machine):
pass
class SoyounTask(object):
# Define states. States where the animals is waited to make their decision
def __init__(self, **kwargs): # name and task_information should be provided as kwargs
# if no name or session, make fake ones (for testing purposes)
if kwargs.get("name", None) is None:
self.name = "name"
print(
Fore.RED
+ Style.BRIGHT
+ "Warning: no name supplied; making fake one"
+ Style.RESET_ALL
)
else:
self.name = kwargs.get("name", None)
if kwargs.get("session_info", None) is None:
print(
Fore.RED
+ Style.BRIGHT
+ "Warning: no session_info supplied; making fake one"
+ Style.RESET_ALL
)
from fake_session_info import fake_session_info
self.session_info = fake_session_info
else:
self.session_info = kwargs.get("session_info", None)
ic(self.session_info)
if kwargs.get("task_information", None) is None:
print(
Fore.RED
+ Style.BRIGHT
+ "Warning: no task_information supplied; making fake one"
+ Style.RESET_ALL
)
from task_information_phase_1 import task_information
self.task_information = task_information
else:
self.task_information = kwargs.get("task_information", None)
ic(self.task_information)
self.error_repeat = self.task_information['error_repeat']
self.error_count_max = self.task_information['error_repeat_max']
# initialize the state machine
self.states = [
State(name='standby',
on_enter=["enter_standby"],
on_exit=["exit_standby"]),
Timeout(name='draw',
on_enter=["enter_draw"],
on_exit=["exit_draw"],
timeout=0,
on_timeout=["play_game"]
),
Timeout(name="initiate",
on_enter=["enter_initiate"],
on_exit=["exit_initiate"],
timeout=self.task_information["initiation_timeout"],
on_timeout=["restart"]),
Timeout(name='cue_state',
on_enter=["enter_cue_state"],
on_exit=["exit_cue_state"],
timeout=self.task_information["cue_timeout"],
on_timeout=["restart"]),
Timeout(name='reward_available',
on_enter=["enter_reward_available"],
on_exit=["exit_reward_available"],
timeout=self.task_information["reward_timeout"] +
self.task_information["reward_wait"],
on_timeout=["restart"])
]
self.transitions = [
['start_trial', 'standby', 'draw'], # format: ['trigger', 'origin', 'destination']
['play_game', 'draw', 'initiate'],
['start_cue', 'initiate', 'cue_state'],
['evaluate_reward', 'cue_state', 'reward_available'],
['restart', ['initiate', 'cue_state', 'reward_available'], 'standby']
]
self.machine = TimedStateMachine(
model=self,
states=self.states,
transitions=self.transitions,
initial='standby'
)
self.session_length = len(self.task_information["block_list"])
self.trial_running = False
self.restart_flag = False
# self.restart_flag_inter = False
self.trial_number = 0
self.error_count = 0
self.card_count = -1
self.deck = self.task_information["deck"]
self.current_card = None
# initialize behavior box
self.box = behavbox.BehavBox(self.session_info)
self.pump = behavbox.Pump()
self.treadmill = self.box.treadmill
self.distance_initiation = self.task_information['treadmill_setup']['distance_initiation']
self.distance_buffer = self.treadmill.distance_cm
self.distance_diff = 0
########################################################################
# functions called when state transitions occur
########################################################################
def run(self):
if self.box.event_list:
event_name = self.box.event_list.popleft()
else:
event_name = ""
if self.state == "standby":
pass
elif self.state == "draw":
self.play_game()
# elif self.restart_flag:
# self.restart()
elif self.state == "initiate":
if self.distance_buffer:
self.distance_diff = self.treadmill.distance_cm - self.distance_buffer
if self.distance_diff >= self.distance_initiation:
self.distance_buffer = self.treadmill.distance_cm
self.distance_diff = 0
self.start_cue()
else:
self.error_count += 1
elif self.state == "cue_state":
if self.distance_buffer:
self.distance_diff = self.treadmill.distance_cm - self.distance_buffer
distance_required = self.task_information['treadmill_setup'][
self.task_information["state"][self.current_card[1]]]
if self.distance_diff >= distance_required:
self.distance_buffer = self.treadmill.distance_cm
self.distance_diff = 0
self.evaluate_reward()
else:
self.error_count += 1
self.restart_flag = True
# logging.info(str(time.time()) + ", " + str(
# self.trial_number) + ", treadmill state distance did not pass")
elif self.state == "reward_available":
# first detect the lick signal:
cue_state = self.current_card[0]
side_choice = self.task_information['choice'][self.current_card[2]]
# question: do we want entry mark as lick?
side_mice = None
if event_name == "left_IR_entry":
side_mice = 'left'
elif event_name == "right_IR_entry":
side_mice = 'right'
if side_mice:
reward_size = self.task_information['reward'][self.current_card[3]]
if cue_state == 2:
self.pump.reward(side_mice, self.task_information["reward_size"][reward_size])
elif side_choice == side_mice:
if side_mice == 'left':
reward_side = '1'
elif side_mice == 'right':
reward_side = '2'
# reward_size = self.task_information['reward'][self.current_card[3]]
self.pump.reward(reward_side, self.task_information["reward_size"][reward_size])
else:
self.error_count += 1
self.restart_flag = True
self.restart()
else:
self.error_count += 1
self.restart_flag = True
# look for keystrokes
self.box.check_keybd()
def enter_standby(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", entering standby, prepare to start the trial...")
self.trial_running = False
# if self.restart_flag:
# time.sleep(self.task_information["punishment_timeout"])
# pass
time.sleep(self.task_information["reward_wait"])
def exit_standby(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", exiting standby")
self.trial_number += 1
pass
def enter_draw(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", entering draw")
if self.card_count >= self.session_length:
self.trial_running = False # terminate the state machine, end the session
elif self.error_repeat and (self.error_count < self.error_count_max):
self.trial_running = True
else:
self.restart_flag = False
self.trial_running = True
def exit_draw(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", exiting draw")
# if self.restart_flag:
# # self.error_count += 1
# self.restart_flag = False
# else:
# self.card_count += 1
# # print(str(self.card_count))
# self.current_card = self.deck[self.card_count]
print(str(self.current_card))
card_cue = self.task_information['cue'][self.current_card[0]]
card_state = self.task_information['state'][self.current_card[1]]
card_choice = self.task_information['choice'][self.current_card[2]]
card_reward = self.task_information['reward'][self.current_card[3]]
print("****************************\n" +
"Current card condition: \n" +
"****************************\n" +
"*Cue: " + str(card_cue) + "\n" +
"*State: " + str(card_state) + "\n" +
"*Choice: " + str(card_choice) + "\n" +
"*Reward: " + str(card_reward) + "\n")
def enter_initiate(self):
# check error_repeat
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", entering initiate")
# wait for treadmill signal and process the treadmill signal
self.distance_buffer = self.treadmill.distance_cm
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", treadmill distance t0: " + str(self.distance_buffer))
def exit_initiate(self):
# check the flag to see whether to shuffle or keep the original card
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", exiting initiate")
self.distance_buffer = self.treadmill.distance_cm
self.restart_flag = True
def enter_cue_state(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", entering cue state")
# turn on the cue according to the current card
self.check_cue(self.task_information['cue'][self.current_card[0]])
# wait for treadmill signal and process the treadmill signal
self.distance_buffer = self.treadmill.distance_cm
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", treadmill distance t0: " + str(self.distance_buffer))
def exit_cue_state(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", exiting cue state")
self.cue_off(self.task_information['cue'][self.current_card[0]])
def enter_reward_available(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", entering reward available")
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", cue_state distance satisfied, treadmill: " + str(self.treadmill.distance_cm))
self.cue_off(self.task_information['cue'][self.current_card[0]])
def exit_reward_available(self):
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", exiting reward available")
pass
def check_cue(self, cue):
if cue == 'sound':
self.box.sound1.on() # could be modify according to specific sound cue
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", cue sound1 on")
elif cue == 'LED':
self.box.cueLED1.on()
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", cueLED1 on")
else:
self.box.sound1.on()
self.box.cueLED1.on()
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", sound1 + cueLED1 on (free choice)")
def cue_off(self, cue):
if cue == 'sound':
self.box.sound1.off() # could be modify according to specific sound cue
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", cue sound1 off")
elif cue == 'LED':
self.box.cueLED1.off()
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", cueLED1 off")
else:
self.box.sound1.off()
self.box.cueLED1.off()
logging.info(str(time.time()) + ", " + str(self.trial_number) + ", sound1 + cueLED1 off (free choice)")
########################################################################
# methods to start and end the behavioral session
########################################################################
def start_session(self):
ic("TODO: start video")
self.box.video_start()
def end_session(self):
ic("TODO: stop video")
self.box.video_stop()