diff --git a/src/holosoma_inference/docker/run.sh b/src/holosoma_inference/docker/run.sh old mode 100644 new mode 100755 index 4b459b25..4312a259 --- a/src/holosoma_inference/docker/run.sh +++ b/src/holosoma_inference/docker/run.sh @@ -17,6 +17,11 @@ PROJECT_ROOT="$( cd "$SCRIPT_DIR/../../.." && pwd )" CONTAINER_NAME="holosoma-inference-container" IMAGE_NAME="holosoma-inference" +# Mount bash history in the host filesystem in order to preserve history between containers +HOLOSOMA_DEPS_DIR="$HOME/.holosoma_deps" +mkdir -p "$HOLOSOMA_DEPS_DIR" +touch "$HOLOSOMA_DEPS_DIR/bash_history" + # Function to create a new container create_container() { local mounts=( @@ -25,6 +30,8 @@ create_container() { -v /tmp/.X11-unix:/tmp/.X11-unix -v $HOME/.Xauthority:/root/.Xauthority:ro -v /dev/shm:/dev/shm + -v /dev/input:/dev/input + -v "$HOLOSOMA_DEPS_DIR/bash_history":/root/.bash_history ) [[ -d "$EXT_DIR" ]] && mounts+=(-v "$EXT_DIR":/workspace/holosoma-extension) # optionally mount extension repo diff --git a/src/holosoma_inference/holosoma_inference/config/config_values/inference.py b/src/holosoma_inference/holosoma_inference/config/config_values/inference.py index 8d579724..0d144ba5 100644 --- a/src/holosoma_inference/holosoma_inference/config/config_values/inference.py +++ b/src/holosoma_inference/holosoma_inference/config/config_values/inference.py @@ -52,19 +52,56 @@ task=task.wbt, ) +# Core defaults - no extension imports at module load time DEFAULTS = { "g1-29dof-loco": g1_29dof_loco, "t1-29dof-loco": t1_29dof_loco, "g1-29dof-wbt": g1_29dof_wbt, } -# Auto-discover inference configs from installed extensions -for ep in entry_points(group="holosoma.config.inference"): - DEFAULTS[ep.name] = ep.load() +# Track whether extensions have been loaded +_extensions_loaded = False -AnnotatedInferenceConfig = Annotated[ - InferenceConfig, - tyro.conf.arg( - constructor=tyro.extras.subcommand_type_from_defaults({f"inference:{k}": v for k, v in DEFAULTS.items()}) + +def _load_extensions() -> None: + """Lazily load extension configs from entry points. + + This is deferred to avoid circular imports when extensions import + from holosoma_inference.config at module load time. + """ + global _extensions_loaded # noqa: PLW0603 + if _extensions_loaded: + return + _extensions_loaded = True + for ep in entry_points(group="holosoma.config.inference"): + DEFAULTS[ep.name] = ep.load() + + +def get_annotated_inference_config() -> type: + """Build the annotated InferenceConfig type with all discovered configs. + + This function loads extension configs lazily and returns a tyro-compatible + annotated type for CLI subcommand generation. + + Returns: + Annotated type suitable for use with tyro.cli() + """ + _load_extensions() + return Annotated[ + InferenceConfig, + tyro.conf.arg( + constructor=tyro.extras.subcommand_type_from_defaults( + {f"inference:{k}": v for k, v in DEFAULTS.items()} + ) ), -] + ] + + +def get_defaults() -> dict: + """Get all inference config defaults, including extensions. + + Returns: + Dictionary mapping config names to InferenceConfig instances. + """ + _load_extensions() + return DEFAULTS diff --git a/src/holosoma_inference/holosoma_inference/config/config_values/robot.py b/src/holosoma_inference/holosoma_inference/config/config_values/robot.py index 124e68c0..68252a09 100644 --- a/src/holosoma_inference/holosoma_inference/config/config_values/robot.py +++ b/src/holosoma_inference/holosoma_inference/config/config_values/robot.py @@ -192,6 +192,7 @@ # Default Configurations Dictionary # ============================================================================= +# Core defaults - no extension imports at module load time DEFAULTS = { "g1-29dof": g1_29dof, "t1-29dof": t1_29dof, @@ -201,6 +202,29 @@ Keys use hyphen-case naming convention for CLI compatibility. """ -# Auto-discover robot configs from installed extensions -for ep in entry_points(group="holosoma.config.robot"): - DEFAULTS[ep.name] = ep.load() +# Track whether extensions have been loaded +_extensions_loaded = False + + +def _load_extensions() -> None: + """Lazily load extension configs from entry points. + + This is deferred to avoid circular imports when extensions import + from holosoma_inference.config at module load time. + """ + global _extensions_loaded # noqa: PLW0603 + if _extensions_loaded: + return + _extensions_loaded = True + for ep in entry_points(group="holosoma.config.robot"): + DEFAULTS[ep.name] = ep.load() + + +def get_defaults() -> dict: + """Get all robot config defaults, including extensions. + + Returns: + Dictionary mapping config names to RobotConfig instances. + """ + _load_extensions() + return DEFAULTS diff --git a/src/holosoma_inference/holosoma_inference/policies/wbt.py b/src/holosoma_inference/holosoma_inference/policies/wbt.py index 25920b71..5111cc17 100644 --- a/src/holosoma_inference/holosoma_inference/policies/wbt.py +++ b/src/holosoma_inference/holosoma_inference/policies/wbt.py @@ -251,6 +251,9 @@ def rl_inference(self, robot_state_data): self.curr_motion_timestep = self.timestep_util.timestep obs = self.prepare_obs_for_rl(robot_state_data) + if self.config.task.print_observations: + self._print_observations(obs) + input_feed = {"time_step": np.array([[self.curr_motion_timestep]], dtype=np.float32), "obs": obs["actor_obs"]} policy_action, self.motion_command_t, self.ref_quat_xyzw_t = self.policy(input_feed) diff --git a/src/holosoma_inference/holosoma_inference/run_policy.py b/src/holosoma_inference/holosoma_inference/run_policy.py index 093ba6b0..80ba0fff 100644 --- a/src/holosoma_inference/holosoma_inference/run_policy.py +++ b/src/holosoma_inference/holosoma_inference/run_policy.py @@ -17,7 +17,7 @@ from loguru import logger from holosoma_inference.config.config_types.inference import InferenceConfig -from holosoma_inference.config.config_values.inference import AnnotatedInferenceConfig +from holosoma_inference.config.config_values.inference import get_annotated_inference_config from holosoma_inference.config.utils import TYRO_CONFIG from holosoma_inference.policies.locomotion import LocomotionPolicy from holosoma_inference.policies.wbt import WholeBodyTrackingPolicy @@ -120,7 +120,8 @@ def run_policy(config: InferenceConfig): def main(annotated_config=None): """Main entry point. Extensions can pass their own AnnotatedInferenceConfig.""" if annotated_config is None: - annotated_config = AnnotatedInferenceConfig + # Use factory function to lazily load extension configs + annotated_config = get_annotated_inference_config() config = tyro.cli(annotated_config, config=TYRO_CONFIG) run_policy(config) diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/__init__.py deleted file mode 100644 index 7b35e985..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/__init__.py +++ /dev/null @@ -1,39 +0,0 @@ -from holosoma_inference.config.config_types.robot import RobotConfig - -from .base import BasicCommandSender - - -def create_command_sender(config: RobotConfig, lcm=None): - """ - Factory function to create the appropriate command sender based on configuration. - - Args: - config: Robot configuration dictionary - lcm: LCM instance (optional, for LCM-based senders) - - Returns: - An instance of the appropriate command sender class - """ - sdk_type = config.sdk_type - - if sdk_type == "unitree": - from .unitree import UnitreeCommandSender - - return UnitreeCommandSender(config, lcm) - if sdk_type == "booster": - from .booster import BoosterCommandSender - - return BoosterCommandSender(config, lcm) - raise ValueError(f"Unsupported SDK type: {sdk_type}") - - -# For backward compatibility -CommandSender = create_command_sender - -__all__ = [ - "BasicCommandSender", - "BoosterCommandSender", - "CommandSender", # Backward compatibility - "UnitreeCommandSender", - "create_command_sender", -] diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/base/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/base/__init__.py deleted file mode 100644 index 1a850457..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/base/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -Base Command Sender module. - -This module contains the base classes for command sending functionality. -""" - -from .basic_command_sender import BasicCommandSender - -__all__ = ["BasicCommandSender"] diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/base/basic_command_sender.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/base/basic_command_sender.py deleted file mode 100644 index 1f85fd6f..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/base/basic_command_sender.py +++ /dev/null @@ -1,121 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod - -from holosoma_inference.config.config_types.robot import RobotConfig - - -def _get_config_value(config: RobotConfig | dict, key: str, default=None): - """ - Util function to temporarily support getting valeus from both `RobotConfig` and old dict-based configs. - - TODO: Force dataclasses-based access once `run_sim.py` is moved to `holosoma`. - """ - - # Try dataclass attribute access first - if hasattr(config, key): - return getattr(config, key) - # Fall back to hydra - if isinstance(config, dict): - return config.get(key, default) - raise RuntimeError(f"Unsupported type {type(config)}") - - -class BasicCommandSender(ABC): - """Abstract base class for command sender implementations.""" - - def __init__(self, config: RobotConfig | dict, lcm=None): - self.lcm = lcm - self.config = config - self.sdk_type = _get_config_value(config, "sdk_type", "unitree") - self.motor_type = _get_config_value(config, "motor_type", "serial") - - # Initialize control gains - self.kp_level = 1.0 - self.kd_level = 1.0 - self.waist_kp_level = 1.0 - - # Initialize weak motor joint index - self.weak_motor_joint_index = [] - weak_motor_cfg = _get_config_value(self.config, "weak_motor_joint_index") - if weak_motor_cfg: - for value in self.robot.WeakMotorJointIndex.values(): - self.weak_motor_joint_index.append(value) - - self.no_action = 0 - - # Initialize SDK-specific components - self._init_sdk_components() - - @abstractmethod - def _init_sdk_components(self): - """Initialize SDK-specific components. Must be implemented by subclasses.""" - - @abstractmethod - def send_command(self, cmd_q, cmd_dq, cmd_tau, dof_pos_latest=None, kp_override=None, kd_override=None): - """Send command to robot. Must be implemented by subclasses.""" - - def is_weak_motor(self, motor_index): - """Check if a motor is a weak motor.""" - return motor_index in self.weak_motor_joint_index - - def _set_motor_command( - self, - motor_cmd, - motor_id, - joint_id, - cmd_q, - cmd_dq, - cmd_tau, - motor_kp, - motor_kd, - kp_override=None, - kd_override=None, - ): - """Set motor command for a specific motor.""" - default_q = _get_config_value(self.config, "default_motor_angles") - - if joint_id == -1 or self.no_action: - motor_cmd.q = default_q[motor_id] - motor_cmd.dq = 0.0 - motor_cmd.tau = 0.0 - motor_cmd.kp = 0.0 - motor_cmd.kd = 0.0 - else: - motor_cmd.q = cmd_q[joint_id] - motor_cmd.dq = cmd_dq[joint_id] - motor_cmd.tau = cmd_tau[joint_id] - - kp_value = motor_kp[motor_id] - kd_value = motor_kd[motor_id] - if kp_override is not None and joint_id != -1: - kp_value = kp_override[joint_id] - if kd_override is not None and joint_id != -1: - kd_value = kd_override[joint_id] - - motor_cmd.kp = kp_value * self.kp_level - motor_cmd.kd = kd_value * self.kd_level - - def _fill_motor_commands(self, motor_cmd, cmd_q, cmd_dq, cmd_tau, kp_override=None, kd_override=None): - """Fill motor commands for all motors.""" - motor2joint = _get_config_value(self.config, "motor2joint") - num_motors = _get_config_value(self.config, "num_motors") - motor_kp = _get_config_value(self.config, "motor_kp") - motor_kd = _get_config_value(self.config, "motor_kd") - - for i in range(num_motors): - m_id = i - j_id = motor2joint[i] - cmd = motor_cmd[m_id] - self._set_motor_command( - cmd, - m_id, - j_id, - cmd_q, - cmd_dq, - cmd_tau, - motor_kp, - motor_kd, - kp_override=kp_override, - kd_override=kd_override, - ) diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/__init__.py deleted file mode 100644 index f563eaf7..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Booster command sender implementation. -""" - -from .booster_command_sender import BoosterCommandSender - -__all__ = ["BoosterCommandSender"] diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/booster_command_sender.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/booster_command_sender.py deleted file mode 100644 index 7452954a..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/booster_command_sender.py +++ /dev/null @@ -1,106 +0,0 @@ -from booster_robotics_sdk import ( - B1LocoClient, - B1LowCmdPublisher, - LowCmd, - LowCmdType, - MotorCmd, - RobotMode, -) - -from holosoma_inference.config.config_types.robot import RobotConfig - -from ..base import BasicCommandSender # noqa: TID252 - - -class BoosterCommandSender(BasicCommandSender): - """Booster command sender implementation.""" - - def _init_sdk_components(self): - """Initialize Booster SDK-specific components.""" - - robot_type = self.config.robot_type - - if robot_type in ["t1_23dof", "t1_29dof"]: - self.LowCmd = LowCmd - self.LowCmdType = LowCmdType - self.MotorCmd = MotorCmd - self.lowcmd_publisher_ = B1LowCmdPublisher() - self.client = B1LocoClient() - self.lowcmd_publisher_.InitChannel() - self.client.Init() - self.init_booster_low_cmd() - self.create_prepare_cmd(self.low_cmd, self.config) - self._send_cmd(self.low_cmd) - self.client.ChangeMode(RobotMode.kCustom) - self.dof_names = self.config.dof_names - self.dof_names_parallel_mech = self.config.dof_names_parallel_mech - self.parallel_mech_indexes = [self.dof_names.index(name) for name in self.dof_names_parallel_mech] - else: - raise NotImplementedError(f"Robot type {robot_type} is not supported yet") - - def init_booster_low_cmd(self): - """Initialize Booster low-level command.""" - self.low_cmd = self.LowCmd() - if self.motor_type == "serial": - self.low_cmd.cmd_type = self.LowCmdType.SERIAL - elif self.motor_type == "parallel": - self.low_cmd.cmd_type = self.LowCmdType.PARALLEL - self.motor_cmds = [self.MotorCmd() for _ in range(self.config.num_motors)] - self.low_cmd.motor_cmd = self.motor_cmds - - def send_command(self, cmd_q, cmd_dq, cmd_tau, dof_pos_latest=None, kp_override=None, kd_override=None): - """Send command to Booster robot.""" - # In booster, we need to fill the motor_cmds first - self.low_cmd = self.LowCmd() - if self.motor_type == "serial": - self.low_cmd.cmd_type = self.LowCmdType.SERIAL - elif self.motor_type == "parallel": - self.low_cmd.cmd_type = self.LowCmdType.PARALLEL - else: - raise NotImplementedError(f"Motor type {self.motor_type} is not supported yet") - self.low_cmd.motor_cmd = self.motor_cmds - - motor_cmd = self.low_cmd.motor_cmd - self._fill_motor_commands( - motor_cmd, - cmd_q, - cmd_dq, - cmd_tau, - kp_override=kp_override, - kd_override=kd_override, - ) - - # Send command - self.lowcmd_publisher_.Write(self.low_cmd) - - def _send_cmd(self, cmd): - """Send command to robot.""" - self.lowcmd_publisher_.Write(cmd) - - def init_cmd_t1(self, low_cmd): - """Initialize T1 command.""" - low_cmd.cmd_type = self.LowCmdType.SERIAL - motorCmds = [self.MotorCmd() for _ in range(self.config.num_motors)] - low_cmd.motor_cmd = motorCmds - - num_motors = min(len(motorCmds), self.config.num_motors) - for i in range(num_motors): - low_cmd.motor_cmd[i].q = 0.0 - low_cmd.motor_cmd[i].dq = 0.0 - low_cmd.motor_cmd[i].tau = 0.0 - low_cmd.motor_cmd[i].kp = 0.0 - low_cmd.motor_cmd[i].kd = 0.0 - # weight is not effective in custom mode - low_cmd.motor_cmd[i].weight = 0.0 - - def create_prepare_cmd(self, low_cmd, cfg: RobotConfig): - """Create prepare command for T1.""" - self.init_cmd_t1(low_cmd) - # Use motor_kp, motor_kd, and default_motor_angles from RobotConfig - # Note: motor_kp and motor_kd may be None during initialization (loaded from ONNX later) - num_motors = min(len(low_cmd.motor_cmd), cfg.num_motors) - for i in range(num_motors): - low_cmd.motor_cmd[i].kp = cfg.motor_kp[i] if cfg.motor_kp is not None else 0.0 - low_cmd.motor_cmd[i].kd = cfg.motor_kd[i] if cfg.motor_kd is not None else 0.0 - low_cmd.motor_cmd[i].q = cfg.default_motor_angles[i] - return low_cmd diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/joystick_message.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/joystick_message.py deleted file mode 100644 index 8f0e9c70..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/joystick_message.py +++ /dev/null @@ -1,25 +0,0 @@ -class BoosterJoystickMessage: - """Simple message class to provide unified interface for booster joystick data.""" - - def __init__(self, remote_control_service): - self.remote_control_service = remote_control_service - - @property - def lx(self): - """Left stick X axis (lateral movement).""" - return self.remote_control_service.lx - - @property - def ly(self): - """Left stick Y axis (forward/backward movement).""" - return self.remote_control_service.ly - - @property - def rx(self): - """Right stick X axis (yaw rotation).""" - return self.remote_control_service.rx - - @property - def keys(self): - """Button states as integer bitmask.""" - return self.remote_control_service.keys diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/remote_control_service.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/remote_control_service.py deleted file mode 100644 index 57c6bec5..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/booster/remote_control_service.py +++ /dev/null @@ -1,311 +0,0 @@ -from __future__ import annotations - -import threading -import time -from dataclasses import dataclass - -import evdev - - -@dataclass -class JoystickConfig: - max_vx: float = 0.8 - max_vy: float = 0.5 - max_vyaw: float = 0.5 - control_threshold: float = 0.1 - # logitech - left and right analog sticks - left_x_axis: evdev.ecodes = evdev.ecodes.ABS_X # Left stick X (left/right) - left_y_axis: evdev.ecodes = evdev.ecodes.ABS_Y # Left stick Y (forward/back) - right_x_axis: evdev.ecodes = evdev.ecodes.ABS_RX # Right stick X (yaw rotation) - right_y_axis: evdev.ecodes = evdev.ecodes.ABS_RY # Right stick Y (not used currently) - - # beitong - # left_x_axis: evdev.ecodes = evdev.ecodes.ABS_X - # left_y_axis: evdev.ecodes = evdev.ecodes.ABS_Y - # right_x_axis: evdev.ecodes = evdev.ecodes.ABS_RX - # right_y_axis: evdev.ecodes = evdev.ecodes.ABS_RY - - -class BoosterRemoteControlService: - """Service for handling joystick remote control input for Booster robots.""" - - def __init__(self, config: JoystickConfig | None = None): - """Initialize remote control service with optional configuration.""" - self.config = config or JoystickConfig() - self._lock = threading.Lock() - self._running = True - - self.vx = 0.0 - self.vy = 0.0 - self.vyaw = 0.0 - self.lx = 0.0 - self.ly = 0.0 - self.rx = 0.0 - self.keys = 0 - - try: - self._init_joystick() - self._start_joystick_thread() - except Exception as e: - print(f"Failed to initialize joystick: {e}") - self.joystick = None - self.joystick_runner = None - - def _init_joystick(self) -> None: - """Initialize and validate joystick connection using evdev.""" - devices = [evdev.InputDevice(path) for path in evdev.list_devices()] - joystick = None - - for device in devices: - caps = device.capabilities() - # Check for both absolute axes and keys - if evdev.ecodes.EV_ABS in caps and evdev.ecodes.EV_KEY in caps: - abs_info = caps.get(evdev.ecodes.EV_ABS, []) - # Look for typical gamepad axes - axes = [code for (code, info) in abs_info] - if all( - code in axes - for code in [self.config.left_x_axis, self.config.left_y_axis, self.config.right_x_axis] - ): - absinfo = {code: info for code, info in abs_info} - self.axis_ranges = { - self.config.left_x_axis: absinfo[self.config.left_x_axis], - self.config.left_y_axis: absinfo[self.config.left_y_axis], - self.config.right_x_axis: absinfo[self.config.right_x_axis], - self.config.right_y_axis: absinfo[self.config.right_y_axis], - } - print(f"Found suitable joystick: {device.name}") - joystick = device - break - - if not joystick: - raise RuntimeError("No suitable joystick found") - - self.joystick = joystick - print(f"Selected joystick: {joystick.name}") - - def _start_joystick_thread(self): - """Start joystick polling thread.""" - self.joystick_runner = threading.Thread(target=self._run_joystick) - self.joystick_runner.daemon = True - self.joystick_runner.start() - - def _run_joystick(self): - """Poll joystick events.""" - while self._running: - if not self._process_events(): - break - - def _process_events(self): - """Process joystick events.""" - try: - for event in self.joystick.read_loop(): - if event.type == evdev.ecodes.EV_ABS: - # Handle axis events - self._handle_axis(event.code, event.value) - elif event.type == evdev.ecodes.EV_KEY: - # Handle button events - self._handle_button(event.code, event.value) - return True - except BlockingIOError: - # No events available - time.sleep(0.01) - return True - except Exception: - return False - - def _handle_axis(self, code: int, value: int): - """Handle axis events.""" - try: - # Left stick - movement - if code == self.config.left_y_axis: # Left stick Y (forward/back) - self.vx = self._scale(value, self.config.max_vx, self.config.control_threshold, code) - self.ly = self.vx # Map for compatibility with unitree interface - elif code == self.config.left_x_axis: # Left stick X (left/right) - self.vy = self._scale(value, self.config.max_vy, self.config.control_threshold, code) - self.lx = -self.vy # Map for compatibility with unitree interface (inverted) - # Right stick - rotation - elif code == self.config.right_x_axis: # Right stick X (yaw rotation) - self.vyaw = self._scale(value, self.config.max_vyaw, self.config.control_threshold, code) - self.rx = -self.vyaw # Map for compatibility with unitree interface (inverted) - elif code == self.config.right_y_axis: # Right stick Y (not used currently) - pass # Could be used for pitch control in the future - # D-pad handling via HAT axes (most common for controllers) - elif code == evdev.ecodes.ABS_HAT0X: # D-pad horizontal - self._handle_dpad_axis("horizontal", value) - elif code == evdev.ecodes.ABS_HAT0Y: # D-pad vertical - self._handle_dpad_axis("vertical", value) - # Analog triggers - elif code == evdev.ecodes.ABS_RZ: # Right trigger (R2) - self._handle_analog_trigger("R2", value) - elif code == evdev.ecodes.ABS_Z: # Left trigger (L2) - some controllers - self._handle_analog_trigger("L2", value) - else: - pass - except Exception: - raise - - def _handle_button(self, code: int, value: int): - """Handle button events.""" - # Track individual button states - if not hasattr(self, "_button_states"): - self._button_states = {} - - if value == 1: # Button pressed - self._button_states[code] = True - elif value == 0: # Button released - self._button_states[code] = False - - # Calculate combined keys value based on button combinations - self.keys = self._calculate_keys_value() - - def _handle_dpad_axis(self, direction: str, value: int): - """Handle D-pad input via HAT axes.""" - # Track D-pad states - if not hasattr(self, "_dpad_states"): - self._dpad_states = {} - - if direction == "horizontal": - # HAT0X: -1 = left, 0 = center, 1 = right - self._dpad_states["left"] = value == -1 - self._dpad_states["right"] = value == 1 - if value == 0: # Released - self._dpad_states["left"] = False - self._dpad_states["right"] = False - elif direction == "vertical": - # HAT0Y: -1 = up, 0 = center, 1 = down - self._dpad_states["up"] = value == -1 - self._dpad_states["down"] = value == 1 - if value == 0: # Released - self._dpad_states["up"] = False - self._dpad_states["down"] = False - - # Update keys value - self.keys = self._calculate_keys_value() - - def _handle_analog_trigger(self, trigger_name: str, value: int): - """Handle analog trigger as button press.""" - # Track trigger states - if not hasattr(self, "_trigger_states"): - self._trigger_states = {} - - # Treat trigger as pressed if value is above threshold (usually > 128 for 8-bit triggers) - threshold = 128 - is_pressed = value > threshold - - self._trigger_states[trigger_name] = is_pressed - - # Update keys value - self.keys = self._calculate_keys_value() - - def _calculate_keys_value(self): - """Calculate keys value based on current button states to match unitree mapping.""" - keys_value = 0 - - # Check button states - if hasattr(self, "_button_states"): - button_states = self._button_states - - # Map common buttons to unitree equivalents - # BTN_A -> A (256), BTN_B -> B (512), BTN_X -> X (1024), BTN_Y -> Y (2048) - # BTN_START -> start (4), BTN_SELECT -> select (8) - # BTN_TR -> R1 (1), BTN_TL -> L1 (2), BTN_TR2 -> R2 (16), BTN_TL2 -> L2 (32) - - if button_states.get(evdev.ecodes.BTN_A, False): - keys_value |= 256 # A - print("A pressed") - if button_states.get(evdev.ecodes.BTN_B, False): - keys_value |= 512 # B - print("B pressed") - if button_states.get(evdev.ecodes.BTN_X, False): - keys_value |= 1024 # X - print("X pressed") - if button_states.get(evdev.ecodes.BTN_Y, False): - keys_value |= 2048 # Y - print("Y pressed") - if button_states.get(evdev.ecodes.BTN_START, False): - keys_value |= 4 # start - print("Start pressed") - if button_states.get(evdev.ecodes.BTN_SELECT, False): - keys_value |= 8 # select - print("Select pressed") - if button_states.get(evdev.ecodes.BTN_TR, False): - keys_value |= 1 # R1 - print("R1 pressed") - if button_states.get(evdev.ecodes.BTN_TL, False): - keys_value |= 2 # L1 - print("L1 pressed") - if button_states.get(evdev.ecodes.BTN_TR2, False): - keys_value |= 16 # R2 - print("R2 pressed") - if button_states.get(evdev.ecodes.BTN_TL2, False): - keys_value |= 32 # L2 - print("L2 pressed") - - # Add D-pad support - if button_states.get(evdev.ecodes.BTN_DPAD_UP, False): - keys_value |= 4096 # up - print("Up pressed") - if button_states.get(evdev.ecodes.BTN_DPAD_DOWN, False): - keys_value |= 16384 # down - print("Down pressed") - if button_states.get(evdev.ecodes.BTN_DPAD_LEFT, False): - keys_value |= 32768 # left - print("Left pressed") - if button_states.get(evdev.ecodes.BTN_DPAD_RIGHT, False): - keys_value |= 8192 # right - print("Right pressed") - - # Check D-pad states from HAT axes - if hasattr(self, "_dpad_states"): - dpad_states = self._dpad_states - if dpad_states.get("up", False): - keys_value |= 4096 # up - print("D-pad Up pressed") - if dpad_states.get("down", False): - keys_value |= 16384 # down - print("D-pad Down pressed") - if dpad_states.get("left", False): - keys_value |= 32768 # left - print("D-pad Left pressed") - if dpad_states.get("right", False): - keys_value |= 8192 # right - print("D-pad Right pressed") - - # Check analog trigger states - if hasattr(self, "_trigger_states"): - trigger_states = self._trigger_states - if trigger_states.get("R2", False): - keys_value |= 16 # R2 - print("R2 pressed") - if trigger_states.get("L2", False): - keys_value |= 32 # L2 - print("L2 pressed") - - return keys_value - - def _scale(self, value: float, max_val: float, threshold: float, axis_code: int) -> float: - """Scale joystick input to velocity command using actual axis ranges.""" - absinfo = self.axis_ranges[axis_code] - min_in = absinfo.min - max_in = absinfo.max - - mapped_value = ((value - min_in) / (max_in - min_in) * 2 - 1) * max_val - - if abs(mapped_value) < threshold: - return 0.0 - return -mapped_value - - def close(self): - """Clean up resources.""" - self._running = False - if hasattr(self, "joystick") and self.joystick is not None: - self.joystick.close() - if hasattr(self, "joystick_runner") and self.joystick_runner is not None: - self.joystick_runner.join(timeout=1.0) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/unitree/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/unitree/__init__.py deleted file mode 100644 index a2a9a1ff..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/unitree/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Unitree command sender implementation. -""" - -from .unitree_command_sender import UnitreeCommandSender - -__all__ = ["UnitreeCommandSender"] diff --git a/src/holosoma_inference/holosoma_inference/sdk/command_sender/unitree/unitree_command_sender.py b/src/holosoma_inference/holosoma_inference/sdk/command_sender/unitree/unitree_command_sender.py deleted file mode 100644 index 39ee7084..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/command_sender/unitree/unitree_command_sender.py +++ /dev/null @@ -1,76 +0,0 @@ -from ..base import BasicCommandSender # noqa: TID252 - - -class UnitreeCommandSender(BasicCommandSender): - """Unitree command sender implementation.""" - - def _init_sdk_components(self): - """Initialize Unitree SDK-specific components.""" - from unitree_sdk2py.core.channel import ChannelPublisher - from unitree_sdk2py.utils.crc import CRC - - robot_type = self.config.robot_type - - if "g1" in robot_type or "h1-2" in robot_type: - from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ - from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ - - self.low_cmd = unitree_hg_msg_dds__LowCmd_() - elif "h1" in robot_type or "go2" in robot_type: - from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ - from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ - - self.low_cmd = unitree_go_msg_dds__LowCmd_() - else: - raise NotImplementedError(f"Robot type {robot_type} is not supported yet") - - # Initialize low command publisher - self.lowcmd_publisher_ = ChannelPublisher("rt/lowcmd", LowCmd_) - self.lowcmd_publisher_.Init() - self.init_unitree_low_cmd() - self.low_state = None - self.crc = CRC() - - def init_unitree_low_cmd(self): - """Initialize Unitree low-level command.""" - robot_type = self.config.robot_type - - # Set head for h1/go2 - if robot_type in {"h1", "go2"}: - self.low_cmd.head[0] = 0xFE - self.low_cmd.head[1] = 0xEF - - self.low_cmd.level_flag = 0xFF - self.low_cmd.gpio = 0 - - for i in range(self.robot.NUM_MOTORS): - if self.is_weak_motor(i): - self.low_cmd.motor_cmd[i].mode = 0x01 - else: - self.low_cmd.motor_cmd[i].mode = 0x0A - self.low_cmd.motor_cmd[i].q = self.robot.UNITREE_LEGGED_CONST["PosStopF"] - self.low_cmd.motor_cmd[i].kp = 0 - self.low_cmd.motor_cmd[i].dq = self.robot.UNITREE_LEGGED_CONST["VelStopF"] - self.low_cmd.motor_cmd[i].kd = 0 - self.low_cmd.motor_cmd[i].tau = 0 - - # Set mode for g1/h1-2 - if robot_type in {"g1_29dof", "g1_23dof", "h1-2_21dof", "h1-2_27dof"} and self.config.unitree_legged_const: - self.low_cmd.mode_machine = self.config.unitree_legged_const.get("MODE_MACHINE") - self.low_cmd.mode_pr = self.config.unitree_legged_const.get("MODE_PR") - - def send_command(self, cmd_q, cmd_dq, cmd_tau, dof_pos_latest=None, kp_override=None, kd_override=None): - """Send command to Unitree robot.""" - motor_cmd = self.low_cmd.motor_cmd - self._fill_motor_commands( - motor_cmd, - cmd_q, - cmd_dq, - cmd_tau, - kp_override=kp_override, - kd_override=kd_override, - ) - - # Add CRC and send - self.low_cmd.crc = self.crc.Crc(self.low_cmd) - self.lowcmd_publisher_.Write(self.low_cmd) diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/__init__.py deleted file mode 100644 index b366cf93..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -from holosoma_inference.config.config_types.robot import RobotConfig - -from .base import BasicStateProcessor - - -def create_state_processor(config: RobotConfig, lcm=None): - """ - Factory function to create the appropriate state processor based on configuration. - - Args: - config: Robot configuration dictionary - lcm: LCM instance (optional, for LCM-based processors) - - Returns: - An instance of the appropriate state processor class - """ - sdk_type = config.sdk_type - - if sdk_type == "unitree": - from .unitree import UnitreeStateProcessor - - return UnitreeStateProcessor(config, lcm) - if sdk_type == "booster": - from .booster import BoosterStateProcessor - - return BoosterStateProcessor(config, lcm) - if sdk_type in ["lcm_unitree", "lcm_booster"]: - raise ValueError( - f"LCM SDK types are no longer supported. Please use 'unitree' or 'booster' instead of '{sdk_type}'" - ) - raise ValueError(f"Unsupported SDK type: {sdk_type}") - - -# For backward compatibility -StateProcessor = create_state_processor - -__all__ = [ - "BasicStateProcessor", - "BoosterStateProcessor", - "StateProcessor", # Backward compatibility - "UnitreeStateProcessor", - "create_state_processor", -] diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/base/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/base/__init__.py deleted file mode 100644 index 87ab2626..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/base/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -Base State Processor module. - -This module contains the base classes for state processing functionality. -""" - -from .basic_state_processor import BasicStateProcessor - -__all__ = ["BasicStateProcessor"] diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/base/basic_state_processor.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/base/basic_state_processor.py deleted file mode 100644 index e5f93c9b..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/base/basic_state_processor.py +++ /dev/null @@ -1,57 +0,0 @@ -from abc import ABC, abstractmethod - -import numpy as np - -from holosoma_inference.config.config_types.robot import RobotConfig - - -class BasicStateProcessor(ABC): - """Abstract base class for state processor implementations.""" - - def __init__(self, config: RobotConfig, lcm=None): - self.lcm = lcm - self.config = config - self.num_motor = self.config.num_motors - self.sdk_type = self.config.sdk_type - self.motor_type = self.config.motor_type - - # Initialize state arrays - self.num_dof = self.config.num_joints - # 3 + 4 + num_dof (base_pos + base_quat + joint_pos) - self._init_q = np.zeros(3 + 4 + self.num_dof) - self.q = self._init_q - self.dq = np.zeros(3 + 3 + self.num_dof) # base_lin_vel + base_ang_vel + joint_vel - self.ddq = np.zeros(3 + 3 + self.num_dof) # base_lin_acc + base_ang_acc + joint_acc - self.tau_est = np.zeros(3 + 3 + self.num_dof) # base_lin_force + base_ang_torque + joint_torque - self.temp_first = np.zeros(self.num_dof) - self.temp_second = np.zeros(self.num_dof) - self.robot_state_data = None - - # Initialize SDK-specific components - self._init_sdk_components() - - @abstractmethod - def _init_sdk_components(self): - """Initialize SDK-specific components. Must be implemented by subclasses.""" - - @abstractmethod - def prepare_low_state(self, msg): - """Prepare low-level state data from message. Must be implemented by subclasses.""" - - def get_robot_state_data(self): - """Get the current robot state data.""" - return self.robot_state_data - - @abstractmethod - def _extract_imu_data(self, imu_state): - """Extract IMU data from state message. Must be implemented by subclasses.""" - - @abstractmethod - def _extract_joint_data(self, robot_joint_state): - """Extract joint data from state message. Must be implemented by subclasses.""" - - def _create_robot_state_data(self): - """Create the final robot state data array.""" - return np.array( - self.q.tolist() + self.dq.tolist() + self.tau_est.tolist() + self.ddq.tolist(), dtype=np.float64 - ).reshape(1, -1) diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/booster/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/booster/__init__.py deleted file mode 100644 index 8c727963..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/booster/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Booster state processor implementation. -""" - -from .booster_state_processor import BoosterStateProcessor - -__all__ = ["BoosterStateProcessor"] diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/booster/booster_state_processor.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/booster/booster_state_processor.py deleted file mode 100644 index dcd96ad2..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/booster/booster_state_processor.py +++ /dev/null @@ -1,64 +0,0 @@ -from booster_robotics_sdk import B1LowStateSubscriber - -from holosoma_inference.utils.math.quat import rpy_to_quat - -from ..base import BasicStateProcessor # noqa: TID252 - - -class BoosterStateProcessor(BasicStateProcessor): - """Booster state processor implementation.""" - - def _init_sdk_components(self): - """Initialize Booster SDK-specific components.""" - - robot_type = self.config.robot_type - - if robot_type in {"t1_23dof", "t1_29dof"}: - self.robot_lowstate_subscriber = B1LowStateSubscriber(self.low_state_handler_b1) - self.robot_lowstate_subscriber.InitChannel() - else: - raise NotImplementedError(f"Robot type {robot_type} is not supported") - - def prepare_low_state(self, msg): - """Prepare Booster low-level state data from message.""" - if not msg: - print("robot_low_state is None") - return None - - imu_state = msg.imu_state - self._extract_imu_data(imu_state) - - # Choose the appropriate motor state based on motor type - if self.motor_type == "serial": - robot_joint_state = msg.motor_state_serial - elif self.motor_type == "parallel": - robot_joint_state = msg.motor_state_parallel - else: - raise ValueError(f"Invalid motor type '{self.motor_type}'. Expected 'serial' or 'parallel'.") - - self._extract_joint_data(robot_joint_state) - - self.robot_state_data = self._create_robot_state_data() - return self.robot_state_data - - def _extract_imu_data(self, imu_state): - """Extract IMU data from Booster state message.""" - - # base quaternion - self.q[0:3] = 0.0 # base position (assumed to be at origin) - rpy = imu_state.rpy - self.q[3:7] = rpy_to_quat(rpy) - self.dq[3:6] = imu_state.gyro - self.ddq[0:3] = imu_state.acc - - def _extract_joint_data(self, robot_joint_state): - """Extract joint data from Booster state message.""" - for i in range(self.num_dof): - motor_idx = self.config.joint2motor[i] - self.q[7 + i] = robot_joint_state[motor_idx].q - self.dq[6 + i] = robot_joint_state[motor_idx].dq - self.tau_est[6 + i] = robot_joint_state[motor_idx].tau_est - - def low_state_handler_b1(self, msg): - """Handle Booster B1 low-level state messages.""" - self.robot_state_data = self.prepare_low_state(msg) diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/unitree/__init__.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/unitree/__init__.py deleted file mode 100644 index aefd0333..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/unitree/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Unitree state processor implementation. -""" - -from .unitree_state_processor import UnitreeStateProcessor - -__all__ = ["UnitreeStateProcessor"] diff --git a/src/holosoma_inference/holosoma_inference/sdk/state_processor/unitree/unitree_state_processor.py b/src/holosoma_inference/holosoma_inference/sdk/state_processor/unitree/unitree_state_processor.py deleted file mode 100644 index 0a51a695..00000000 --- a/src/holosoma_inference/holosoma_inference/sdk/state_processor/unitree/unitree_state_processor.py +++ /dev/null @@ -1,62 +0,0 @@ -from unitree_sdk2py.core.channel import ChannelSubscriber -from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ as LowState_go -from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ as LowState_hg - -from ..base import BasicStateProcessor # noqa: TID252 - - -class UnitreeStateProcessor(BasicStateProcessor): - """Unitree state processor implementation.""" - - def _init_sdk_components(self): - """Initialize Unitree SDK-specific components.""" - - robot_type = self.config.robot_type - - if "g1" in robot_type or "h1-2" in robot_type: - self.robot_lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_hg) - self.robot_lowstate_subscriber.Init(self.low_state_handler_hg, 1) - elif "h1" in robot_type or "go2" in robot_type: - self.robot_lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_go) - self.robot_lowstate_subscriber.Init(self.low_state_handler_go, 1) - else: - raise NotImplementedError(f"Robot type {robot_type} is not supported") - - def prepare_low_state(self, msg): - """Prepare Unitree low-level state data from message.""" - if not msg: - print("robot_low_state is None") - return None - - imu_state = msg.imu_state - self._extract_imu_data(imu_state) - - robot_joint_state = msg.motor_state - self._extract_joint_data(robot_joint_state) - - self.robot_state_data = self._create_robot_state_data() - return self.robot_state_data - - def _extract_imu_data(self, imu_state): - """Extract IMU data from Unitree state message.""" - # base quaternion - self.q[0:3] = 0.0 # base position (assumed to be at origin) - self.q[3:7] = imu_state.quaternion # w, x, y, z - self.dq[3:6] = imu_state.gyroscope - self.ddq[0:3] = imu_state.accelerometer - - def _extract_joint_data(self, robot_joint_state): - """Extract joint data from Unitree state message.""" - for i in range(self.num_dof): - motor_idx = self.robot.JOINT2MOTOR[i] - self.q[7 + i] = robot_joint_state[motor_idx].q - self.dq[6 + i] = robot_joint_state[motor_idx].dq - self.tau_est[6 + i] = robot_joint_state[motor_idx].tau_est - - def low_state_handler_go(self, msg): - """Handle Unitree GO low-level state messages.""" - self.robot_state_data = self.prepare_low_state(msg) - - def low_state_handler_hg(self, msg): - """Handle Unitree HG low-level state messages.""" - self.robot_state_data = self.prepare_low_state(msg) diff --git a/src/holosoma_inference/setup.py b/src/holosoma_inference/setup.py index b1d7637b..71a5ccda 100644 --- a/src/holosoma_inference/setup.py +++ b/src/holosoma_inference/setup.py @@ -48,6 +48,7 @@ "wandb", "zmq", "defusedxml", + "evdev", ], extras_require={ "dev": [