-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
* feat(autoware_debug_tools): add processing time visualizer Signed-off-by: Y.Hisaki <[email protected]> * ignore spell check Signed-off-by: Y.Hisaki <[email protected]> * update everythings Signed-off-by: Y.Hisaki <[email protected]> --------- Signed-off-by: Y.Hisaki <[email protected]>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Autoware Debug Tools | ||
|
||
This package provides tools for debugging Autoware. | ||
|
||
## Processing Time Visualizer | ||
|
||
This tool visualizes `tier4_debug_msgs/msg/ProcessingTimeTree` messages. | ||
|
||
### Usage | ||
|
||
1. Run the following command to start the visualizer. | ||
|
||
```bash | ||
ros2 run autoware_debug_tools processing_time_visualizer | ||
``` | ||
|
||
2. Select a topic to visualize. | ||
|
||
 | ||
|
||
3. Then, the visualizer will show the processing time tree. | ||
|
||
 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import curses | ||
import time | ||
from typing import Dict | ||
import uuid | ||
|
||
import pyperclip | ||
import rclpy | ||
import rclpy.executors | ||
from rclpy.node import Node | ||
from tier4_debug_msgs.msg import ProcessingTimeTree as ProcessingTimeTreeMsg | ||
|
||
from .print_tree import print_trees | ||
from .topic_selector import select_topic | ||
from .tree import ProcessingTimeTree | ||
from .utils import exit_curses | ||
from .utils import init_curses | ||
|
||
|
||
class ProcessingTimeVisualizer(Node): | ||
def __init__(self): | ||
super().__init__("processing_time_visualizer" + str(uuid.uuid4()).replace("-", "_")) | ||
self.subscriber = self.subscribe_processing_time_tree() | ||
self.trees: Dict[str, ProcessingTimeTree] = {} | ||
self.worst_case_tree: Dict[str, ProcessingTimeTree] = {} | ||
self.stdcscr = init_curses() | ||
Check warning on line 25 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py
|
||
self.show_comment = False | ||
print_trees("🌲 Processing Time Tree 🌲", self.topic_name, self.trees, self.stdcscr) | ||
Check warning on line 27 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py
|
||
|
||
self.create_timer(0.1, self.update_screen) | ||
|
||
def subscribe_processing_time_tree(self): | ||
topics = [] | ||
|
||
s = time.time() | ||
while True: | ||
for topic_name, topic_types in self.get_topic_names_and_types(): | ||
for topic_type in topic_types: | ||
if ( | ||
topic_type == "tier4_debug_msgs/msg/ProcessingTimeTree" | ||
and topic_name not in topics | ||
): | ||
topics.append(topic_name) | ||
|
||
if time.time() - s > 1.0: | ||
break | ||
|
||
if len(topics) == 0: | ||
self.get_logger().info("No ProcessingTimeTree topic found") | ||
self.get_logger().info("Exiting...") | ||
exit(1) | ||
else: | ||
self.topic_name = curses.wrapper(select_topic, topics) | ||
subscriber = self.create_subscription( | ||
ProcessingTimeTreeMsg, | ||
self.topic_name, | ||
self.callback, | ||
10, | ||
) | ||
|
||
return subscriber | ||
|
||
def update_screen(self): | ||
key = self.stdcscr.getch() | ||
Check warning on line 63 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py
|
||
|
||
self.show_comment = not self.show_comment if key == ord("c") else self.show_comment | ||
logs = print_trees( | ||
"🌲 Processing Time Tree 🌲", | ||
self.topic_name, | ||
self.trees.values(), | ||
self.stdcscr, | ||
Check warning on line 70 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py
|
||
self.show_comment, | ||
) | ||
if key == ord("y"): | ||
pyperclip.copy(logs) | ||
if key == ord("q"): | ||
raise KeyboardInterrupt | ||
|
||
def callback(self, msg: ProcessingTimeTreeMsg): | ||
tree = ProcessingTimeTree.from_msg(msg) | ||
self.trees[tree.name] = tree | ||
if tree.name not in self.worst_case_tree: | ||
self.worst_case_tree[tree.name] = tree | ||
else: | ||
self.worst_case_tree[tree.name] = ( | ||
tree | ||
if tree.processing_time > self.worst_case_tree[tree.name].processing_time | ||
else self.worst_case_tree[tree.name] | ||
) | ||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
try: | ||
node = ProcessingTimeVisualizer() | ||
except KeyboardInterrupt: | ||
exit_curses() | ||
return | ||
try: | ||
rclpy.spin(node) | ||
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): | ||
node.destroy_node() | ||
exit_curses() | ||
if len(node.worst_case_tree) == 0: | ||
exit(1) | ||
print("⏰ Worst Case Execution Time ⏰") | ||
for tree in node.worst_case_tree.values(): | ||
print(tree, end=None) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import curses | ||
from itertools import chain | ||
from typing import List | ||
|
||
from .tree import ProcessingTimeTree | ||
from .utils import abbreviate_topic | ||
from .utils import wrap_lines | ||
|
||
|
||
def print_trees( | ||
prefix: str, | ||
topic_name: str, | ||
trees: List[ProcessingTimeTree], | ||
stdscr: curses.window, | ||
show_comment: bool = False, | ||
): | ||
stdscr.clear() | ||
height, width = stdscr.getmaxyx() | ||
stdscr.addstr(0, 0, prefix[: width - 2], curses.color_pair(2)) | ||
topic_showing = (abbreviate_topic(topic_name) if len(topic_name) > width else topic_name)[ | ||
: width - 2 | ||
] | ||
stdscr.addstr(1, 0, topic_showing, curses.color_pair(1)) | ||
tree_lines = list(chain.from_iterable(tree.to_lines(show_comment) + [""] for tree in trees)) | ||
tree_lines = wrap_lines(tree_lines, width, height - 2) | ||
for i, line in enumerate(tree_lines): | ||
stdscr.addstr(i + 2, 1, line) | ||
stdscr.addstr(height - 1, 0, "'q' => quit. 'c' => show comment. 'y' => copy."[: width - 2]) | ||
stdscr.refresh() | ||
|
||
return "".join([line + "\n" for line in tree_lines]) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import curses | ||
from typing import List | ||
from typing import Optional | ||
|
||
from .utils import abbreviate_topic | ||
from .utils import wrap_topic_name | ||
|
||
|
||
def select_topic(stdscr: curses.window, topics: List[str]) -> Optional[str]: | ||
curses.curs_set(0) # Hide the cursor | ||
curses.start_color() # Enable color support | ||
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE) # Define color pair | ||
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) # Define red color pair | ||
|
||
current_topic = 0 | ||
start_index = 0 | ||
max_topics = 8 | ||
|
||
while True: | ||
stdscr.clear() | ||
height, width = stdscr.getmaxyx() | ||
|
||
# Check if the terminal window is too small | ||
if ( | ||
width < max(len(abbreviate_topic(topic)) for topic in topics) + 2 | ||
or height < max_topics + 2 | ||
): | ||
error_msg = "Terminal window too small. Please resize." | ||
stdscr.addstr(height // 2, width // 2 - len(error_msg) // 2, error_msg) | ||
stdscr.refresh() | ||
key = stdscr.getch() | ||
if key in [ord("q"), ord("Q")]: | ||
return None | ||
continue | ||
|
||
# Display the full selected topic in red at the top, with wrapping if necessary | ||
full_topic = topics[current_topic] | ||
lines = wrap_topic_name(full_topic, width - 2) | ||
|
||
for i, line in enumerate(lines): | ||
stdscr.attron(curses.color_pair(2)) | ||
Check warning on line 41 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py
|
||
stdscr.addstr(i, 1, line) | ||
stdscr.attroff(curses.color_pair(2)) | ||
Check warning on line 43 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py
|
||
|
||
# Display the topics | ||
for idx in range(start_index, min(start_index + max_topics, len(topics))): | ||
abbreviated_option = abbreviate_topic(topics[idx])[: width - 2] # Truncate if necessary | ||
x = width // 2 - len(abbreviated_option) // 2 | ||
y = height // 2 - max_topics // 2 + idx - start_index + len(lines) | ||
if idx == current_topic: | ||
stdscr.attron(curses.color_pair(1)) | ||
Check warning on line 51 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py
|
||
stdscr.addstr(y, x, abbreviated_option) | ||
stdscr.attroff(curses.color_pair(1)) | ||
Check warning on line 53 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py
|
||
else: | ||
stdscr.addstr(y, x, abbreviated_option) | ||
|
||
# Display navigation buttons if needed | ||
if start_index + max_topics < len(topics): | ||
string = "Next>" | ||
stdscr.addstr(height - 1, width - len(string) - 1, string) | ||
if start_index > 0: | ||
string = "<Prev" | ||
stdscr.addstr(height - 1, 0, string) | ||
|
||
stdscr.refresh() | ||
|
||
# Handle user input | ||
key = stdscr.getch() | ||
if key == curses.KEY_UP and current_topic > 0: | ||
current_topic -= 1 | ||
if current_topic < start_index: | ||
start_index -= 1 | ||
elif key == curses.KEY_DOWN and current_topic < len(topics) - 1: | ||
current_topic += 1 | ||
if current_topic >= start_index + max_topics: | ||
start_index += 1 | ||
elif key in [curses.KEY_ENTER, 10, 13]: | ||
return topics[current_topic] | ||
elif key == curses.KEY_RIGHT and start_index + max_topics < len(topics): | ||
start_index += max_topics | ||
current_topic = start_index | ||
elif key == curses.KEY_LEFT and start_index > 0: | ||
start_index -= max_topics | ||
current_topic = start_index |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from typing import Dict | ||
|
||
from tier4_debug_msgs.msg import ProcessingTimeTree as ProcessingTimeTreeMsg | ||
|
||
|
||
class ProcessingTimeTree: | ||
def __init__( | ||
self, | ||
name: str = "", | ||
processing_time: float = 0.0, | ||
comment: str = "", | ||
id: int = 1, # noqa | ||
parent_id: int = 0, | ||
): | ||
self.name = name | ||
self.processing_time = processing_time | ||
self.comment = comment | ||
self.id = id | ||
self.parent_id = parent_id | ||
self.children = [] | ||
|
||
@classmethod | ||
def from_msg(cls, msg: ProcessingTimeTreeMsg) -> "ProcessingTimeTree": | ||
# Create a dictionary to map node IDs to ProcessingTimeTree objects | ||
node_dict: Dict[int, ProcessingTimeTree] = { | ||
node.id: ProcessingTimeTree( | ||
node.name, node.processing_time, node.comment, node.id, node.parent_id | ||
) | ||
for node in msg.nodes | ||
} | ||
|
||
# Build the tree structure | ||
root = node_dict[1] | ||
for node in list(node_dict.values()): | ||
parent = node_dict.get(node.parent_id) | ||
if parent: | ||
parent.children.append(node) | ||
|
||
return root | ||
|
||
def to_lines(self, show_comment: bool = True) -> str: | ||
def construct_string( | ||
node: "ProcessingTimeTree", | ||
lines: list, | ||
prefix: str, | ||
is_last: bool, | ||
is_root: bool, | ||
) -> None: | ||
# If not the root, append the prefix and the node information | ||
line = "" | ||
if not is_root: | ||
line += prefix + ("└── " if is_last else "├── ") | ||
line += f"{node.name}: {node.processing_time:.2f} [ms]" | ||
line += f": {node.comment}" if show_comment and node.comment else "" | ||
lines.append(line) | ||
# Recur for each child node | ||
for i, child in enumerate(node.children): | ||
construct_string( | ||
child, | ||
lines, | ||
prefix + (" " if is_last else "│ "), | ||
i == len(node.children) - 1, | ||
False, | ||
) | ||
|
||
lines = [] | ||
# Start the recursive string construction with the root node | ||
construct_string(self, lines, "", True, True) | ||
return lines | ||
|
||
def __str__(self) -> str: | ||
return "".join([line + "\n" for line in self.to_lines()]) | ||
|
||
def __eq__(self, other: "ProcessingTimeTree") -> bool: | ||
return self.name == other.name |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import curses | ||
from typing import List | ||
|
||
|
||
def abbreviate_topic(topic: str) -> str: | ||
parts = topic.split("/") | ||
abbreviated_parts = [part[0] if len(part) > 1 else part for part in parts[:-1]] | ||
return "/".join(abbreviated_parts + [parts[-1]]) | ||
|
||
|
||
def wrap_topic_name(text: str, width: int) -> List[str]: | ||
lines = [] | ||
while len(text) > width: | ||
split_point = text.rfind("/", 0, width) | ||
if split_point == -1: | ||
split_point = width | ||
lines.append(text[:split_point]) | ||
text = text[split_point:] | ||
lines.append(text) | ||
return lines | ||
|
||
|
||
def wrap_lines(lines, width, height): | ||
return [line[:width] for line in lines][:height] | ||
|
||
|
||
def exit_curses(): | ||
curses.echo() | ||
curses.nocbreak() | ||
Check warning on line 29 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/utils.py
|
||
curses.endwin() | ||
Check warning on line 30 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/utils.py
|
||
|
||
|
||
def init_curses() -> curses.window: | ||
stdscr = curses.initscr() | ||
stdscr.nodelay(True) | ||
curses.noecho() | ||
curses.cbreak() | ||
stdscr.keypad(True) | ||
curses.mousemask(curses.ALL_MOUSE_EVENTS) | ||
curses.start_color() | ||
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) | ||
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_WHITE) | ||
return stdscr |