From 339842de07e62203c20f74ea11d95709d862add2 Mon Sep 17 00:00:00 2001 From: ML1907 <145844072+ML1907@users.noreply.github.com> Date: Mon, 2 Dec 2024 11:00:40 +0100 Subject: [PATCH] Mpc integration (#42) * Add svea vision as a submodule * basic navigation stack integration implemented. to be tested * add lidar to localize.launch outdoor * small change * fixed lidar ip config and dependency in svea sensor * added lidar transform * implemented blp converter node. configured simulation enviroment for fast dev. * added lidar transform in simulator. ros nav works in simulator * base navigation + teb advanced nav. have been configured. both working in the simulator. costmaps are still handled wrongly. * improved TEB tuning in simulator. Costmaps now make more sense either. * change costmap param * map loading with datum for svea * make .py executable * update launch * yaml update * add username for gps * added datum in nav.launch. last tuning before first test on svea. * update ntrip topics with old version * fallback to rtcm message from mavros * static and inflation layers well configured. teb planner perfectly navigating through obstacles from map. Todo: fix global layer. * added lidar visualizer for debugging * added new map with fake static obstacles. changed parameters for localization bag. added new set_datum node. * real time testing updates. change ll controller. higher speed ref for teb * minor change * add new controller to collect some new rosbags * fix gps transform. added changes to local controller to set min velocity * added steering rate of change to be tested. fixed teb params * final changes. Tested on SVEA. now navigates smoothly in static env. * pedestrian poses simulator added. still have some troubles handling them in local costmap (does not get cleared prop.) * added poses in base link instead of map frame * added poses in base link instead of map * added svea vision parameters in launch file * Add requirements.txt to .gitignore * prova * gitignore update * added svea vision dep * fixed local costmap clearing. to be tested. first tuning of teb with dynamic obstacles * update image tag * config update * updated image to the same used for ismir. updated rtk manager to current version. added util/start * added frame id param for object pose node in nav. launch file * discorporate gps.launch * fix * fix * fix * fix * record topic updated to debug gps+zed issue * fix * reorganize launch * bags folder * update static tf * tf problem fixed. * added reachability analysis output simulator (dynamic inflation). teb does not incorporate it well. tuned pointcloud method instead. * added dynamic obstacles as polygons * update * planner moved in map frame * smaller zonotopes * added corrections from testing. added crowded behaviour (wip) * crowded behaviour tested on simul. added controller velocity smoothing feature. * added feature in crowded behaviour. * added new map. * itrl_sidewalk tests * set lower limits for teb acceleration * bigger zonotope * removed velocity reset when backwards. removed crowded behaviour * re-loaded original util/run, works better. retry velocity feature in controller. smaller zonotopes * added again homotopy plan, crowded behaviour. * increased min_dist. more max steering. bigger min turn_rad * improvements after testing * parameters configuration offering great results on testing. * updated files. tidy up * code clean up. commentig. readme schedule setup (wip) * added brief documentation TEB navigation * repository setup for easier controller comparison: teb and mpc * added mpc class and test script. results are still wrong and class have to be fragmented. main script is still on development. updated build file to handle svea base and ismir image (not final version) * mpc set up. still working on achieveing goo reverse. still need path planner. * [parking skills]: mpc successfully reach goal state, with good precision, performing reverse manouvers . [fix]: reduce continuos back and forth * mpc works but still present oscillatory behaviour when parallel to goal. pre change of control inputs * best parameters tuning so far [backup] * mpc with steering rate and acceleration as inputs * global path planner configured * static path planner and reference extraction mechanism tested on the simulator * update config * added mocap repo for testing. * added dependencies for svea mocap * added ros package of mocap * remove dep cause build fail * fix mocap integration * mocap correction * try * fixed mocap import bug * try * try * [FIX] path planner * added mocap to map transform * removed transform * added measured_speed publisher * added change of brach in svea mocap * -reintroduced speed feedback -fixed comments -fixed spin problem (added adaptive final weight matrix) * -implemented mpc reset at every new trajectory -found much better tuning. svea does not get stuck anymore * -changed parameters completely on testing with svea0 -introduced measured_dt in main -introduced steering bias for svea0 -removed speed feedback in optimal control computation good experimental results * code refactoring: main file, increased code efficiency * removed adaptive horizon and changed weight matrices * code refactoring for pull request. * fix: remove build, devel and reset .gitignore * fix: remove some more unrelated changes * fix: remove __pycache__ * fix: need to checkout back to when branching out * fix: remove svea_navigation/maps and remove unrelated changes in svea_sensors * fix: roll back newer updates to avoid conflict * fix: remove __pycache__ * fix: remove svea_navigation * add: load_params helper function, refactor mpc parameters to use ROS parameters * rename mpc_params.yaml -> mpc_default.yaml * added mpc example compatible with simulation and mocap [not tested]. removed clicked point service, replaced with subscriber * fix mpc example: works on simulation * added more documentation as docstrings. simplified example code * Ignore submodules and directories in mpc_integration * removed submodules, updated gitignore * updated mpc param with values tested successfully on svea7 * updated files to take relative config_ns --------- Co-authored-by: Sulthan Suresh Fazeela Co-authored-by: xiao3913 Co-authored-by: Kaj Munhoz Arfvidsson --- .gitignore | 2 +- requirements.txt | 2 + src/svea_core/params/.gitkeep | 0 src/svea_core/params/mpc_default.yaml | 59 ++++ src/svea_core/src/svea/controllers/mpc.py | 297 ++++++++++++++++ src/svea_core/src/svea/helpers.py | 17 + src/svea_core/src/svea/simulators/sim_SVEA.py | 1 + .../svea_managers/path_following_sveas.py | 62 ++++ .../launch/mpc_navigation.launch | 45 +++ .../scripts/mocap_examples/mpc.py | 331 ++++++++++++++++++ 10 files changed, 815 insertions(+), 1 deletion(-) delete mode 100644 src/svea_core/params/.gitkeep create mode 100644 src/svea_core/params/mpc_default.yaml create mode 100755 src/svea_core/src/svea/controllers/mpc.py create mode 100644 src/svea_core/src/svea/helpers.py create mode 100644 src/svea_examples/launch/mpc_navigation.launch create mode 100755 src/svea_examples/scripts/mocap_examples/mpc.py diff --git a/.gitignore b/.gitignore index 946666c3..85d2a94f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,4 @@ __pycache__/ src/.vscode/ venv/ -site/ \ No newline at end of file +site/ diff --git a/requirements.txt b/requirements.txt index ff78f39e..a10d9f8d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,5 @@ pyserial pyubx2 geopy utm +# This is for NMPC +casadi==3.5.5 diff --git a/src/svea_core/params/.gitkeep b/src/svea_core/params/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/src/svea_core/params/mpc_default.yaml b/src/svea_core/params/mpc_default.yaml new file mode 100644 index 00000000..df131bef --- /dev/null +++ b/src/svea_core/params/mpc_default.yaml @@ -0,0 +1,59 @@ +## +## Configuration file for MPC Controller +## + +mpc: + ## Core Parameters + # The time step in which the optimization problem is divided (unit [s]) + time_step: 0.4 + + # The prediction horizon steps for the mpc optimization problem + prediction_horizon: 10 + + ## Objective Function J + + # Q1 + state_weight_matrix: [1, 0, 0, 0, + 0, 1, 0, 0, + 0, 0, 1, 0, + 0, 0, 0, 0] + + # Q2 + control_rate_weight_matrix: [0, 0, + 0, 0] + + # Q3 + control_weight_matrix: [0.1, 0, + 0, 0.1] + + # Qf + final_state_weight_matrix: [1, 0, 0, 0, + 0, 1, 0, 0, + 0, 0, 1, 0, + 0, 0, 0, 0] + + # Qv + # Note: A big value encourage forward movements rather than backwards. + # FEATURE DOES NOT WORK SO FAR. + forward_speed_weight: 0 + + ## Model Parameters + + # The min/max steering angle allowed for SVEA (unit [degrees]) + steering_min: -40 + steering_max: 40 + + # The min/max steering rate allowed for SVEA (unit [degrees/s]) + steering_rate_min: -35 + steering_rate_max: 35 + + # The min/max velocity allowed for SVEA (unit [m/s^2]) + velocity_min: -0.4 + velocity_max: 0.4 + + # The min/max acceleration allowed for SVEA (unit [m/s^2]) + acceleration_min: -0.2 + acceleration_max: 0.4 + + # The wheelbase of the vehicle (unit [m]). + wheelbase: 0.32 \ No newline at end of file diff --git a/src/svea_core/src/svea/controllers/mpc.py b/src/svea_core/src/svea/controllers/mpc.py new file mode 100755 index 00000000..082c17c6 --- /dev/null +++ b/src/svea_core/src/svea/controllers/mpc.py @@ -0,0 +1,297 @@ +#! /usr/bin/env python3 + +import casadi as ca +import numpy as np + +from svea.helpers import load_param + +class MPC_casadi: + + def __init__(self, vehicle_name='', config_ns='~mpc'): + """ + This is the release 0 of a general-purpose Nonlinear Model Predictive Controller (NMPC) + designed for the SVEA platform. It offers the following features: + + - Nonlinear dynamics with state representation [x, y, theta, v, steering] + and control inputs [steering_rate, acceleration]. + - Accepts a reference trajectory as input (see `compute_control` method) + and optimizes the predicted trajectory to minimize the deviation of + the predicted points from the next reference point. + - Functions as both an optimal path planner and path tracker: + - For path planning and tracking: Reference points should be spaced apart to give + the MPC the freedom to determine the optimal path. + - For tracking only: Adjust parameters such as prediction horizon to + follow the reference trajectory closely. + + Limitations: + - Neither static nor dynamic obstacles are considered + in this implementation. + + Initialize the MPC controller with the given parameters: + + :param L: Wheelbase of the vehicle (unit [m]) + :param N: Prediction horizon steps + :param dt: Sampling time + :param min_steering: Minimum steering angle [rad] + :param max_steering: Maximum steering angle [rad] + :param min_acceleration: Minimum acceleration [m/s^2] + :param max_acceleration: Maximum acceleration [m/s^2] + :param min_velocity: Minimum velovity [m/s] + :param max_velocity: Maximum velocity [m/s] + :param Q1: State weight matrix (4x4) + :param Q2: Control rate weight matrix (2x2) + :param Q3: Control weight matrix (2x2) + :param Qf: Final state weight matrix (4x4) + :param Qv: Forward_speed_weight scalar + """ + + ## Core Parameters + + # The prediction horizon steps for the mpc optimization problem. + self.N = load_param(f'{config_ns}/prediction_horizon') + self.current_horizon = self.N # Initially set to max horizon + + # The time step in which the optimization problem is divided (unit [s]). + self.dt = ca.DM(load_param(f'{config_ns}/time_step')) + + ## Load weight matrices + # Note: we convert to dense matrix to allow symbolic operations. + + self.Q1_list = load_param(f'{config_ns}/state_weight_matrix') + self.Q1 = ca.DM(np.array(self.Q1_list).reshape((4, 4))) + + self.Q2_list = load_param(f'{config_ns}/control_rate_weight_matrix') + self.Q2 = ca.DM(np.array(self.Q2_list).reshape((2, 2))) + + self.Q3_list = load_param(f'{config_ns}/control_weight_matrix') + self.Q3 = ca.DM(np.array(self.Q3_list).reshape((2, 2))) + + self.Qf_list = load_param(f'{config_ns}/final_state_weight_matrix') + self.Qf = ca.DM(np.array(self.Qf_list).reshape((4, 4))) + + self.Qv_num = load_param(f'{config_ns}/forward_speed_weight') + self.Qv = ca.DM(self.Qv_num) + + ## Model Parameters + + self.min_steering = np.radians(load_param(f'{config_ns}/steering_min')) + self.max_steering = np.radians(load_param(f'{config_ns}/steering_max')) + + self.min_steering_rate = np.radians(load_param(f'{config_ns}/steering_rate_min')) + self.max_steering_rate = np.radians(load_param(f'{config_ns}/steering_rate_max')) + + self.min_velocity = load_param(f'{config_ns}/velocity_min') + self.max_velocity = load_param(f'{config_ns}/velocity_max') + + self.min_acceleration = load_param(f'{config_ns}/acceleration_min') + self.max_acceleration = load_param(f'{config_ns}/acceleration_max') + + # Wheelbase of the vehicle (unit [m]). + self.L = ca.DM(load_param(f'{config_ns}/wheelbase')) + + ## Setup CasADi + + self.opti = ca.Opti() + + self.define_state_and_control_variables() + self.set_objective_function() + self.set_state_constraints() + self.set_control_input_constraints() + self.set_solver_options() + + def compute_control(self, state, reference_trajectory): + """ + Compute the control actions (steering, acceleration) using MPC. + + :param state: Current state of the vehicle [x, y, theta, v, delta] + :param reference_trajectory: Reference trajectory [4, N+1] + :return: steering, velocity + """ + # Bound the initial state to respect constraints + bounded_state = self.bound_initial_state(state) + + # Enlarge the reference trajectory with a fictitious velocity and steering value for feasibility. + # Add two rows of zeros to the reference_trajectory matrix. + reference_trajectory = ca.vertcat(reference_trajectory, ca.DM.zeros(2, reference_trajectory.shape[1])) + + # Set current state and reference trajectory for the active part of the horizon + self.opti.set_value(self.x_init, bounded_state) + self.opti.set_value(self.x_ref[:, :self.current_horizon+1], reference_trajectory[:, :self.current_horizon+1]) + + # Solve the optimization problem + self.sol = self.opti.solve() + + # Extract control actions (acceleration and steering rate) + acceleration = self.sol.value(self.u[0, 0]) + steering_rate = self.sol.value(self.u[1, 0]) + + return steering_rate, acceleration + + + def get_optimal_control(self, all=True): + """ + This method returns the optimal control computed by the mpc. + :param all: Flag to indicate if all of the optimal control for the + whole prediction should be returned or only the first step. + :type all: bool + :return: The optimal control computed by the mpc controller. + :rtype: NumPy array + """ + if self.sol is not None: + u_opt = np.array(self.sol.value(self.u)) + if all: + return u_opt + else: + return u_opt[:,0] + else: + return None + + def get_optimal_states(self): + """ + This method returns the optimal states computed by the mpc controller + for the whole prediction horizon. + :return: The optimal states for the whole prediction horizon. + :rtype: NumPy array + """ + if self.sol is not None: + return np.array(self.sol.value(self.x)) # Note: to get only optimized one: self.sol.value(self.x[:, :self.current_horizon]) + else: + return None + + def define_state_and_control_variables(self): + # Define state and control variables + self.x = self.opti.variable(5, self.N + 1) # state = [x, y, theta, v, steering] + self.u = self.opti.variable(2, self.N) # input = [steering_rate, acceleration] + + self.x_init = self.opti.parameter(5) # Initial state + self.x_ref = self.opti.parameter(5, self.N + 1) # Reference trajectory + + def set_objective_function(self): + # Define the objective function: + # J = (x[k]-x_ref[k])^T Q1 (x[k]-x_ref[k]) + (u[k+1] - u[k])^T Q2 (u[k+1] - u[k]) + u[k]^T Q3 u[k] + Qv max(0,-x[3])^2 + self.objective = 0 + for k in range(self.current_horizon): + # State error term (ignore delta in reference trajectory) + state_error = self.x[0:4, k] - self.x_ref[0:4, k] + + # Control input rate of change term (u[k+1] - u[k]) + if k < self.current_horizon - 1: + input_cost = self.u[:, k+1] - self.u[:, k] + else: + input_cost = ca.DM.zeros(self.u.shape[0], 1) + + # Penalize for negative velocity (soft constraint) + velocity_penalty = ca.fmax(0, -self.x[3, k]) # Penalize if v < 0 + + # Accumulate the terms into the objective + self.objective += (ca.mtimes([state_error.T, self.Q1, state_error]) + + ca.mtimes([input_cost.T, self.Q2, input_cost]) + + ca.mtimes([self.u[:, k].T, self.Q3, self.u[:, k]]) + + ca.mtimes([velocity_penalty.T, self.Qv, velocity_penalty])) + + # Final state cost + final_state_error = self.x[0:4, self.current_horizon] - self.x_ref[0:4, self.current_horizon] + self.objective += ca.mtimes([final_state_error.T, self.Qf, final_state_error]) + + # Specify type of optimization problem + self.opti.minimize(self.objective) + + def set_state_constraints(self): + # Initial state constraint + self.opti.subject_to(self.x[:, 0] == self.x_init) + + # Vehicle dynamics constraints - Simple kinematic bycicle model + for k in range(self.N): + x_next = self.x[0, k] + self.dt * self.x[3, k] * ca.cos(self.x[2, k]) # x_k+1 = x_k + dt * v_k * cos(theta_k) + y_next = self.x[1, k] + self.dt * self.x[3, k] * ca.sin(self.x[2, k]) # y_k+1 = y_k + dt * v_k * sin(theta_k) + theta_next = self.x[2, k] + self.dt * (self.x[3, k] / self.L) * ca.tan(self.x[4, k]) # theta_k+1 = theta_k + dt * v_k * tan(delta_k) / L + v_next = self.x[3, k] + self.dt * self.u[0, k] # v_k+1 = v_k + dt * a_k + delta_next = self.x[4, k] + self.dt * self.u[1, k] # delta_k+1 = delta_k + dt * steering_rate_k + + self.opti.subject_to(self.x[0, k + 1] == x_next) + self.opti.subject_to(self.x[1, k + 1] == y_next) + self.opti.subject_to(self.x[2, k + 1] == theta_next) + self.opti.subject_to(self.x[3, k + 1] == v_next) + self.opti.subject_to(self.x[4, k + 1] == delta_next) + + # Velocity constraints + self.opti.subject_to(self.x[3, k] <= self.max_velocity) + self.opti.subject_to(self.x[3, k] >= self.min_velocity) + + # Steering angle constraints + self.opti.subject_to(self.x[4, k] <= self.max_steering) + self.opti.subject_to(self.x[4, k] >= self.min_steering) + + def set_control_input_constraints(self): + # Input constraints (acceleration, steering rate) + for k in range(self.N): + self.opti.subject_to(self.min_acceleration <= self.u[0, k]) + self.opti.subject_to(self.u[0, k] <= self.max_acceleration) + + # Steering rate constraint + self.opti.subject_to(self.min_steering_rate <= self.u[1, k]) + self.opti.subject_to(self.u[1, k] <= self.max_steering_rate) + + def set_solver_options(self): + # Set solver options + opts = {"ipopt.print_level": 0, "print_time": 0} + self.opti.solver("ipopt", opts) + + def bound_initial_state(self,state): + """ + This method checks if the initial state provided to the mpc, which could come from the localization stack, + is within the allowed bounds. If not, it clamps it to make the optimization problem feasible. + """ + # Ensure the velocity is within the specified bounds + clamped_velocity = max(self.min_velocity, min(state[3], self.max_velocity)) + + # Create a new state with the clamped velocity + bounded_state = state.copy() + bounded_state[3] = clamped_velocity + + return bounded_state + + def set_new_weight_matrix(self, matrix_name, new_value): + """ + Dynamically adjust one of the weight matrices. + Check if the matrix exists as an attribute and if so, update it. + """ + if hasattr(self, matrix_name): + try: + # Overwrite with the new dense matrix + setattr(self, matrix_name, ca.DM(new_value)) + # Redefine objective function with updated matrix. + self.set_objective_function() + except Exception as e: + print(f"Failed to update {matrix_name}: {e}") + else: + print(f"Matrix {matrix_name} does not exist in MPC class.") + + + def set_new_prediction_horizon(self, new_horizon): + """ + Dynamically adjust the active horizon for the optimization problem. + When the horizon is reduced, you simply freeze unused variables and update the reference trajectory + and state for the active part of the horizon. The solver will then only optimize over the active steps. + """ + self.current_horizon = new_horizon + + # Redefine objective function with new horizon. + self.set_objective_function() + + + def reset_parameters(self): + """ + Reset the core parameters and weight matrices of the MPC instance to their initial values. + Useful for restoring values to their original state after runtime modifications. + """ + self.current_horizon = self.N # Reset to max horizon + + # Reset weight matrices + self.Q1 = ca.DM(np.array(self.Q1_list).reshape((4, 4))) + self.Q2 = ca.DM(np.array(self.Q2_list).reshape((2, 2))) + self.Q3 = ca.DM(np.array(self.Q3_list).reshape((2, 2))) + self.Qf = ca.DM(np.array(self.Qf_list).reshape((4, 4))) + self.Qv = ca.DM(self.Qv_num) + # Reset objective function with initial values. + self.set_objective_function() \ No newline at end of file diff --git a/src/svea_core/src/svea/helpers.py b/src/svea_core/src/svea/helpers.py new file mode 100644 index 00000000..3a474d4b --- /dev/null +++ b/src/svea_core/src/svea/helpers.py @@ -0,0 +1,17 @@ +"""Helper functions for doing annoying ROS stuff.""" + +import rospy + +def load_param(name, value=None): + """Function used to get parameters from ROS parameter server + + :param name: name of the parameter + :type name: string + :param value: default value of the parameter, defaults to None + :type value: _type_, optional + :return: value of the parameter + :rtype: _type_ + """ + if value is None: + assert rospy.has_param(name), f'Missing parameter "{name}"' + return rospy.get_param(name, value) diff --git a/src/svea_core/src/svea/simulators/sim_SVEA.py b/src/svea_core/src/svea/simulators/sim_SVEA.py index 069fc60a..742e56de 100755 --- a/src/svea_core/src/svea/simulators/sim_SVEA.py +++ b/src/svea_core/src/svea/simulators/sim_SVEA.py @@ -87,6 +87,7 @@ def __init__(self, self.vehicle_name = namespace.split('/')[-2] self._map_frame_id = 'map' + self._laser_frame_id = sub_namespace + 'laser' self._odom_frame_id = sub_namespace + 'odom' self._base_link_frame_id = sub_namespace + 'base_link' diff --git a/src/svea_core/src/svea/svea_managers/path_following_sveas.py b/src/svea_core/src/svea/svea_managers/path_following_sveas.py index 6676cc02..b976392b 100755 --- a/src/svea_core/src/svea/svea_managers/path_following_sveas.py +++ b/src/svea_core/src/svea/svea_managers/path_following_sveas.py @@ -11,6 +11,9 @@ from svea.data import TrajDataHandler from svea.controllers import pure_pursuit from svea.controllers.pure_pursuit import PurePursuitController +from svea.interfaces import ActuationInterface +from svea.sensors import Lidar +from svea.data import BasicDataHandler __license__ = "MIT" __maintainer__ = "Frank Jiang, Tobias Bolin" @@ -163,3 +166,62 @@ def send_accel(self, accel, dt): self.controller.target_velocity += accel * dt steering, velocity = self.compute_control() self.send_control(steering, velocity) + + + +class SVEAManagerMPC(SVEAManager): + """Extended SVEAManager class with an additional controller_config_path parameter used to ease MPC setup. + + This class inherits from SVEAManager and adds the capability to pass a + configuration path specifically to the controller. + + The __init__ method is redefined to delay the creation of the controller, + allowing the controller to be initialized with both vehicle_name and + controller_config_path, which is not supported in the base SVEAManager. + + :param localizer: A chosen localization interface class constructor + :type localizer: class + :param controller: A chosen controller class constructor + :type controller: class + :param actuation: A chosen actuation interface class constructor, + defaults to ActuationInterface + :type actuation: class + :param data_handler: A chosen data handler class constructor, + defaults to BasicDataHandler + :type data_handler: class + :param vehicle_name: Name of vehicle; used to initialize each + software module, defaults to '' + :type vehicle_name: str + :param controller_config_path: Path to the configuration file for the controller, + defaults to '' + :type controller_config_path: str + """ + + def __init__(self, localizer, controller, + actuation=ActuationInterface, + data_handler=BasicDataHandler, + vehicle_name='', + controller_config_path=''): + + # Manually initialize components from SVEAManager except for the controller + self.vehicle_name = vehicle_name + sub_namespace = vehicle_name + '/' if vehicle_name else '' + self._emergency_topic = '{}lli/emergency'.format(sub_namespace) + + # Initialize localizer, actuation, and data handler + self.localizer = localizer(vehicle_name) + self.actuation = actuation(vehicle_name) + self.data_handler = data_handler(vehicle_name) + + # Initialize the controller with the additional controller_config_path parameter + self.controller = controller(vehicle_name, controller_config_path) + + # Initialize additional components from SVEAManager + self.lidar = Lidar() + + # Bring localizer state out into manager + self.state = self.localizer.state + self.last_state_time = None + + # Set up automatic state logging + self.localizer.add_callback(self.data_handler.log_state) \ No newline at end of file diff --git a/src/svea_examples/launch/mpc_navigation.launch b/src/svea_examples/launch/mpc_navigation.launch new file mode 100644 index 00000000..66e4b51a --- /dev/null +++ b/src/svea_examples/launch/mpc_navigation.launch @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/svea_examples/scripts/mocap_examples/mpc.py b/src/svea_examples/scripts/mocap_examples/mpc.py new file mode 100755 index 00000000..ad8f5324 --- /dev/null +++ b/src/svea_examples/scripts/mocap_examples/mpc.py @@ -0,0 +1,331 @@ +#! /usr/bin/env python3 + +import numpy as np +import math +import rospy +import tf +from svea.models.bicycle import SimpleBicycleModel +from svea.states import VehicleState +from svea.simulators.sim_SVEA import SimSVEA +from svea.interfaces import LocalizationInterface +try: + from svea_mocap.mocap import MotionCaptureInterface +except ImportError: + pass +from svea.controllers.mpc import MPC_casadi +from svea.svea_managers.path_following_sveas import SVEAManagerMPC +from svea.data import TrajDataHandler, RVIZPathHandler +from std_msgs.msg import Float32 +from geometry_msgs.msg import PoseArray, PoseStamped +from svea.simulators.viz_utils import publish_pose_array + + +def load_param(name, value=None): + if value is None: + assert rospy.has_param(name), f'Missing parameter "{name}"' + return rospy.get_param(name, value) + +class mpc_navigation: + """ + ROS Node for controlling and simulating the SVEA vehicle autonomously. + """ + + def __init__(self,sim_dt,mpc): + self.dt = sim_dt + self.mpc = mpc + + ## ROS Parameters + self.USE_RVIZ = load_param('~use_rviz', False) + self.IS_SIM = load_param('~is_sim', False) + self.STATE = load_param('~state', [-3, 0, 0, 0]) # [x,y,yaw,v] wrt map frame. Initial state for simulator. + self.MPC_FREQ = load_param('~mpc_freq', 10) + self.SVEA_MOCAP_NAME = load_param('~svea_mocap_name') + self.DELTA_S = load_param('~delta_s', 5) # static path discretization lenght + self.mpc_config_ns = load_param('~mpc_config_ns') + self.initial_horizon = load_param(f'{self.mpc_config_ns}/prediction_horizon') + self.initial_Qf = load_param(f'{self.mpc_config_ns}/final_state_weight_matrix') + + ## MPC parameters + self.GOAL_REACHED_DIST = 0.2 # The distance threshold (in meters) within which the goal is considered reached. + self.GOAL_REACHED_YAW = 0.2 # The yaw angle threshold (in radians) within which the goal orientation is considered reached. + self.UPDATE_MPC_PARAM = True # A flag indicating if the MPC parameters can be updated when the system is approaching the target. + self.RESET_MPC_PARAM = False # A flag indicating if the MPC parameters should be reset when the system is moving away from the target. + self.predicted_state = None + self.mpc_last_time = rospy.get_time() + self.mpc_dt = 1.0 / self.MPC_FREQ + self.current_horizon = self.initial_horizon + + ## Static Planner parameters + self.APPROACH_TARGET_THR = 5 # The distance threshold (in meters) to define when the system is "approaching" the target. + self.NEW_REFERENCE_THR = 1 # The distance threshold (in meters) to update the next intermediate reference point. + self.goal_pose = None + self.static_path_plan = np.empty((3, 0)) + self.current_index_static_plan = 0 + self.is_last_point = False + + ## Other parameters + self.steering = 0 + self.velocity = 0 + self.state = [] + + if self.IS_SIM is False: + # add steering bias of sveas. {(svea0:28),(svea7:7)} + unitless_steering = 7 + PERC_TO_LLI_COEFF = 1.27 + MAX_STEERING_ANGLE = 40 * math.pi / 180 + steer_percent = unitless_steering / PERC_TO_LLI_COEFF + self.steering_bias = (steer_percent / 100.0) * MAX_STEERING_ANGLE + else: + self.steering_bias = 0 + + self.create_simulator_and_SVEAmanager() + self.init_publishers() + self.init_subscribers() + + def run(self): + while self.keep_alive(): + self.spin() + + def keep_alive(self): + return not rospy.is_shutdown() + + def spin(self): + # Retrieve current state from SVEA localization + state = self.svea.wait_for_state() + self.state = [state.x, state.y, state.yaw, state.v] + # If a static path plan has been computed, run the mpc. + if self.static_path_plan.size > 0 : + # If enough time has passed, run the MPC computation + current_time = rospy.get_time() + measured_dt = current_time - self.mpc_last_time + if measured_dt >= self.mpc_dt : + reference_trajectory, distance_to_next_point = self.get_mpc_current_reference() + if self.is_last_point and distance_to_next_point <= self.APPROACH_TARGET_THR and self.UPDATE_MPC_PARAM: + # Update the prediction horizon and final state weight matrix only once when approaching target + new_Qf = np.array([70, 0, 0, 0, + 0, 70, 0, 0, + 0, 0, 20, 0, + 0, 0, 0, 0]).reshape((4, 4)) + self.svea.controller.set_new_weight_matrix('Qf', new_Qf) + self.UPDATE_MPC_PARAM = False + self.RESET_MPC_PARAM = True # Allow resetting when moving away + + elif self.is_last_point and distance_to_next_point > self.APPROACH_TARGET_THR and self.RESET_MPC_PARAM: + # Reset to initial values only once when moving away from target + self.current_horizon = self.initial_horizon + self.svea.controller.set_new_prediction_horizon(self.initial_horizon) + self.svea.controller.set_new_weight_matrix('Qf', self.initial_Qf) + self.UPDATE_MPC_PARAM = True # Allow updating again when re-approaching + self.RESET_MPC_PARAM = False # Prevent repeated resetting + + if not self.is_goal_reached(distance_to_next_point): + # Run the MPC to compute control + steering_rate, acceleration = self.svea.controller.compute_control([self.state[0],self.state[1],self.state[2],self.velocity,self.steering], reference_trajectory) + self.steering += steering_rate * measured_dt + self.velocity += acceleration * measured_dt + self.predicted_state = self.svea.controller.get_optimal_states() + # Publish the predicted path + self.publish_trajectory(self.predicted_state[0:3, :self.current_horizon+1],self.predicted_trajectory_pub) + else: + # Stop the vehicle if the goal is reached + self.steering, self.velocity = 0, 0 + # Update the last time the MPC was computed + self.mpc_last_time = current_time + + # Publish the latest control target and the estimated(mocap) / measured(in/out loc.) speed. + self.publish_to_foxglove(self.steering, self.velocity, self.state[3]) + # Visualization data and send control + self.svea.send_control(self.steering + self.steering_bias, self.velocity) + self.svea.visualize_data() + + def init_publishers(self): + self.steering_pub = rospy.Publisher('/target_steering_angle', Float32, queue_size=1) + self.velocity_pub = rospy.Publisher('/target_speed', Float32, queue_size=1) + self.velocity_measured_pub = rospy.Publisher('/measured_speed', Float32, queue_size=1) # estimated/measured speed + self.predicted_trajectory_pub = rospy.Publisher('/predicted_path', PoseArray, queue_size=1) + self.static_trajectory_pub = rospy.Publisher('/static_path', PoseArray, queue_size=1) + + def init_subscribers(self): + self.mpc_target_sub = rospy.Subscriber('/mpc_target', PoseStamped, self.mpc_target_callback) + + def create_simulator_and_SVEAmanager(self): + # initial state for simulator. + state = VehicleState(*self.STATE) + # Create simulators, models, managers, etc. + if self.IS_SIM: + + # simulator need a model to simulate + self.sim_model = SimpleBicycleModel(state) + + # start the simulator immediately, but paused + self.simulator = SimSVEA(self.sim_model, + dt=self.dt, + run_lidar=True, + start_paused=True, + publish_odometry=True, + publish_pose=True).start() + + # start the SVEA manager (needed for both sim and real world) + self.svea = SVEAManagerMPC(localizer = LocalizationInterface if self.IS_SIM else MotionCaptureInterface, + controller = self.mpc, + data_handler=RVIZPathHandler if self.USE_RVIZ else TrajDataHandler, + vehicle_name='', + controller_config_path = self.mpc_config_ns) + if not self.IS_SIM: + self.svea.localizer.update_name(self.SVEA_MOCAP_NAME) + + self.svea.start(wait=True) + + # everything ready to go -> unpause simulator + if self.IS_SIM: + self.simulator.toggle_pause_simulation() + + def publish_to_foxglove(self,target_steering,target_speed,measured_speed): + self.steering_pub.publish(target_steering) + self.velocity_pub.publish(target_speed) + self.velocity_measured_pub.publish(measured_speed) + + def get_mpc_current_reference(self): + """ + Retrieves the current reference state for the MPC based on the SVEA's current position. + + Updates the current index in the static path plan if the robot is close to the next point. If at the last point, + it calculates the distance to that point instead. + + Returns: + tuple: (x_ref, distance_to_next_point), where x_ref is the reference state for the prediction horizon + (shape: [3, N+1]) and distance_to_next_point is the distance to the next or last reference point. + """ + if self.is_last_point is False: + distance_to_next_point = self.compute_distance(self.state,self.static_path_plan[:,self.current_index_static_plan]) + if self.current_index_static_plan == self.static_path_plan.shape[1] - 1: + self.is_last_point = True + if distance_to_next_point < self.NEW_REFERENCE_THR and not self.is_last_point: + self.current_index_static_plan += 1 + reference_state = self.static_path_plan[:,self.current_index_static_plan] + x_ref = np.tile(reference_state, (self.initial_horizon+1, 1)).T + return x_ref, distance_to_next_point + + else: + distance_to_last_point = self.compute_distance(self.state,self.static_path_plan[:,-1]) + reference_state = self.static_path_plan[:,-1] + x_ref = np.tile(reference_state, (self.initial_horizon+1, 1)).T + return x_ref, distance_to_last_point + + def mpc_target_callback(self, msg): + """ + Callback function that sets a new goal position and calculates a trajectory. + :param msg: PoseStamped message containing the goal position. + """ + # Set the goal position and log the new goal + self.goal_pose = msg + rospy.loginfo(f"New goal position received: ({self.goal_pose.pose.position.x}, {self.goal_pose.pose.position.y})") + # Compute trajectory (straight line from current position to goal) + self.compute_trajectory() + + def get_yaw_from_pose(self,pose_stamped): + """ + Extracts the yaw from a PoseStamped message. + """ + # Convert the quaternion to Euler angles + orientation = pose_stamped.pose.orientation + quaternion = (orientation.x, orientation.y, orientation.z, orientation.w) + + # Use tf to convert quaternion to Euler angles + euler = tf.transformations.euler_from_quaternion(quaternion) + + # Return the yaw + return euler[2] + + def compute_trajectory(self): + """ + Compute a straight-line trajectory from the current position to the goal using DELTA_S, + including the heading for each point, and publish the path. + """ + if not self.goal_pose or not self.state: + rospy.logwarn("Missing goal or current state for trajectory computation.") + return + # Reset previous trajectory related variables + self.current_index_static_plan = 0 + self.is_last_point = False + self.static_path_plan = np.empty((3, 0)) + # reset control actions + self.velocity = 0 + self.steering = 0 + # reset mpc parameters + self.UPDATE_MPC_PARAM = True + self.RESET_MPC_PARAM = False + self.mpc_last_time = rospy.get_time() + self.svea.controller.reset_parameters() + + # Calculate the straight-line trajectory between current state and goal position + start_x, start_y = self.state[0], self.state[1] + goal_x, goal_y = self.goal_pose.pose.position.x, self.goal_pose.pose.position.y + distance = self.compute_distance([start_x, start_y], [goal_x, goal_y]) + goal_yaw = self.get_yaw_from_pose(self.goal_pose) + + # Compute intermediate points at intervals of DELTA_S + num_points = int(distance // self.DELTA_S) + + for i in range(num_points): + ratio = ((i+1) * self.DELTA_S) / distance + x = start_x + ratio * (goal_x - start_x) + y = start_y + ratio * (goal_y - start_y) + + # Calculate the heading for this point + heading = math.atan2(goal_y - start_y, goal_x - start_x) + + # Stack the computed point as a new column in the array + new_point = np.array([[x], [y], [heading]]) + self.static_path_plan = np.hstack((self.static_path_plan, new_point)) + + # Calculate distance between the last appended point and the goal point. + if self.static_path_plan.size != 0: + last_appended_x = self.static_path_plan[0, -1] + last_appended_y = self.static_path_plan[1, -1] + distance = self.compute_distance([last_appended_x, last_appended_y], [goal_x, goal_y]) + + # If the distance to the goal is too small, replace the last point with the goal directly. + if distance < self.DELTA_S / 2: + # Replace the last appended point with the goal point + self.static_path_plan[:, -1] = np.array([goal_x, goal_y, goal_yaw]) + else: + # Otherwise, append the last point as usual + new_point = np.array([[goal_x], [goal_y], [goal_yaw]]) + self.static_path_plan = np.hstack((self.static_path_plan, new_point)) + else: + # Otherwise, append the last point as usual + new_point = np.array([[goal_x], [goal_y], [goal_yaw]]) + self.static_path_plan = np.hstack((self.static_path_plan, new_point)) + + # Publish the static path + self.publish_trajectory(self.static_path_plan,self.static_trajectory_pub) + + def is_goal_reached(self,distance): + if not self.is_last_point: + return False + elif distance < self.GOAL_REACHED_DIST: + yaw_error = self.state[2] - self.static_path_plan[2,-1] + if abs(yaw_error) < self.GOAL_REACHED_YAW: + return True + else: + return False + else: + return False + + def compute_distance(self,point1,point2): + return np.sqrt((point1[0]-point2[0])**2 + (point1[1]-point2[1])**2) + + def publish_trajectory(self, points, publisher): + pointx, pointy, pointyaw = [], [], [] + for i in range(points.shape[1]): # points should be [3, N] where N is the horizon length + pointx.append(points[0, i]) # x values + pointy.append(points[1, i]) # y values + pointyaw.append(points[2, i]) # yaw values + # Publish the trajectory as a PoseArray + publish_pose_array(publisher, pointx, pointy, pointyaw) + +if __name__ == '__main__': + rospy.init_node('mpc_navigation') + node = mpc_navigation(sim_dt = 0.01, mpc = MPC_casadi) + node.run() \ No newline at end of file