-
Notifications
You must be signed in to change notification settings - Fork 1
/
ofc4hci.py
2523 lines (2234 loc) · 165 KB
/
ofc4hci.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os.path
import pickle
import numpy as np
import pandas as pd
from scipy import stats
from scipy.linalg import expm, eigh, sqrtm
from scipy.optimize import differential_evolution
from zipfile import ZipFile
from io import BytesIO
from urllib.request import urlopen
from pd_dataset import datasetpreparation
def pointingdynamics_data(user, distance, width, direction,
manual_reactiontime_cutoff=True,
shift_target_to_origin=False, analysis_dim=2,
dir_path="PointingDynamicsDataset", DATAtrials=None, trial_info_path="PD-Dataset.pickle"):
"""
Computes reference trajectory (distribution) from Pointing Dynamics Dataset.
:param user: user ID [1-12]
:param distance: distance to target in px [765 or 1275]
:param width: corresponding target width in px [(distance, width) must be from {(765, 255), (1275, 425), (765, 51), (1275, 85), (765, 12), (1275, 20), (765, 3), (1275, 5)}]
:param direction: movement direction ["left" or "right"]
:param manual_reactiontime_cutoff: whether reaction times should be removed from reference user trajectory [bool]
:param shift_target_to_origin: whether to shift coordinate system such that target center is in origin [bool]
:param analysis_dim: how many dimensions of user data trajectories should be used for mean computation etc. (e.g., 2 means position and velocity only) [1-3]
:param dir_path: local path to Mueller's PointingDynamicsDataset (only used if "dir_path.txt" does not exist) [str]
:param DATAtrials: trial info object (if not provided, this is loaded from trial_info_path) [pandas.DataFrame]
:param trial_info_path: path to trial info object (if file does not exist, DATAtrials object is re-computed and stored to file) [str]
:return:
- x_loc_data: average user trajectory [(N+1)*analysis_dim-array]
- x_scale_data: covariance sequence of user trajectory [(N+1)*analysis_dim*analysis_dim-array]
- Nmax: number of time steps (excluding final step; computed as longest duration of considered trials) [int]
- dt: time step duration in seconds [int/float]
- dim: dimension of the task (1D, 2D, or 3D) [int]
- initialvalues_alltrials: list of tuples containing (actual) initial position, velocity, and acceleration for each considered user trial [list of tuples]
- T_alltrials: list of tuples containing (nominal) initial position (=target position of preceding trial) and current target position for each considered user trial [list of tuples]
- widthm_alltrials: list of tuples containing width (i.e., radius) of preceding and current target spheres for each considered user trial [list of tuples]
"""
# 1. Read dataset and meta information
# Provide local path to Mueller's PointingDynamicsDataset
# (http://joergmueller.info/controlpointing/PointingDynamicsDataset.zip)
if os.path.isfile("dir_path.txt"): # if dir_path has been stored from a previous run, use this
with open("dir_path.txt", "r") as file:
dir_path = file.read()
dir_path = os.path.abspath(dir_path)
else:
dir_path = os.path.abspath(dir_path)
if not os.path.exists(dir_path):
download_PointingDynamicsDataset = input("Could not find reference to PointingDynamicsDataset. Do you want to download it (~1.4GB)? (y/N) ")
if download_PointingDynamicsDataset.lower().startswith("y"):
print("Will download and unzip it to 'PointingDynamicsDataset/'.")
print("Downloading archive... ", end='', flush=True)
resp = urlopen("http://joergmueller.info/controlpointing/PointingDynamicsDataset.zip")
zipfile = ZipFile(BytesIO(resp.read()))
print("unzip archive... ", end='', flush=True)
for file in zipfile.namelist():
zipfile.extract(file)
print("done.")
dir_path = os.path.abspath("PointingDynamicsDataset")
assert os.path.exists(dir_path), "Internal Error during unpacking of PointingDynamicsDataset."
else:
dir_path = input("Insert the path to the PointingDynamicsDataset directory: ")
dir_path = os.path.abspath(dir_path)
assert os.path.exists(dir_path), "ERROR: Invalid input path."
with open("dir_path.txt", "w") as file: # remember dir_path for later runs
file.write(dir_path)
dim = 1 # dimension of the task (1D, 2D, or 3D)
if DATAtrials is None:
if trial_info_path is not None and os.path.exists(trial_info_path):
# -> OPTION 1: load trial information from file
with open(trial_info_path, 'rb') as handle:
DATAtrials = pickle.load(handle)
else:
# -> OPTION 2: compute and store trial information
DATAtrials = datasetpreparation(dir_path=dir_path)
with open('PD-Dataset.pickle', 'wb') as handle:
pickle.dump(DATAtrials, handle, protocol=pickle.HIGHEST_PROTOCOL)
filenameread = os.path.join(dir_path,f"P{user}",f"Data0,-1,{distance},{width}.csv")
assert os.path.isfile(filenameread), "Pointing data could not be found at given path. Make sure that 'dir_path.txt' in the project's main folder contains the correct path or delete the file."
DATA = pd.read_csv(filenameread)
dt = np.mean(np.diff(DATA.time)) # DATA time step duration
# 2. Split and pre-process dataset
available_trials = DATAtrials.loc[(DATAtrials['User'] == user) & (DATAtrials['Distance'] == distance) & (
DATAtrials['Width'] == width) & (DATAtrials['Direction'] == direction), 'N0':'N1']
N0_alltrials = available_trials['N0'].values[0]
ndelta_alltrials = available_trials['ndelta'].values[0]
N1_alltrials = available_trials['N1'].values[0]
T_alltrials = [[[target_pos for target_pos in DATA.target.unique() if target_pos != DATA.target[N0 + 1]][0],
DATA.target[N0 + 1]] for N0 in N0_alltrials]
widthm_alltrials = [[DATA.widthm[N0 + 1], DATA.widthm[N0 + 1]] for N0 in N0_alltrials]
# Remove reaction times
if manual_reactiontime_cutoff:
initialvalues_alltrials = [(DATA.y[ndelta], DATA.sgv[ndelta], DATA.sga[ndelta]) for ndelta in ndelta_alltrials]
ydata_alltrials = [
np.array([DATA.y[ndelta:N1 + 1], DATA.sgv[ndelta:N1 + 1], DATA.sga[ndelta:N1 + 1]]).transpose() for
ndelta, N1 in zip(ndelta_alltrials, N1_alltrials)]
else:
initialvalues_alltrials = [(DATA.y[N0], DATA.sgv[N0], DATA.sga[N0]) for N0 in N0_alltrials]
ydata_alltrials = [np.array([DATA.y[N0:N1 + 1], DATA.sgv[N0:N1 + 1], DATA.sga[N0:N1 + 1]]).transpose() for
N0, N1 in zip(N0_alltrials, N1_alltrials)]
# Translate coordinates such that target is in origin
if shift_target_to_origin:
initialvalues_alltrials = [
(initialvalues_trial[0] - T_trial[-1], initialvalues_trial[1], initialvalues_trial[2]) for
initialvalues_trial, T_trial in zip(initialvalues_alltrials, T_alltrials)]
ydata_alltrials = [ydata_trial - [T_trial[-1], 0, 0] for ydata_trial, T_trial in
zip(ydata_alltrials, T_alltrials)]
T_alltrials = [list(np.array(T_trial) - T_trial[-1]) for T_trial in T_alltrials]
# Remove trajectories with any outlier position value
ydata_alltrials = pd.Series(ydata_alltrials)[np.bitwise_and.reduce(np.squeeze([[np.abs(stats.zscore(
[ydata_trial[i, 0] if i < len(ydata_trial) else ydata_trial[-1, 0] for ydata_trial in ydata_alltrials])) < 3]
for i in range(
max([len(ydata_trial) for ydata_trial in ydata_alltrials]))]))].tolist()
# Remove trajectories with outlier duration
ydata_alltrials = pd.Series(ydata_alltrials)[
np.abs(stats.zscore([ydata_trial.shape[0] for ydata_trial in ydata_alltrials])) < 3].tolist()
# 3. Compute mean trajectories
# (Extend all trajectories to longest duration of considered trials (constantly using last value and zero velocity))
Nmax = max([ydata_trial.shape[0] for ydata_trial in ydata_alltrials]) - 1
x_loc_data = [None] * (Nmax + 1) # np.zeros(shape=(Nmax, ))
x_scale_data = [None] * (Nmax + 1) # np.zeros(shape=(Nmax, ))
for i in range(Nmax + 1):
x_loc_data[i] = np.mean(np.squeeze(
[ydata_trial[i, :analysis_dim] if ydata_trial.shape[0] > i else
(ydata_trial[-1, :] * np.repeat(np.array([1, 0, 0]), dim))[:analysis_dim] for
ydata_trial in ydata_alltrials]).reshape(-1, analysis_dim), axis=0)
x_scale_data[i] = np.cov(np.squeeze(
[ydata_trial[i, :analysis_dim] if ydata_trial.shape[0] > i else
(ydata_trial[-1, :] * np.repeat(np.array([1, 0, 0]), dim))[:analysis_dim] for
ydata_trial in ydata_alltrials]), rowvar=0).reshape(-1, analysis_dim)
x_loc_data = np.squeeze(x_loc_data)
x_scale_data = np.squeeze(x_scale_data)
return x_loc_data, x_scale_data, Nmax, dt, dim, initialvalues_alltrials, T_alltrials, widthm_alltrials
def _control_pointingdynamics(control_model, *args, **kwargs):
"""
Wrapper that passes args and kwargs to model-specific pointingdynamics function.
:param control_model: control model to use [str]
:param args: arbitrary number of arguments
:param kwargs: arbitrary number of keyword arguments
:return: output of respective model function
"""
if control_model == "2OL-Eq":
return secondorderlag_eq_pointingdynamics(*args, **kwargs)
elif control_model == "MinJerk":
return minjerk_pointingdynamics(*args, **kwargs)
elif control_model == "LQR":
return lqr_pointingdynamics(*args, **kwargs)
elif control_model == "LQG":
return lqg_pointingdynamics(*args, **kwargs, system_dynamics="LQG")
elif control_model == "E-LQG":
return lqg_pointingdynamics(*args, **kwargs, system_dynamics="E-LQG")
else:
raise NotImplementedError
def _get_custom_param_dict(control_model, user, distance, width, direction, param_dict_custom=None, use_opt_params=False):
"""
Sets up parameter dict consisting of custom and optimal parameter values (if "use_opt_params==True").
:param control_model: control model to use (only used if "use_opt_params==True") [str]
:param user: user ID (only used if "use_opt_params==True") [1-12]
:param distance: distance to target in px (only used if "use_opt_params==True") [765 or 1275]
:param width: corresponding target width in px (only used if "use_opt_params==True")
[(distance, width) must be from {(765, 255), (1275, 425), (765, 51), (1275, 85), (765, 12), (1275, 20), (765, 3), (1275, 5)}]
:param direction: movement direction (only used if "use_opt_params==True") ["left" or "right"]
:param param_dict_custom: dictionary with name-value pairs of "control_models" parameters [dict]
:param use_opt_params: whether optimal params should be used for non-specified parameters
(otherwise, these parameters are omitted from returned dictionary, i.e., their default values will be used) [bool]
:return: parameter dictionary [dict]
"""
# Load values of non-specified parameters (either default or optimal parameters)
if use_opt_params:
with open(f"optparams/{control_model}/optparam_{user}_{distance}-{width}_{direction}.npy", 'rb') as f:
opt_info_npy = np.load(f, allow_pickle=True)
param_dict = opt_info_npy[1]
else:
param_dict = {}
# Update dict with specified parameter values
if param_dict_custom is not None:
param_dict.update(param_dict_custom)
return param_dict
def secondorderlag_eq_pointingdynamics(user, distance, width, direction, secondorderlag_eq_param_dict,
manual_reactiontime_cutoff=True,
shift_target_to_origin=False, analysis_dim=1,
dir_path="PointingDynamicsDataset", DATAtrials=None, trial_info_path="PD-Dataset.pickle"):
"""
Computes 2OL-Eq trajectory for a given parameter set, as well as several metrics comparing this trajectory to
reference user trajectory from Pointing Dynamics Dataset.
:param user: user ID [1-12]
:param distance: distance to target in px [765 or 1275]
:param width: corresponding target width in px [(distance, width) must be from {(765, 255), (1275, 425), (765, 51), (1275, 85), (765, 12), (1275, 20), (765, 3), (1275, 5)}]
:param direction: movement direction ["left" or "right"]
:param secondorderlag_eq_param_dict: dictionary with name-value pairs of 2OL-Eq parameters; missing parameters are set to their default value [dict]
:param manual_reactiontime_cutoff: whether reaction times should be removed from reference user trajectory [bool]
:param shift_target_to_origin: whether to shift coordinate system such that target center is in origin [bool]
:param analysis_dim: how many dimensions of user data trajectories should be used for mean computation etc. (e.g., 2 means position and velocity only) [1-2]
:param dir_path: local path to Mueller's PointingDynamicsDataset (only used if "dir_path.txt" does not exist) [str]
:param DATAtrials: trial info object (if not provided, this is loaded from trial_info_path) [pandas.DataFrame]
:param trial_info_path: path to trial info object (if file does not exist, DATAtrials object is re-computed and stored to file) [str]
:return:
- x: resulting state sequence [(N+1)*n-array]
- u: resulting control sequence [N*dim-array]
- x_loc_data: average user trajectory [(N+1)*analysis_dim-array]
- SSE: Sum Squared Error between 2OL-Eq and user trajectory (along first analysis_dim dimensions) [float]
- MaximumError: Maximum Error between 2OL-Eq and user trajectory (along first analysis_dim dimensions) [float]
"""
assert analysis_dim in [1, 2], "Invalid analyis_dim."
# 1. Compute task information and reference data trajectories/distributions
x_loc_data, x_scale_data, Nmax, dt, dim, \
initialvalues_alltrials, T_alltrials, widthm_alltrials = pointingdynamics_data(user, distance, width, direction,
manual_reactiontime_cutoff=manual_reactiontime_cutoff,
shift_target_to_origin=shift_target_to_origin,
analysis_dim=analysis_dim,
dir_path=dir_path, DATAtrials=DATAtrials, trial_info_path=trial_info_path)
# 2. Compute initial state x0 (if not given)
T_alltrials = [T[1:] for T in T_alltrials] # remove initial position from target T (and thus from initial state x0)
x0_alltrials = [np.array([initialuservalues_trial[0], initialuservalues_trial[1]])
for initialuservalues_trial in initialvalues_alltrials]
x0 = np.mean(x0_alltrials, axis=0)
assert len(np.unique(T_alltrials)) == 1, "ERROR: Target changes during movement..."
T = T_alltrials[0]
# 3. Run 2OL-Eq
x, u = secondorderlag_eq(Nmax, dt, x0, dim, T, **secondorderlag_eq_param_dict)
# 4. Compute metrics that compare simulation and user data
x_loc_sim = x[:, :analysis_dim]
SSE = compute_SSE(x_loc_sim, x_loc_data)
MaximumError = compute_MaximumError(x_loc_sim, x_loc_data)
return x, u, x_loc_data, SSE, MaximumError
def minjerk_pointingdynamics(user, distance, width, direction, minjerk_param_dict,
manual_reactiontime_cutoff=True,
shift_target_to_origin=False, analysis_dim=1,
dir_path="PointingDynamicsDataset", DATAtrials=None, trial_info_path="PD-Dataset.pickle"):
"""
Computes MinJerk trajectory for a given parameter set, as well as several metrics comparing this trajectory to
reference user trajectory from Pointing Dynamics Dataset.
:param user: user ID [1-12]
:param distance: distance to target in px [765 or 1275]
:param width: corresponding target width in px [(distance, width) must be from {(765, 255), (1275, 425), (765, 51), (1275, 85), (765, 12), (1275, 20), (765, 3), (1275, 5)}]
:param direction: movement direction ["left" or "right"]
:param minjerk_param_dict: dictionary with name-value pairs of MinJerk parameters; missing parameters are set to their default value [dict]
:param manual_reactiontime_cutoff: whether reaction times should be removed from reference user trajectory [bool]
:param shift_target_to_origin: whether to shift coordinate system such that target center is in origin [bool]
:param analysis_dim: how many dimensions of user data trajectories should be used for mean computation etc. (e.g., 2 means position and velocity only) [1-3]
:param dir_path: local path to Mueller's PointingDynamicsDataset (only used if "dir_path.txt" does not exist) [str]
:param DATAtrials: trial info object (if not provided, this is loaded from trial_info_path) [pandas.DataFrame]
:param trial_info_path: path to trial info object (if file does not exist, DATAtrials object is re-computed and stored to file) [str]
:return:
- x: resulting state sequence [(N+1)*n-array]
- u: resulting control sequence [N*dim-array]
- x_loc_data: average user trajectory [(N+1)*analysis_dim-array]
- SSE: Sum Squared Error between MinJerk and user trajectory (along first analysis_dim dimensions) [float]
- MaximumError: Maximum Error between MinJerk and user trajectory (along first analysis_dim dimensions) [float]
"""
# 1. Compute task information and reference data trajectories/distributions
x_loc_data, x_scale_data, Nmax, dt, dim, \
initialvalues_alltrials, T_alltrials, widthm_alltrials = pointingdynamics_data(user, distance, width, direction,
manual_reactiontime_cutoff=manual_reactiontime_cutoff,
shift_target_to_origin=shift_target_to_origin,
analysis_dim=analysis_dim,
dir_path=dir_path, DATAtrials=DATAtrials, trial_info_path=trial_info_path)
# 2. Compute initial state x0 (if not given)
T_alltrials = [T[1:] for T in T_alltrials] # remove initial position from target T (and thus from initial state x0)
x0_alltrials = [np.array(list(initialuservalues_trial)) for initialuservalues_trial in initialvalues_alltrials]
x0 = np.mean(x0_alltrials, axis=0)
assert len(np.unique(T_alltrials)) == 1, "ERROR: Target changes during movement..."
T = T_alltrials[0]
# 3. Run MinJerk
x, u = minjerk(Nmax, dt, x0, dim, T, **minjerk_param_dict)
# 4. Compute metrics that compare simulation and user data
x_loc_sim = x[:, :analysis_dim]
SSE = compute_SSE(x_loc_sim, x_loc_data)
MaximumError = compute_MaximumError(x_loc_sim, x_loc_data)
return x, u, x_loc_data, SSE, MaximumError
def lqr_pointingdynamics(user, distance, width, direction, lqr_param_dict,
manual_reactiontime_cutoff=True,
shift_target_to_origin=False, analysis_dim=1,
dir_path="PointingDynamicsDataset", DATAtrials=None, trial_info_path="PD-Dataset.pickle"):
"""
Computes LQR trajectory for a given parameter set, as well as several metrics comparing this trajectory to
reference user trajectory from Pointing Dynamics Dataset.
:param user: user ID [1-12]
:param distance: distance to target in px [765 or 1275]
:param width: corresponding target width in px [(distance, width) must be from {(765, 255), (1275, 425), (765, 51), (1275, 85), (765, 12), (1275, 20), (765, 3), (1275, 5)}]
:param direction: movement direction ["left" or "right"]
:param lqr_param_dict: dictionary with name-value pairs of LQR parameters; missing parameters are set to their default value [dict]
:param manual_reactiontime_cutoff: whether reaction times should be removed from reference user trajectory [bool]
:param shift_target_to_origin: whether to shift coordinate system such that target center is in origin [bool]
:param analysis_dim: how many dimensions of user data trajectories should be used for mean computation etc. (e.g., 2 means position and velocity only) [1-3]
:param dir_path: local path to Mueller's PointingDynamicsDataset (only used if "dir_path.txt" does not exist) [str]
:param DATAtrials: trial info object (if not provided, this is loaded from trial_info_path) [pandas.DataFrame]
:param trial_info_path: path to trial info object (if file does not exist, DATAtrials object is re-computed and stored to file) [str]
:return:
- J: optimal costs [float]
- x: optimal state sequence [(N+1)*n-array]
- u: optimal control sequence [N*dim-array]
- x_loc_data: average user trajectory [(N+1)*analysis_dim-array]
- SSE: Sum Squared Error between LQR and user trajectory (along first analysis_dim dimensions) [float]
- MaximumError: Maximum Error between LQR and user trajectory (along first analysis_dim dimensions) [float]
"""
# 1. Compute task information and reference data trajectories/distributions
x_loc_data, x_scale_data, Nmax, dt, dim, \
initialvalues_alltrials, T_alltrials, widthm_alltrials = pointingdynamics_data(user, distance, width, direction,
manual_reactiontime_cutoff=manual_reactiontime_cutoff,
shift_target_to_origin=shift_target_to_origin,
analysis_dim=analysis_dim,
dir_path=dir_path, DATAtrials=DATAtrials, trial_info_path=trial_info_path)
num_targets = 1 # number of via-point targets (default pointing task: 1)
# 2. Compute initial state x0 (if not given)
T_alltrials = [T[1:] for T in T_alltrials] # remove initial position from target T (and thus from initial state x0)
x0_alltrials = [np.array([initialuservalues_trial[0], initialuservalues_trial[1]] + [0] * (2 * dim) + T_trial)
for (T_trial, initialuservalues_trial) in zip(T_alltrials, initialvalues_alltrials)]
x0 = np.mean(x0_alltrials, axis=0)
# 3. Run LQR
J, x, u = lqr(Nmax, dt, x0, dim, **lqr_param_dict, num_targets=num_targets)
# 4. Compute metrics that compare simulation and user data
x_loc_sim = x[:, :analysis_dim]
SSE = compute_SSE(x_loc_sim, x_loc_data)
MaximumError = compute_MaximumError(x_loc_sim, x_loc_data)
return J, x, u, x_loc_data, SSE, MaximumError
def lqg_pointingdynamics(user, distance, width, direction, lqg_param_dict,
system_dynamics="LQG",
manual_reactiontime_cutoff=True,
shift_target_to_origin=False, analysis_dim=2, analysis_dim_deterministic=1,
algorithm_iterations=20, J_eps=1e-3,
include_proprioceptive_target_signals=False,
include_proprioceptive_endeffector_signals=False,
dir_path="PointingDynamicsDataset", DATAtrials=None, trial_info_path="PD-Dataset.pickle"):
"""
Computes LQG or E-LQG trajectory for a given parameter set, and compares it to reference data from Pointing Dynamics Dataset using various metrics.
:param user: user ID [1-12]
:param distance: distance to target in px [765 or 1275]
:param width: corresponding target width in px [(distance, width) must be from {(765, 255), (1275, 425), (765, 51), (1275, 85), (765, 12), (1275, 20), (765, 3), (1275, 5)}]
:param direction: movement direction ["left" or "right"]
:param lqg_param_dict: dictionary with name-value pairs of LQG/E-LQG parameters; missing parameters are set to their default value [dict]
:param system_dynamics: which dynamics to use ["LQG" or "E-LQG"]
:param manual_reactiontime_cutoff: whether reaction times should be removed from reference user trajectory [bool]
:param shift_target_to_origin: whether to shift coordinate system such that target center is in origin [bool]
:param analysis_dim: how many dimensions of user data trajectories should be used for mean computation, stochastic measures, etc. (e.g., 2 means position and velocity only) [1-3]
:param analysis_dim_deterministic: how many dimensions of user data trajectories should be used for deterministic measures
(e.g., for comparisons with 2OL-Eq, MinJerk, and LQR; e.g., 2 means position and velocity only) [1-3]
:param algorithm_iterations: (maximum) number of iterations, where the optimal control and
the optimal observation problem is solved alternately (if "J_eps" is set, early termination is possible) [int]
:param J_eps: if relative improvement of cost function falls below "J_eps" and "min_algorithm_iterations" is reached,
iterative solving algorithm terminates [float]
:param include_proprioceptive_target_signals: whether target position(s) can be observed in absolute coordinates
(only usable for LQG; default: False) [bool]
:param include_proprioceptive_endeffector_signals: whether end-effector can be observed position in absolute coordinates
(only usable for E-LQG; default: False) [bool]
:param dir_path: local path to Mueller's PointingDynamicsDataset (only used if "dir_path.txt" does not exist) [str]
:param DATAtrials: trial info object (if not provided, this is loaded from trial_info_path) [pandas.DataFrame]
:param trial_info_path: path to trial info object (if file does not exist, DATAtrials object is re-computed and stored to file) [str]
:return:
- Ical_expectation: mean of expected optimal state (or rather internal state estimate) sequence [(N+1)*m-array]
- Sigma_x: variance of expected optimal (true) state sequence [list of (N+1) m*m-arrays]
- x_loc_data: average user trajectory [(N+1)*analysis_dim-array]
- x_scale_data: covariance sequence of user trajectory [(N+1)*analysis_dim*analysis_dim-array]
- SSE: Sum Squared Error between LQG/E-LQG and user trajectory (along first analysis_dim dimensions) [float]
- MaximumError: Maximum Error between LQR and user trajectory (along first analysis_dim dimensions) [float]
- MKL: Mean KL Divergence between LQG/E-LQG and user trajectory (along first analysis_dim dimensions) [float]
- MWD: Mean 2-Wasserstein Distance between LQG/E-LQG and user trajectory (along first analysis_dim dimensions) [float]
"""
# 1. Compute task information and reference data trajectories/distributions
x_loc_data, x_scale_data, Nmax, dt, dim, \
initialvalues_alltrials, T_alltrials, widthm_alltrials = pointingdynamics_data(user, distance, width, direction,
manual_reactiontime_cutoff=manual_reactiontime_cutoff,
shift_target_to_origin=shift_target_to_origin,
analysis_dim=analysis_dim,
dir_path=dir_path, DATAtrials=DATAtrials, trial_info_path=trial_info_path)
num_targets = 1 + (system_dynamics == "E-LQG") # number of via-point targets (default pointing task: 1 (2 if system_dynamics == "E-LQG", as initial position needs to be included then))
# 2. Compute initial state mean "x0_mean" and covariance "Sigma0" (if not given)
T_alltrials = [T[(system_dynamics == "LQG"):] for T in T_alltrials] # remove initial position from target T (and thus from initial state x0; only for LQG!)
x0_alltrials = [np.array([initialuservalues_trial[0], initialuservalues_trial[1]] + [0] * (2 * dim) + T_trial)
for (T_trial, initialuservalues_trial) in zip(T_alltrials, initialvalues_alltrials)]
x0_mean = np.mean(x0_alltrials, axis=0)
u0 = np.zeros(shape=(dim, ))
# Define initial covariance matrix (the variable Sigma0 corresponds to Sigma_1 in [Todorov1998])
Sigma0 = np.cov(x0_alltrials, rowvar=False)
Sigma0[np.abs(Sigma0) < 10e-10] = 0
(_, x_expectation, Sigma_x, _, _, _, _, _, _, _) = lqg(Nmax, dt, x0_mean, u0, Sigma0, dim, num_targets,
system_dynamics=system_dynamics,
include_proprioceptive_target_signals=include_proprioceptive_target_signals,
include_proprioceptive_endeffector_signals=include_proprioceptive_endeffector_signals,
minimum_computations=True,
algorithm_iterations=algorithm_iterations,
J_eps=J_eps,
**lqg_param_dict)
# 3. Compute metrics that compare simulation and user data
x_loc_sim = x_expectation[:, :analysis_dim]
x_scale_sim = np.squeeze([cov_matrix[:analysis_dim, :analysis_dim] for cov_matrix in Sigma_x])
x_loc_sim_deterministic = x_expectation[:, :analysis_dim_deterministic]
x_loc_data_deterministic = x_loc_data[:, :analysis_dim_deterministic]
SSE = compute_SSE(x_loc_sim_deterministic, x_loc_data_deterministic)
MaximumError = compute_MaximumError(x_loc_sim_deterministic, x_loc_data_deterministic)
MKL = compute_MKL_normal(x_loc_data, x_scale_data, x_loc_sim, x_scale_sim) #WARNING: order of arguments matter, since MKL is no real metric!
MWD = compute_MWD_normal(x_loc_data, x_scale_data, x_loc_sim, x_scale_sim)
return x_expectation, Sigma_x, x_loc_data, x_scale_data, SSE, MaximumError, MKL, MWD
def secondorderlag_eq(N, dt, x0, dim, T,
k=50, d=30):
"""
Computes 2OL-Eq trajectory of given duration, using dynamics as described in the paper.
:param N: number of time steps (excluding final step) [int]
:param dt: time step duration in seconds [h in paper] [int/float]
:param x0: initial state ((flattened) position and velocity) [list/array of length 2*dim]
:param dim: dimension of the task (1D, 2D, or 3D) [int]
:param num_targets: number of targets (does not have an actual effect right now, but could be used for via-point tasks) [int]
:param T: target position [list/array of length dim]
:param k: stiffness parameter [float]
:param d: damping parameter [float]
:return:
- x: resulting state sequence [(N+1)*n-array]
- u: resulting control sequence [N*dim-array]
"""
# 1. Compute required system matrices
n = dim * 2 # dimension of state vector (incorporating position and velocity)
## VARIANT 1: use forward Euler approximation of 2OL
# A = np.vstack((np.hstack((np.eye(dim), dt * np.eye(dim))),
# np.hstack(((-k * dt) * np.eye(dim), (1 - d * dt) * np.eye(dim)))))
# B = np.vstack((np.zeros(shape=(dim, dim)), dt * np.eye(dim)))
## VARIANT 2: use exact transformation from continuous to discrete 2OL system matrices
A_cont = np.vstack((np.hstack((np.zeros(shape=(dim, dim)), np.eye(dim))),
np.hstack((-k * np.eye(dim), -d * np.eye(dim)))))
B_cont = np.vstack((np.zeros(shape=(dim, dim)), np.eye(dim)))
A = expm(A_cont * dt)
B = np.linalg.pinv(A_cont).dot(A - np.eye(n)).dot(B_cont)
assert len(x0) == n, "Initial state x0 has wrong dimension!"
# 2. (Online) Control Algorithm
x = np.zeros((N + 1, n))
u = np.zeros((N, dim))
x[0] = x0
for i in range(0, N):
u[i] = k * np.array(T)
x[i + 1] = A.dot(x[i]) + B.dot(u[i])
return x, u
def minjerk(N, dt, x0, dim, T,
final_vel=None, final_acc=None,
passage_times=None):
"""
Computes MinJerk trajectory of given duration (actually, Minjerk is only applied until passage_times[-1] and constantly extended afterwards).
:param N: number of time steps (excluding final step) [int]
:param dt: time step duration in seconds [h in paper] [int/float]
:param x0: initial state ((flattened) position, velocity, acceleration) [list/array of length 3*dim]
:param dim: dimension of the task (1D, 2D, or 3D) [int]
:param T: target position [list/array of length dim]
:param final_vel: desired terminal velocity (if this is set, a different algorithm is used to compute MinJerk trajectory) [list/array of length dim]
:param final_acc: desired terminal acceleration (if this is set, a different algorithm is used to compute MinJerk trajectory) [list/array of length dim]
:param passage_times: array of indices that correspond to target passing times;
here, this should be [0, N_MJ], with N_MJ end time step of actual MinJerk trajectory (can be continuous) [2-array]
:return:
- x: resulting state sequence [(N+1)*n-array]
- u: resulting control sequence [N*dim-array]
"""
# 1. Compute required system matrices
if passage_times is None:
passage_times = np.linspace(0, N, 2).astype(
int) # WARNING: here: equally distributed target passage times!
assert len(passage_times) == 2
n = dim * 3 # dimension of state vector (incorporating position, velocity, acceleration, and target position(s))
N_MJ = np.ceil(passage_times[1]).astype(int)
# 2. (Online) Control Algorithm
x = np.zeros((N + 1, n))
u = np.zeros((N, dim))
if (final_vel is None) and (final_acc is None):
### VARIANT 1: (this variant terminates with zero velocity and acceleration, independent of "final_vel" and "final_acc"!)
A = [None] * (N_MJ)
B = [None] * (N_MJ)
x[0] = x0
for i in range(0, N_MJ):
u[i] = T
# Compute time-dependent system matrices (with time relexation due to usage of passage_times[1] instead of int(passage_times[1])) [HoffArbib93]
movement_time = (passage_times[1] - i) * dt
A_continuous = np.vstack(
(np.hstack((np.zeros(shape=(dim, dim)), np.eye(dim), np.zeros(shape=(dim, dim)))),
np.hstack((np.zeros(shape=(dim, 2 * dim)), np.eye(dim))),
np.hstack(((-60 / (movement_time ** 3)) * np.eye(dim), (-36 / (movement_time ** 2)) * np.eye(dim),
(-9 / movement_time) * np.eye(dim)))))
B_continuous = np.vstack((np.zeros(shape=(2 * dim, dim)), (60 / (movement_time ** 3)) * np.eye(dim)))
# Use explicit solution formula
A[i] = expm(A_continuous * dt)
B[i] = np.linalg.pinv(A_continuous).dot(A[i] - np.eye(n)).dot(B_continuous)
############################################
x[i + 1] = A[i].dot(x[i]) + B[i].dot(u[i])
else:
### VARIANT 2:
# Explicit Solution Formula (current implementation only yields position time series!)
if final_vel is None:
final_vel = np.zeros((dim,))
if final_acc is None:
final_acc = np.zeros((dim,))
final_vel = np.array(final_vel)
final_acc = np.array(final_acc)
assert final_vel.shape == (dim,)
assert final_acc.shape == (dim,)
t_f = passage_times[1] * dt
coeff_vec = np.array([[x0[0 + i], t_f * x0[1*dim + i], 0.5 * (t_f ** 2) * x0[2*dim + i],
-10 * x0[0 + i] - 6 * t_f * x0[1*dim + i] - 1.5 * (t_f ** 2) * x0[2*dim + i] + 10 * T[i] - 4 * t_f * final_vel[i] + 0.5 * (t_f ** 2) * final_acc[i],
15 * x0[0 + i] + 8 * t_f * x0[1*dim + i] + 1.5 * (t_f ** 2) * x0[2*dim + i] - 15 * T[i] + 7 * t_f * final_vel[i] - 1 * (t_f ** 2) * final_acc[i],
-6 * x0[0 + i] - 3 * t_f * x0[1*dim + i] - 0.5 * (t_f ** 2) * x0[2*dim + i] + 6 * T[i] - 3 * t_f * final_vel[i] + 0.5 * (t_f ** 2) * final_acc[i]] for i in range(dim)] +
[[x0[1*dim + i], t_f * x0[2*dim + i],
(3 / t_f) * (-10 * x0[0 + i] - 6 * t_f * x0[1*dim + i] - 1.5 * (t_f ** 2) * x0[2*dim + i] + 10 * T[i] - 4 * t_f * final_vel[i] + 0.5 * (t_f ** 2) * final_acc[i]),
(4 / t_f) * (15 * x0[0 + i] + 8 * t_f * x0[1*dim + i] + 1.5 * (t_f ** 2) * x0[2*dim + i] - 15 * T[i] + 7 * t_f * final_vel[i] - 1 * (t_f ** 2) * final_acc[i]),
(5 / t_f) * (-6 * x0[0 + i] - 3 * t_f * x0[1*dim + i] - 0.5 * (t_f ** 2) * x0[2*dim + i] + 6 * T[i] - 3 * t_f * final_vel[i] + 0.5 * (t_f ** 2) * final_acc[i]), 0] for i in range(dim)] +
[[x0[2*dim + i],
(2 / t_f) * (3 / t_f) * (-10 * x0[0 + i] - 6 * t_f * x0[1*dim + i] - 1.5 * (t_f ** 2) * x0[2*dim + i] + 10 * T[i] - 4 * t_f * final_vel[i] + 0.5 * (t_f ** 2) * final_acc[i]),
(3 / t_f) * (4 / t_f) * (15 * x0[0 + i] + 8 * t_f * x0[1*dim + i] + 1.5 * (t_f ** 2) * x0[2*dim + i] - 15 * T[i] + 7 * t_f * final_vel[i] - 1 * (t_f ** 2) * final_acc[i]),
(4 / t_f) * (5 / t_f) * (-6 * x0[0 + i] - 3 * t_f * x0[1*dim + i] - 0.5 * (t_f ** 2) * x0[2*dim + i] + 6 * T[i] - 3 * t_f * final_vel[i] + 0.5 * (t_f ** 2) * final_acc[i]), 0, 0] for i in range(dim)])
x[:N_MJ + 1, :] = np.squeeze([coeff_vec @ np.array([(j / passage_times[1]) ** ii for ii in range(6)]) for j in range(N_MJ + 1)])
u[:N_MJ] = T
# 3. Constantly extend trajectory with last position, zero velocity, and zero acceleration
for i in range(N_MJ, N): # extend trajectory with constant last value
x[i + 1] = x[i] * np.repeat(np.array([1, 0, 0]), dim)
u[i] = T
return x, u
def lqr(N, dt, x0, dim,
r=-np.log(5e-3), velocitycosts_weight=5e-2, forcecosts_weight=1e-3,
mass=1, t_const_1=0.04, t_const_2=0.04,
num_targets=1):
"""
Computes LQR trajectory of given duration, using stage costs and dynamics as described in the paper.
:param N: number of time steps (excluding final step) [int]
:param dt: time step duration in seconds [h in paper] [int/float]
:param x0: initial state ((flattened) position, velocity, muscle force, muscle excitation, target position(s)) [array-like (1D)]
:param dim: dimension of the task (1D, 2D, or 3D) [int]
:param r: negative log (!) of effort cost weight [corresponds to -np.log(omega_r) with omega_r from paper] [int/float]
:param velocitycosts_weight: velocity cost weight [omega_v in paper] [positive int/float]
:param forcecosts_weight: force cost weight [omega_f in paper] [positive int/float]
:param mass: mass of object to which forces are applied [positive int/float]
:param t_const_1: time activation constant [tau_1 in paper] [float]
:param t_const_2: time excitation constant [tau_2 in paper] [float]
:param num_targets: number of targets (can be used for via-point tasks) [int]
:return:
- J: optimal costs [float]
- x: optimal state sequence [(N+1)*n-array]
- u: optimal control sequence [N*dim-array]
"""
# 1. Compute required system matrices
n = dim * (4 + num_targets) # dimension of state vector (incorporating position, velocity, force, excitation, and target(s) T (no initial position!))
Delta = 0 # number of delay time steps
m = dim * (4 + Delta + num_targets) # dimension of information vector (corresponding to state vector)
A = np.vstack((np.hstack((np.eye(dim), dt * np.eye(dim), np.zeros(shape=(dim, n - 2 * dim)))),
np.hstack((
np.zeros(shape=(dim, dim)), np.eye(dim), (dt / mass) * np.eye(dim),
np.zeros(shape=(dim, n - 3 * dim)))),
np.hstack((np.zeros(shape=(dim, 2 * dim)), (1 - (dt / t_const_2)) * np.eye(dim),
(dt / t_const_2) * np.eye(dim), np.zeros(shape=(dim, n - 4 * dim)))),
np.hstack((np.zeros(shape=(dim, 3 * dim)), (1 - (dt / t_const_1)) * np.eye(dim),
np.zeros(shape=(dim, n - 4 * dim)))),
np.hstack((np.zeros(shape=(n - 4 * dim, 4 * dim)), np.eye(n - 4 * dim)))))
B = np.vstack(
(np.zeros(shape=(3 * dim, dim)), (dt / t_const_1) * np.eye(dim), np.zeros(shape=(n - 4 * dim, dim))))
assert len(x0) == n, "Initial state x0 has wrong dimension!"
Q = []
for i in range(1, num_targets + 1):
D0i = np.vstack((np.hstack((np.eye(dim), np.zeros(shape=(dim, dim * (3 + (i - 1)))), -1 * np.eye(dim),
np.zeros(shape=(dim, n - dim * (4 + i))))),
np.zeros(shape=(dim * (3 + (i - 1)), n)),
np.hstack((-1 * np.eye(dim), np.zeros(shape=(dim, dim * (3 + (i - 1)))), np.eye(dim),
np.zeros(shape=(dim, n - dim * (4 + i))))),
np.zeros(shape=(n - dim * (4 + i), n))))
assert num_targets == 1
for j in range(N + 1):
Q.append(D0i.copy())
# INFO: Here, velocity and force costs are applied curing the complete movement as well!
D_v = np.zeros((dim, n))
D_v[:, dim:(2 * dim)] = np.eye(dim)
D_f = np.zeros((dim, n))
D_f[:, (2 * dim):(3 * dim)] = np.eye(dim)
for i in range(N + 1):
Q[i] += velocitycosts_weight * (D_v.transpose().dot(D_v))
Q[i] += forcecosts_weight * (D_f.transpose().dot(D_f))
R = (np.exp(-r) / (N - 1)) * np.eye(dim)
# 2. Compute Matrices required for Optimal Control Law
Ahelp = np.dot((np.linalg.matrix_power(A, Delta)),
np.hstack((np.eye(n), np.zeros(shape=(n, m - n + (int(not (m - n))) * dim)))))
Bhelp = [np.zeros(shape=(n, m + (int(not (m - n))) * dim))] * (Delta + 1)
for i in range(1, Delta + 1):
Bhelp[i] = np.dot((np.linalg.matrix_power(A, (i - 1))), B).dot(np.hstack((np.zeros(
shape=(dim, m - (1 - (int(not (m - n)))) * dim * i)), np.eye(dim), np.zeros(shape=(dim, dim * i - dim)))))
M = Ahelp + np.sum(Bhelp, 0)
# WARNING! Not usable for non-constant targets!
Acal = np.vstack((np.hstack((A, B.dot(np.eye(dim, min(dim, m - n))),
np.zeros(shape=(n, m - n - min(dim, m - n) + (int(not (m - n))) * dim)))),
np.hstack((np.zeros(
shape=(m - n - min(dim, m - n), n + min(dim, m - n) + (int(not (m - n))) * dim)),
np.eye(m - n - min(dim, m - n)))),
np.zeros(shape=(dim, m + (int(not (m - n))) * dim))))
Bcal = np.vstack((B[:(int(not (m - n))) * n, :], np.zeros(shape=((1 - (int(not (m - n)))) * n, dim)),
np.zeros(shape=(m - n - min(dim, m - n), dim)), np.eye(dim)))
# NOTE: if Delta!=0, i.e., m!=n, B is included in Acal, NOT in Bcal!
# 3. Compute Optimal Control Law
#RI1 = R.dot(np.hstack((np.zeros(shape=(dim, m - (1 - (int(not (m - n)))) * dim)), np.eye(dim)))) #only required when penalizing derivative of controls
#I1RI1 = np.vstack((np.zeros(shape=(m - (1 - (int(not (m - n)))) * dim, dim)), np.eye(dim))).dot(RI1) #only required when penalizing derivative of controls
RI1 = np.zeros(shape=(dim, m + dim))
I1RI1 = np.zeros(shape=(m + dim, m + dim))
L = [None] * (N + 1)
L[N] = np.hstack((np.zeros(shape=(dim, m - (1 - (int(not (m - n)))) * dim)), np.eye(dim)))
S = M.transpose().dot(Q[N]).dot(M)
for i in range(N - 1, -1, -1):
Bcal_Siplus1 = Bcal.transpose().dot(S)
Shelp = RI1 - Bcal_Siplus1.dot(Acal)
try:
L[i] = np.linalg.solve(R + Bcal_Siplus1.dot(Bcal), Shelp)
except np.linalg.LinAlgError as err:
if 'Singular matrix' in str(err):
print('ERROR in Discrete Riccati Equation: Singular matrix!\n')
# L[i] = np.linalg.pinv(Bcal_Siplus1.dot(Bcal)).dot(Shelp)
else:
raise ValueError
S = Acal.transpose().dot(S).dot(Acal) + M.transpose().dot(Q[i]).dot(M) + I1RI1 - Shelp.transpose().dot(L[i])
# 4. (Online) Control Algorithm
Ical = [None] * (N + 1)
Ical[0] = np.concatenate((x0, np.zeros((dim, )))) # extend to information vector space (albeit not required here)
J = Ical[0].transpose().dot(S).dot(Ical[0]) # optimal (expected) cost
x = np.zeros((N + 1, n))
u = np.zeros((N, dim))
J_cp = 0
for i in range(0, N):
u[i] = (L[i].dot(Ical[i]))
Ical[i + 1] = Acal.dot(Ical[i]) + Bcal.dot(u[i])
x[i] = (M.dot(Ical[i]))
J_cp += x[i].transpose().dot(Q[i]).dot(x[i])
J_cp += (u[i]).transpose().dot(R).dot(u[i])
x[N] = (M.dot(Ical[N]))
J_cp += x[N].transpose().dot(Q[N]).dot(x[N])
if J < 0:
print(f"!!! IMPORTANT WARNING (LQR): J = {J}!")
#print(f'LQR - J (expected; backward computation): {J}')
#print(f'LQR - J (expected; forward computation): {J_cp}')
return J, x, u
def lqg(N, dt, x0_mean, u0, Sigma0, dim, num_targets,
r=-np.log(5e-3), velocitycosts_weight=5e-2, forcecosts_weight=1e-3,
mass=1, t_const_1=0.4, t_const_2=0.4,
sigma_u=0.2, sigma_c=5e-5,
sigma_s=0.2, #only used for LQG
sigma_H=0.01, sigma_Hdot=0.1, sigma_frc=0.5, sigma_e=0.1, gamma=0.5, #only used for E-LQG
passage_times=None,
saccade_times=None, #only used for E-LQG
Delta=0,
system_dynamics="LQG",
use_information_vector=False,
include_proprioceptive_target_signals=False, #only used for LQG
include_proprioceptive_endeffector_signals=False, #only used for E-LQG
modify_init_target_estimate=True, #only used for E-LQG
use_square_root_filter=True, minimum_computations=False,
min_algorithm_iterations=1, algorithm_iterations=20, J_eps=1e-3):
"""
Computes LQG or E-LQG trajectory of given duration, using stage costs and dynamics as described in the paper.
:param N: number of time steps (excluding final step) [int]
:param dt: time step duration in seconds [h in paper] [int/float]
:param x0_mean: initial state expectation ((flattened) position, velocity, muscle force, muscle excitation, [initial position (only if system_dynamics=="E-LQG"),] target position(s)) [array-like (1D)]
:param u0: initial control value (only used if "use_information_vector == True") [dim-array]
:param Sigma0: initial state covariance matrix [array-like (2D)]
:param dim: dimension of the task (1D, 2D, or 3D) [int]
:param num_targets: number of targets [if system_dynamics=="E-LQG": including initial position] (can be used for via-point tasks) [int]
:param r: negative log (!) of effort cost weight [corresponds to -np.log(omega_r) with omega_r from paper] [int/float]
:param velocitycosts_weight: velocity cost weight [omega_v in paper] [positive int/float]
:param forcecosts_weight: force cost weight [omega_f in paper] [positive int/float]
:param mass: mass of object to which forces are applied [positive int/float]
:param t_const_1: time activation constant [tau_1 in paper] [float]
:param t_const_2: time excitation constant [tau_2 in paper] [float]
:param sigma_u: signal-dependent (multiplicative) control noise level [float]
:param sigma_c: constant (i.e., signal-independent) control noise level [float]
:param sigma_s: observation noise scaling parameter (only used for LQG) [float]
:param sigma_H: proprioceptive position noise level (only used for E-LQG, only used if "include_proprioceptive_endeffector_signals==True") [float]
:param sigma_Hdot: velocity perception noise level [sigma_v in paper] (only used for E-LQG) [float]
:param sigma_frc: force perception noise level [sigma_f in paper] (only used for E-LQG) [float]
:param sigma_e: gaze noise level (only used for E-LQG) [float]
:param gamma: position perception noise weight (only used for E-LQG) [float]
:param passage_times: array of indices that correspond to target passing times in via-point tasks;
at these time steps, distance, velocity, and force costs are applied [num_targets-array]
:param saccade_times: array of indices that correspond to saccade times [see n_s in paper] (only used for E-LQG) [num_targets-array]
:param Delta: observation time lag in time steps (experimental!; default: 0) [int]
:param system_dynamics: which dynamics to use ["LQG" or "E-LQG"]
:param use_information_vector: whether to augment state vectors with latest controls (needs to be True if Delta > 0) [bool]
:param include_proprioceptive_target_signals: whether target position(s) can be observed in absolute coordinates
(only usable for LQG; default: False) [bool]
:param include_proprioceptive_endeffector_signals: whether end-effector can be observed position in absolute coordinates
(only usable for E-LQG; default: False) [bool]
:param modify_init_target_estimate: whether to place an incorrect initial target estimate
(basically, the target estimate is set to initial movement position, which makes particular sense for via-point tasks)
(only works for E-LQG or if "include_proprioceptive_target_signals==True") [bool]
:param use_square_root_filter: whether to use the square root filter to update Kalman matrices (default: True) [bool]
:param minimum_computations: if True, realized costs and other stuff are not computed and printed (useful in optimizations etc.) [bool]
:param min_algorithm_iterations: minimum number of iterations (see "algorithm_iterations")
:param algorithm_iterations: (maximum) number of iterations, where the optimal control and
the optimal observation problem is solved alternately (if "J_eps" is set, early termination is possible) [int]
:param J_eps: if relative improvement of cost function falls below "J_eps" and "min_algorithm_iterations" is reached,
iterative solving algorithm terminates [float]
:return:
- J: optimal (expected) costs [float]
- x_expectation: mean of expected optimal state sequence (except for target components,
which correspond to internal estimates at same time) [(N+1)*m-array]
- Sigma_x: variance of expected optimal (true) state sequence [list of (N+Delta+1) m*m-arrays]
- u_expectation: expected optimal control sequence [N*dim-array]
- Ical: sequence internal state estimate means [list of (N+1) m-arrays]
- x: true state sequence [(N+1)*n-array]
- u: optimal control sequence (depending on realization of observations y) [N*dim-array]
- y: observation sequence [(N+1)*l-array]
- L: optimal feedback gain matrices [list of N dim*m-arrays]
- Kk: Kalman filter gain matrices [list of N m*l-arrays]
"""
assert algorithm_iterations >= 0
if Delta != 0:
print("WARNING: The current costs cannot be used with Delta > 0. Setting Delta to 0!")
Delta = 0
# INFO: if costs are manually provided that allow for Delta > 0, use_information_vector also needs to be set to True!
# 1. Set up optimal control problem and initialize values
system_matrices, n, m, l, gamma, passage_times, saccade_times = lqg_initialization(N, dt, dim, r=r,
velocitycosts_weight=velocitycosts_weight,
forcecosts_weight=forcecosts_weight,
mass=mass,
t_const_1=t_const_1,
t_const_2=t_const_2,
sigma_u=sigma_u,
sigma_c=sigma_c,
sigma_H=sigma_H,
sigma_Hdot=sigma_Hdot,
sigma_e=sigma_e,
gamma=gamma,
sigma_s=sigma_s,
sigma_frc=sigma_frc,
passage_times=passage_times,
saccade_times=saccade_times,
Delta=Delta,
system_dynamics=system_dynamics,
num_targets=num_targets,
use_information_vector=use_information_vector,
include_proprioceptive_endeffector_signals=include_proprioceptive_endeffector_signals,
include_proprioceptive_target_signals=include_proprioceptive_target_signals)
initial_state_matrices = lqg_setup_initial_state(dim, num_targets, x0_mean, u0, Sigma0, n, m, Delta=Delta,
system_dynamics=system_dynamics,
use_information_vector=use_information_vector,
include_proprioceptive_target_signals=include_proprioceptive_target_signals,
modify_init_target_estimate=modify_init_target_estimate)
Ical0, Sigma_x0 = initial_state_matrices[:2]
J = 10e+12 #use very large costs costs as initial reference value
Kk = None
# 2. Alternately solve the optimal control given fixed Kalman gain matrices Kk and the optimal observation problem given fixed feedback gains L
for iter in range(algorithm_iterations + 1):
# 2.1. Solve the optimal control problem given fixed Kalman gain matrices Kk and compute resulting costs
J_before = J
L, S_x, S_e, alpha = lqg_solve_control_problem(N, dim, num_targets, gamma, saccade_times, system_matrices, m, l,
Delta=Delta, Kk=Kk)
J = lqg_compute_optimal_costs(Ical0, Sigma_x0, S_x, S_e, alpha)
# assert J >= 0
if J < 0:
print(f"!!! IMPORTANT WARNING (LQG): J = {J}!")
# 2.2. Solve the optimal observation problem given fixed feedback gains L
Kk, Kcal, Sigma_Ical, Sigma_ecal, Sigma_Icalecal = lqg_solve_observation_problem(N, dim, num_targets,
gamma,
saccade_times,
system_matrices,
initial_state_matrices,
m, l, Delta=Delta, L=L,
use_square_root_filter=use_square_root_filter)
# 2.3. Decide whether to stop iteration
if J_eps is not None:
if (J <= J_before) and (np.abs((J - J_before)/J_before) <= J_eps) and (iter >= min_algorithm_iterations - 1):
if not minimum_computations:
print(f'{system_dynamics} converged after {iter + 1} iterations.')
break
elif iter == algorithm_iterations - 1:
if not minimum_computations:
print(f'WARNING: LQG stopped after {iter + 1} iterations without convergence.')
# 3. Run forward pass using optimal feedback gains L and Kalman gains Kk
J_cp, J_sample, x_expectation, Sigma_x, u_expectation, Ical, x, u, y = lqg_forward_simulation(
L,
Kk, Kcal, Sigma_Ical, Sigma_ecal, Sigma_Icalecal,
N,
dim,
num_targets,
x0_mean, u0, Sigma0,
system_matrices,
initial_state_matrices,
n, m, l,
gamma, saccade_times,
Delta=Delta,
minimum_computations=minimum_computations,
use_information_vector=use_information_vector)
if not minimum_computations:
print(f'LQG - J (expected; backward computation): {J}')
print(f'LQG - J (sample-based (internal); forward computation): {J_cp}')
print(f'LQG - J (sample-based; forward computation): {J_sample}')
# print(f'LQG - J (sample-based; forward computation; ONLY STATE COSTS): {J_sample_state}')
return J, x_expectation, Sigma_x, u_expectation, Ical, x, u, y, L, Kk
def lqg_initialization(N, dt, dim,
r=-np.log(5e-7), velocitycosts_weight=1.0, forcecosts_weight=1.0,
mass=1, t_const_1=0.4, t_const_2=0.4,
sigma_u=0.2, sigma_c=5e-5, sigma_s=0.2, sigma_H=0.01, sigma_Hdot=0.1, sigma_frc=0.5, sigma_e=0.1, gamma=0.5,
passage_times=None, saccade_times=None, Delta=0,
system_dynamics="LQG",
num_targets=1,
use_information_vector=False,
include_proprioceptive_target_signals=False,
include_proprioceptive_endeffector_signals=False):
"""
Provides system matrices required to compute LQG/E-LQG solution.
:param N: number of time steps (excluding final step) [int]
:param dt: time step duration in seconds [h in paper] [int/float]
:param x0_mean: initial state expectation ((flattened) position, velocity, muscle force, muscle excitation, [initial position (only if system_dynamics=="E-LQG"),] target position(s)) [array-like (1D)]
:param u0: initial control value (only used if "use_information_vector == True") [dim-array]
:param Sigma0: initial state covariance matrix [array-like (2D)]
:param dim: dimension of the task (1D, 2D, or 3D) [int]
:param r: negative log (!) of effort cost weight [corresponds to -np.log(omega_r) with omega_r from paper] [int/float]
:param velocitycosts_weight: velocity cost weight [omega_v in paper] [positive int/float]
:param forcecosts_weight: force cost weight [omega_f in paper] [positive int/float]
:param mass: mass of object to which forces are applied [positive int/float]
:param t_const_1: time activation constant [tau_1 in paper] [float]
:param t_const_2: time excitation constant [tau_2 in paper] [float]
:param sigma_u: signal-dependent (multiplicative) control noise level [float]
:param sigma_c: constant (i.e., signal-independent) control noise level [float]
:param sigma_s: observation noise scaling parameter (only used for LQG) [float]
:param sigma_H: proprioceptive position noise level (only used for E-LQG, only used if "include_proprioceptive_endeffector_signals==True") [float]
:param sigma_Hdot: velocity perception noise level [sigma_v in paper] (only used for E-LQG) [float]
:param sigma_frc: force perception noise level [sigma_f in paper] (only used for E-LQG) [float]
:param sigma_e: gaze noise level (only used for E-LQG) [float]
:param gamma: position perception noise weight (only used for E-LQG) [float]
:param passage_times: array of indices that correspond to target passing times in via-point tasks;
at these time steps, distance, velocity, and force costs are applied [num_targets-array]
:param saccade_times: array of indices that correspond to saccade times [see n_s in paper] (only used for E-LQG) [num_targets-array]
:param Delta: observation time lag in time steps (experimental!; default: 0) [int]
:param system_dynamics: which dynamics to use ["LQG" or "E-LQG"]
:param num_targets: number of targets [if system_dynamics=="E-LQG": including initial position] (can be used for via-point tasks) [int]
:param use_information_vector: whether to augment state vectors with latest controls (needs to be True if Delta > 0) [bool]
:param include_proprioceptive_target_signals: whether target position(s) can be observed in absolute coordinates
(only usable for LQG; default: False) [bool]
:param include_proprioceptive_endeffector_signals: whether end-effector can be observed position in absolute coordinates
(only usable for E-LQG; default: False) [bool]
:return:
- system_matrices: tuple of relevant system matrices (see definition below) that only need to be computed once at the beginning [tuple]
- n: dimension of state vector (see comment below)
- m: dimension of information vector (see comment below)
- l: dimension of observation vector (see comment below)
- gamma: see above
- passage_times: see above (might be set to default values if initially None)
- saccade_times: see above (might be set to default values if initially None)
"""
# INFO: for E-LQG, proprioceptive noise (defined by sigma_H, sigma_e) should be much larger than visual noise (e.g., defined by gamma) (following Todorov1998, p.55)
if passage_times is None:
passage_times = np.linspace(0, N, num_targets).astype(int) if num_targets > 1 \
else np.array([N]) # WARNING: here: equally distributed target passage times!
if saccade_times is None:
saccade_times = np.clip(-Delta + 50 + np.linspace(0, N, num_targets), -Delta, -Delta + N).astype(
int) # from Todorov1998, Fig. 4-6 #WARNING: optimize over eye fixation times!
if system_dynamics == "E-LQG":
assert len(passage_times) == num_targets
assert len(saccade_times) == num_targets
Delta = np.round(Delta).astype(int) # cast Delta to integer
#gamma = np.max((gamma, 4e-18)) # ensure positive gamma to avoid numerical instabilities
# 1. Compute system dynamics matrices
n = dim * (4 + num_targets) # dimension of state vector (incorporating position, velocity, force, excitation, [initial position (only if system_dynamics=="E-LQG"),] and target(s) T)
m = dim * (4 + num_targets + max(1, Delta) * use_information_vector) # dimension of information vector (corresponding to state vector)
if system_dynamics == "E-LQG":
l = dim * (3 + include_proprioceptive_endeffector_signals + num_targets) # dimension of observation vector (incorporating position [if include_proprioceptive_endeffector_signals], velocity, and force [and eye fixation, and targets relative to eye fixation (defined by saccade_times and list of targets)])
elif system_dynamics == "LQG":
l = dim * (3 + (num_targets * include_proprioceptive_target_signals)) # dimension of observation vector (incorporating position, velocity, and force [and targets])
else:
raise NotImplementedError
# INFO: If Delta=0, m!=n still holds (i.e., information vector has size dim*(2+num_targets+1)=m+dim), since u_(k-1) needs to be propagated if the derivatives of controls are penalized!
A = np.vstack((np.hstack((np.eye(dim), dt * np.eye(dim), np.zeros(shape=(dim, n - 2 * dim)))),
np.hstack((
np.zeros(shape=(dim, dim)), np.eye(dim), (dt/mass) * np.eye(dim), np.zeros(shape=(dim, n - 3 * dim)))),
np.hstack((np.zeros(shape=(dim, 2 * dim)), (1 - (dt/t_const_2)) * np.eye(dim),
(dt/t_const_2) * np.eye(dim), np.zeros(shape=(dim, n - 4 * dim)))),
np.hstack((np.zeros(shape=(dim, 3 * dim)), (1 - (dt/t_const_1)) * np.eye(dim),
np.zeros(shape=(dim, n - 4 * dim)))),
np.hstack((np.zeros(shape=(n - 4 * dim, 4 * dim)), np.eye(n - 4 * dim)))))
B = np.vstack((np.zeros(shape=(3 * dim, dim)), (dt/t_const_1) * np.eye(dim), np.zeros(shape=(n - 4 * dim, dim))))
C = sigma_u * B
D = sigma_c * np.diag(np.sum(B, axis=1)) # additive noise is only applied to control
# 2. Compute observation dynamics matrices
def get_current_Hk__ELQG(index, saccade_times, dim, num_targets):
"""
Provides current observation matrix.
Here, observation model from Todorov98_thesis is combined with system dynamics (LQG) from Todorov05.
:param index: current time step [int]
:param saccade_times: array of indices that correspond to saccade times [see n_s in paper] [num_targets-array]
:param dim: dimension of the task (1D, 2D, or 3D) [int]
:param num_targets: number of targets including initial position (can be used for via-point tasks) [int]
:return:
- Hk: observation matrix at current time step [l*n-array]
- f_k: index of targets that is currently fixated (starting with 1 as initial position, 2 as first target, ...) [int]
"""
remaining_fixations_indices = [i + 1 for i, switch_time in enumerate(saccade_times) if index <= switch_time]
remaining_fixations_indices_next = [i + 1 for i, switch_time in enumerate(saccade_times) if
index + 1 <= switch_time]
if len(remaining_fixations_indices) > 0:
f_k = remaining_fixations_indices[0]
else: # stay at last fixation point (= last target) if there are some time steps remaining...
f_k = len(saccade_times)
if include_proprioceptive_endeffector_signals:
Hk = np.vstack((np.hstack((np.eye(3 * dim), np.zeros(shape=(3 * dim, n - 3 * dim)))),