forked from mikislin/RPi4Cam
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession_processor.py
More file actions
156 lines (127 loc) · 5.46 KB
/
session_processor.py
File metadata and controls
156 lines (127 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# session_processor.py
import argparse
import csv
import math
import re
import sys
from pathlib import Path
def find_session_files(session_dir: Path):
    """Locate the data files that make up one recording session.

    Only the top level of *session_dir* is searched (no recursion) for:

    * ``*_StartTimestamps.txt`` -- required;
    * ``*_part_*.txt`` -- frame timestamp files, may be absent (a warning is printed);
    * ``*_events.csv`` -- GPIO event log, optional (an info line is printed if absent).

    ``Path.glob`` yields entries in arbitrary, filesystem-dependent order, so
    every match list is sorted before one entry is picked from it. That makes
    the choice reproducible when more than one file matches.

    Args:
        session_dir: Directory holding the session's files.

    Returns:
        A dict with the keys ``'start_timestamps'`` (Path),
        ``'frame_timestamps'`` (list of Path, ordered by part number) and
        ``'gpio_events'`` (Path, or None when no event file exists).

    Raises:
        FileNotFoundError: If no ``*_StartTimestamps.txt`` file is present.
    """
    part_num_regex = re.compile(r"_part_(\d+)\.txt")

    def part_order(path):
        # Numbered parts come first in numeric order, so that part 2 precedes
        # part 10; names without a part number follow, ordered by name.
        found = part_num_regex.search(path.name)
        if found:
            return (0, int(found.group(1)), path.name)
        return (1, 0, path.name)

    start_ts_files = sorted(session_dir.glob("*_StartTimestamps.txt"))
    if not start_ts_files:
        raise FileNotFoundError(f"Could not find *_StartTimestamps.txt in {session_dir}")
    if len(start_ts_files) > 1:
        print(f"Warning: Multiple *_StartTimestamps.txt files found. Using {start_ts_files[0].name}.")

    frame_files = sorted(session_dir.glob("*_part_*.txt"), key=part_order)
    if not frame_files:
        print("Warning: No frame timestamp files (*_part_*.txt) found.")

    gpio_files = sorted(session_dir.glob("*_events.csv"))
    if not gpio_files:
        print("Info: No GPIO event file (*_events.csv) found. Proceeding without event data.")
    elif len(gpio_files) > 1:
        print(f"Warning: Multiple *_events.csv files found. Using {gpio_files[0].name}.")

    return {
        'start_timestamps': start_ts_files[0],
        'frame_timestamps': frame_files,
        'gpio_events': gpio_files[0] if gpio_files else None,
    }
def parse_start_timestamps(filepath: Path):
    """Read the POSIX start time of every part from a _StartTimestamps.txt file.

    The file is treated as a sequence of blocks, each opened by a
    ``--- Part N ---`` header line. Within a block, the first
    ``StartTimePOSIX: <number>`` entry gives that part's start time.

    Each block is searched on its own, bounded by the next header. A block
    that has no ``StartTimePOSIX`` entry is therefore reported and skipped,
    instead of silently picking up the value that belongs to a later part.

    Args:
        filepath: Path of the start-timestamps file.

    Returns:
        A dict mapping part number (int) to POSIX start time (float). If a
        part number appears more than once, the last usable block wins.
    """
    header_regex = re.compile(r"--- Part (\d+) ---")
    value_regex = re.compile(r"StartTimePOSIX:\s*([\d\.]+)")

    with open(filepath, 'r') as f:
        content = f.read()

    part_starts = {}
    headers = list(header_regex.finditer(content))
    for index, header in enumerate(headers):
        part_num = int(header.group(1))

        # A block runs from the end of its header up to the next header,
        # or to the end of the file for the last block.
        if index + 1 < len(headers):
            block_end = headers[index + 1].start()
        else:
            block_end = len(content)
        value = value_regex.search(content, header.end(), block_end)

        if value is None:
            print(f"Warning: No StartTimePOSIX entry found for Part {part_num}.")
            continue
        try:
            # The character class also admits strings such as "1.2.3" or ".",
            # which float() rejects.
            posix_time = float(value.group(1))
        except ValueError:
            print(f"Warning: Invalid StartTimePOSIX value '{value.group(1)}' for Part {part_num}.")
            continue

        part_starts[part_num] = posix_time
        print(f" Found start time for Part {part_num}: {posix_time}")
    return part_starts
def process_frame_files(frame_files: list, part_start_times: dict):
    """Convert per-part frame offsets into absolute POSIX timestamps.

    Each frame file is read as CSV. Its first row is discarded as a header,
    and the first column of every following row is read as an offset in
    milliseconds, which is added to the start time of the file's part.

    Files are skipped when their name carries no ``_part_<N>.txt`` number or
    when *part_start_times* has no entry for that part (a warning is printed
    in the second case). Blank lines are ignored. Rows whose first column is
    not a finite number are skipped and counted, and the count is printed per
    file, because a NaN or infinite timestamp cannot be ordered against the
    other records.

    Args:
        frame_files: Paths of the frame timestamp files.
        part_start_times: Mapping of part number to POSIX start time.

    Returns:
        A list of dicts with the keys ``'timestamp'``, ``'event_type'``
        (always ``'frame'``), ``'source_file'`` and ``'details'``
        (``'frame_<i>'``, where ``i`` counts the accepted rows of that file
        starting at 0).
    """
    all_frames = []
    part_num_regex = re.compile(r"_part_(\d+)\.txt")

    for file in frame_files:
        match = part_num_regex.search(file.name)
        if not match:
            continue

        part_num = int(match.group(1))
        if part_num not in part_start_times:
            print(f"Warning: No start time found for Part {part_num}. Skipping {file.name}.")
            continue

        part_start_time = part_start_times[part_num]
        print(f"Processing frames for Part {part_num} (from {file.name})...")

        # newline='' lets the csv module do its own line-ending handling,
        # matching how the GPIO event file is opened.
        with open(file, 'r', newline='') as f:
            reader = csv.reader(f)
            try:
                next(reader)  # Skip header
            except StopIteration:
                continue  # Empty file: nothing to process.

            frame_count = 0
            skipped_rows = 0
            for row in reader:
                if not row:
                    continue  # Blank line.
                try:
                    relative_ms = float(row[0])
                except ValueError:
                    skipped_rows += 1
                    continue
                if not math.isfinite(relative_ms):
                    # float() accepts "nan" and "inf"; neither is a usable offset.
                    skipped_rows += 1
                    continue

                all_frames.append({
                    'timestamp': part_start_time + (relative_ms / 1000.0),
                    'event_type': 'frame',
                    'source_file': file.name,
                    'details': f'frame_{frame_count}'
                })
                frame_count += 1

        print(f" ...processed {frame_count} frames.")
        if skipped_rows:
            print(f"Warning: Skipped {skipped_rows} malformed row(s) in {file.name}.")
    return all_frames
def process_gpio_file(gpio_file: Path):
    """Load GPIO events from the session's events CSV file.

    The file is read with ``csv.DictReader`` and must provide the columns
    ``posix_timestamp``, ``event_label`` and ``gpio_pin``.

    A row is skipped when its timestamp is missing, is not a number, or is
    not finite. "Missing" includes rows shorter than the header, for which
    ``DictReader`` supplies ``None``; passing that to ``float()`` raises
    ``TypeError``, which is handled here so one truncated row does not abort
    the run. The number of skipped rows is printed, so a file with unexpected
    column names does not quietly produce an empty result.

    Args:
        gpio_file: Path of the events CSV, or a falsy value (e.g. None) when
            the session has no event file.

    Returns:
        A list of dicts with the keys ``'timestamp'``, ``'event_type'``
        (always ``'gpio'``), ``'source_file'`` and ``'details'``. The list is
        empty when *gpio_file* is falsy.
    """
    if not gpio_file:
        return []

    all_events = []
    skipped_rows = 0
    with open(gpio_file, 'r', newline='') as f:
        for row in csv.DictReader(f):
            try:
                timestamp = float(row['posix_timestamp'])
                details = f"{row['event_label']} (pin {row['gpio_pin']})"
            except (ValueError, KeyError, TypeError):
                skipped_rows += 1
                continue
            if not math.isfinite(timestamp):
                # float() accepts "nan" and "inf"; such events cannot be ordered.
                skipped_rows += 1
                continue

            all_events.append({
                'timestamp': timestamp,
                'event_type': 'gpio',
                'source_file': gpio_file.name,
                'details': details
            })

    if skipped_rows:
        print(f"Warning: Skipped {skipped_rows} malformed row(s) in {gpio_file.name}.")
    return all_events
def main():
    """Command-line entry point: build one session's merged timeline file.

    Takes a single positional argument, the session directory. Frame records
    and GPIO event records are collected, ordered by timestamp, and written
    to ``timestamps_events_absolute.csv`` inside that directory.

    The process exits with status 1 when the directory does not exist, when
    no part start times can be extracted, or when any other exception is
    raised during processing. If there are no records at all, a warning is
    printed and no file is written.
    """
    cli = argparse.ArgumentParser(description="Process camera session data.")
    cli.add_argument("session_dir", type=str, help="Path to the session directory.")
    session_path = Path(cli.parse_args().session_dir).resolve()

    if not session_path.is_dir():
        print(f"Error: Directory not found: {session_path}", file=sys.stderr)
        sys.exit(1)

    print(f"--- Starting processing for session: {session_path.name} ---")

    try:
        located = find_session_files(session_path)

        part_start_times = parse_start_timestamps(located['start_timestamps'])
        if not part_start_times:
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept this exit.
            print("Error: Could not extract part start times. Aborting.", file=sys.stderr)
            sys.exit(1)

        timeline = process_frame_files(located['frame_timestamps'], part_start_times)
        timeline = timeline + process_gpio_file(located['gpio_events'])
        timeline.sort(key=lambda entry: entry['timestamp'])

        if not timeline:
            print("Warning: No data to write.")
            return

        output_filename = session_path / 'timestamps_events_absolute.csv'
        columns = ['timestamp', 'event_type', 'source_file', 'details']
        with open(output_filename, 'w', newline='') as out:
            writer = csv.DictWriter(out, fieldnames=columns)
            writer.writeheader()
            writer.writerows(timeline)

        print(f"--- Processing Complete! Master file created: {output_filename} ---")

    except Exception as e:
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()