diff --git a/.gitignore b/.gitignore index 946666c3..85d2a94f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,4 @@ __pycache__/ src/.vscode/ venv/ -site/ \ No newline at end of file +site/ diff --git a/requirements.txt b/requirements.txt index ff78f39e..a10d9f8d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,5 @@ pyserial pyubx2 geopy utm +# This is for NMPC +casadi==3.5.5 diff --git a/src/svea_core/params/.gitkeep b/src/svea_core/params/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/src/svea_core/params/mpc_default.yaml b/src/svea_core/params/mpc_default.yaml new file mode 100644 index 00000000..df131bef --- /dev/null +++ b/src/svea_core/params/mpc_default.yaml @@ -0,0 +1,59 @@ +## +## Configuration file for MPC Controller +## + +mpc: + ## Core Parameters + # The time step in which the optimization problem is divided (unit [s]) + time_step: 0.4 + + # The prediction horizon steps for the mpc optimization problem + prediction_horizon: 10 + + ## Objective Function J + + # Q1 + state_weight_matrix: [1, 0, 0, 0, + 0, 1, 0, 0, + 0, 0, 1, 0, + 0, 0, 0, 0] + + # Q2 + control_rate_weight_matrix: [0, 0, + 0, 0] + + # Q3 + control_weight_matrix: [0.1, 0, + 0, 0.1] + + # Qf + final_state_weight_matrix: [1, 0, 0, 0, + 0, 1, 0, 0, + 0, 0, 1, 0, + 0, 0, 0, 0] + + # Qv + # Note: A big value encourage forward movements rather than backwards. + # FEATURE DOES NOT WORK SO FAR. + forward_speed_weight: 0 + + ## Model Parameters + + # The min/max steering angle allowed for SVEA (unit [degrees]) + steering_min: -40 + steering_max: 40 + + # The min/max steering rate allowed for SVEA (unit [degrees/s]) + steering_rate_min: -35 + steering_rate_max: 35 + + # The min/max velocity allowed for SVEA (unit [m/s^2]) + velocity_min: -0.4 + velocity_max: 0.4 + + # The min/max acceleration allowed for SVEA (unit [m/s^2]) + acceleration_min: -0.2 + acceleration_max: 0.4 + + # The wheelbase of the vehicle (unit [m]). + wheelbase: 0.32 \ No newline at end of file diff --git a/src/svea_core/src/svea/controllers/mpc.py b/src/svea_core/src/svea/controllers/mpc.py new file mode 100755 index 00000000..082c17c6 --- /dev/null +++ b/src/svea_core/src/svea/controllers/mpc.py @@ -0,0 +1,297 @@ +#! /usr/bin/env python3 + +import casadi as ca +import numpy as np + +from svea.helpers import load_param + +class MPC_casadi: + + def __init__(self, vehicle_name='', config_ns='~mpc'): + """ + This is the release 0 of a general-purpose Nonlinear Model Predictive Controller (NMPC) + designed for the SVEA platform. It offers the following features: + + - Nonlinear dynamics with state representation [x, y, theta, v, steering] + and control inputs [steering_rate, acceleration]. + - Accepts a reference trajectory as input (see `compute_control` method) + and optimizes the predicted trajectory to minimize the deviation of + the predicted points from the next reference point. + - Functions as both an optimal path planner and path tracker: + - For path planning and tracking: Reference points should be spaced apart to give + the MPC the freedom to determine the optimal path. + - For tracking only: Adjust parameters such as prediction horizon to + follow the reference trajectory closely. + + Limitations: + - Neither static nor dynamic obstacles are considered + in this implementation. + + Initialize the MPC controller with the given parameters: + + :param L: Wheelbase of the vehicle (unit [m]) + :param N: Prediction horizon steps + :param dt: Sampling time + :param min_steering: Minimum steering angle [rad] + :param max_steering: Maximum steering angle [rad] + :param min_acceleration: Minimum acceleration [m/s^2] + :param max_acceleration: Maximum acceleration [m/s^2] + :param min_velocity: Minimum velovity [m/s] + :param max_velocity: Maximum velocity [m/s] + :param Q1: State weight matrix (4x4) + :param Q2: Control rate weight matrix (2x2) + :param Q3: Control weight matrix (2x2) + :param Qf: Final state weight matrix (4x4) + :param Qv: Forward_speed_weight scalar + """ + + ## Core Parameters + + # The prediction horizon steps for the mpc optimization problem. + self.N = load_param(f'{config_ns}/prediction_horizon') + self.current_horizon = self.N # Initially set to max horizon + + # The time step in which the optimization problem is divided (unit [s]). + self.dt = ca.DM(load_param(f'{config_ns}/time_step')) + + ## Load weight matrices + # Note: we convert to dense matrix to allow symbolic operations. + + self.Q1_list = load_param(f'{config_ns}/state_weight_matrix') + self.Q1 = ca.DM(np.array(self.Q1_list).reshape((4, 4))) + + self.Q2_list = load_param(f'{config_ns}/control_rate_weight_matrix') + self.Q2 = ca.DM(np.array(self.Q2_list).reshape((2, 2))) + + self.Q3_list = load_param(f'{config_ns}/control_weight_matrix') + self.Q3 = ca.DM(np.array(self.Q3_list).reshape((2, 2))) + + self.Qf_list = load_param(f'{config_ns}/final_state_weight_matrix') + self.Qf = ca.DM(np.array(self.Qf_list).reshape((4, 4))) + + self.Qv_num = load_param(f'{config_ns}/forward_speed_weight') + self.Qv = ca.DM(self.Qv_num) + + ## Model Parameters + + self.min_steering = np.radians(load_param(f'{config_ns}/steering_min')) + self.max_steering = np.radians(load_param(f'{config_ns}/steering_max')) + + self.min_steering_rate = np.radians(load_param(f'{config_ns}/steering_rate_min')) + self.max_steering_rate = np.radians(load_param(f'{config_ns}/steering_rate_max')) + + self.min_velocity = load_param(f'{config_ns}/velocity_min') + self.max_velocity = load_param(f'{config_ns}/velocity_max') + + self.min_acceleration = load_param(f'{config_ns}/acceleration_min') + self.max_acceleration = load_param(f'{config_ns}/acceleration_max') + + # Wheelbase of the vehicle (unit [m]). + self.L = ca.DM(load_param(f'{config_ns}/wheelbase')) + + ## Setup CasADi + + self.opti = ca.Opti() + + self.define_state_and_control_variables() + self.set_objective_function() + self.set_state_constraints() + self.set_control_input_constraints() + self.set_solver_options() + + def compute_control(self, state, reference_trajectory): + """ + Compute the control actions (steering, acceleration) using MPC. + + :param state: Current state of the vehicle [x, y, theta, v, delta] + :param reference_trajectory: Reference trajectory [4, N+1] + :return: steering, velocity + """ + # Bound the initial state to respect constraints + bounded_state = self.bound_initial_state(state) + + # Enlarge the reference trajectory with a fictitious velocity and steering value for feasibility. + # Add two rows of zeros to the reference_trajectory matrix. + reference_trajectory = ca.vertcat(reference_trajectory, ca.DM.zeros(2, reference_trajectory.shape[1])) + + # Set current state and reference trajectory for the active part of the horizon + self.opti.set_value(self.x_init, bounded_state) + self.opti.set_value(self.x_ref[:, :self.current_horizon+1], reference_trajectory[:, :self.current_horizon+1]) + + # Solve the optimization problem + self.sol = self.opti.solve() + + # Extract control actions (acceleration and steering rate) + acceleration = self.sol.value(self.u[0, 0]) + steering_rate = self.sol.value(self.u[1, 0]) + + return steering_rate, acceleration + + + def get_optimal_control(self, all=True): + """ + This method returns the optimal control computed by the mpc. + :param all: Flag to indicate if all of the optimal control for the + whole prediction should be returned or only the first step. + :type all: bool + :return: The optimal control computed by the mpc controller. + :rtype: NumPy array + """ + if self.sol is not None: + u_opt = np.array(self.sol.value(self.u)) + if all: + return u_opt + else: + return u_opt[:,0] + else: + return None + + def get_optimal_states(self): + """ + This method returns the optimal states computed by the mpc controller + for the whole prediction horizon. + :return: The optimal states for the whole prediction horizon. + :rtype: NumPy array + """ + if self.sol is not None: + return np.array(self.sol.value(self.x)) # Note: to get only optimized one: self.sol.value(self.x[:, :self.current_horizon]) + else: + return None + + def define_state_and_control_variables(self): + # Define state and control variables + self.x = self.opti.variable(5, self.N + 1) # state = [x, y, theta, v, steering] + self.u = self.opti.variable(2, self.N) # input = [steering_rate, acceleration] + + self.x_init = self.opti.parameter(5) # Initial state + self.x_ref = self.opti.parameter(5, self.N + 1) # Reference trajectory + + def set_objective_function(self): + # Define the objective function: + # J = (x[k]-x_ref[k])^T Q1 (x[k]-x_ref[k]) + (u[k+1] - u[k])^T Q2 (u[k+1] - u[k]) + u[k]^T Q3 u[k] + Qv max(0,-x[3])^2 + self.objective = 0 + for k in range(self.current_horizon): + # State error term (ignore delta in reference trajectory) + state_error = self.x[0:4, k] - self.x_ref[0:4, k] + + # Control input rate of change term (u[k+1] - u[k]) + if k < self.current_horizon - 1: + input_cost = self.u[:, k+1] - self.u[:, k] + else: + input_cost = ca.DM.zeros(self.u.shape[0], 1) + + # Penalize for negative velocity (soft constraint) + velocity_penalty = ca.fmax(0, -self.x[3, k]) # Penalize if v < 0 + + # Accumulate the terms into the objective + self.objective += (ca.mtimes([state_error.T, self.Q1, state_error]) + + ca.mtimes([input_cost.T, self.Q2, input_cost]) + + ca.mtimes([self.u[:, k].T, self.Q3, self.u[:, k]]) + + ca.mtimes([velocity_penalty.T, self.Qv, velocity_penalty])) + + # Final state cost + final_state_error = self.x[0:4, self.current_horizon] - self.x_ref[0:4, self.current_horizon] + self.objective += ca.mtimes([final_state_error.T, self.Qf, final_state_error]) + + # Specify type of optimization problem + self.opti.minimize(self.objective) + + def set_state_constraints(self): + # Initial state constraint + self.opti.subject_to(self.x[:, 0] == self.x_init) + + # Vehicle dynamics constraints - Simple kinematic bycicle model + for k in range(self.N): + x_next = self.x[0, k] + self.dt * self.x[3, k] * ca.cos(self.x[2, k]) # x_k+1 = x_k + dt * v_k * cos(theta_k) + y_next = self.x[1, k] + self.dt * self.x[3, k] * ca.sin(self.x[2, k]) # y_k+1 = y_k + dt * v_k * sin(theta_k) + theta_next = self.x[2, k] + self.dt * (self.x[3, k] / self.L) * ca.tan(self.x[4, k]) # theta_k+1 = theta_k + dt * v_k * tan(delta_k) / L + v_next = self.x[3, k] + self.dt * self.u[0, k] # v_k+1 = v_k + dt * a_k + delta_next = self.x[4, k] + self.dt * self.u[1, k] # delta_k+1 = delta_k + dt * steering_rate_k + + self.opti.subject_to(self.x[0, k + 1] == x_next) + self.opti.subject_to(self.x[1, k + 1] == y_next) + self.opti.subject_to(self.x[2, k + 1] == theta_next) + self.opti.subject_to(self.x[3, k + 1] == v_next) + self.opti.subject_to(self.x[4, k + 1] == delta_next) + + # Velocity constraints + self.opti.subject_to(self.x[3, k] <= self.max_velocity) + self.opti.subject_to(self.x[3, k] >= self.min_velocity) + + # Steering angle constraints + self.opti.subject_to(self.x[4, k] <= self.max_steering) + self.opti.subject_to(self.x[4, k] >= self.min_steering) + + def set_control_input_constraints(self): + # Input constraints (acceleration, steering rate) + for k in range(self.N): + self.opti.subject_to(self.min_acceleration <= self.u[0, k]) + self.opti.subject_to(self.u[0, k] <= self.max_acceleration) + + # Steering rate constraint + self.opti.subject_to(self.min_steering_rate <= self.u[1, k]) + self.opti.subject_to(self.u[1, k] <= self.max_steering_rate) + + def set_solver_options(self): + # Set solver options + opts = {"ipopt.print_level": 0, "print_time": 0} + self.opti.solver("ipopt", opts) + + def bound_initial_state(self,state): + """ + This method checks if the initial state provided to the mpc, which could come from the localization stack, + is within the allowed bounds. If not, it clamps it to make the optimization problem feasible. + """ + # Ensure the velocity is within the specified bounds + clamped_velocity = max(self.min_velocity, min(state[3], self.max_velocity)) + + # Create a new state with the clamped velocity + bounded_state = state.copy() + bounded_state[3] = clamped_velocity + + return bounded_state + + def set_new_weight_matrix(self, matrix_name, new_value): + """ + Dynamically adjust one of the weight matrices. + Check if the matrix exists as an attribute and if so, update it. + """ + if hasattr(self, matrix_name): + try: + # Overwrite with the new dense matrix + setattr(self, matrix_name, ca.DM(new_value)) + # Redefine objective function with updated matrix. + self.set_objective_function() + except Exception as e: + print(f"Failed to update {matrix_name}: {e}") + else: + print(f"Matrix {matrix_name} does not exist in MPC class.") + + + def set_new_prediction_horizon(self, new_horizon): + """ + Dynamically adjust the active horizon for the optimization problem. + When the horizon is reduced, you simply freeze unused variables and update the reference trajectory + and state for the active part of the horizon. The solver will then only optimize over the active steps. + """ + self.current_horizon = new_horizon + + # Redefine objective function with new horizon. + self.set_objective_function() + + + def reset_parameters(self): + """ + Reset the core parameters and weight matrices of the MPC instance to their initial values. + Useful for restoring values to their original state after runtime modifications. + """ + self.current_horizon = self.N # Reset to max horizon + + # Reset weight matrices + self.Q1 = ca.DM(np.array(self.Q1_list).reshape((4, 4))) + self.Q2 = ca.DM(np.array(self.Q2_list).reshape((2, 2))) + self.Q3 = ca.DM(np.array(self.Q3_list).reshape((2, 2))) + self.Qf = ca.DM(np.array(self.Qf_list).reshape((4, 4))) + self.Qv = ca.DM(self.Qv_num) + # Reset objective function with initial values. + self.set_objective_function() \ No newline at end of file diff --git a/src/svea_core/src/svea/helpers.py b/src/svea_core/src/svea/helpers.py new file mode 100644 index 00000000..3a474d4b --- /dev/null +++ b/src/svea_core/src/svea/helpers.py @@ -0,0 +1,17 @@ +"""Helper functions for doing annoying ROS stuff.""" + +import rospy + +def load_param(name, value=None): + """Function used to get parameters from ROS parameter server + + :param name: name of the parameter + :type name: string + :param value: default value of the parameter, defaults to None + :type value: _type_, optional + :return: value of the parameter + :rtype: _type_ + """ + if value is None: + assert rospy.has_param(name), f'Missing parameter "{name}"' + return rospy.get_param(name, value) diff --git a/src/svea_core/src/svea/simulators/sim_SVEA.py b/src/svea_core/src/svea/simulators/sim_SVEA.py index 069fc60a..742e56de 100755 --- a/src/svea_core/src/svea/simulators/sim_SVEA.py +++ b/src/svea_core/src/svea/simulators/sim_SVEA.py @@ -87,6 +87,7 @@ def __init__(self, self.vehicle_name = namespace.split('/')[-2] self._map_frame_id = 'map' + self._laser_frame_id = sub_namespace + 'laser' self._odom_frame_id = sub_namespace + 'odom' self._base_link_frame_id = sub_namespace + 'base_link' diff --git a/src/svea_core/src/svea/svea_managers/path_following_sveas.py b/src/svea_core/src/svea/svea_managers/path_following_sveas.py index 6676cc02..b976392b 100755 --- a/src/svea_core/src/svea/svea_managers/path_following_sveas.py +++ b/src/svea_core/src/svea/svea_managers/path_following_sveas.py @@ -11,6 +11,9 @@ from svea.data import TrajDataHandler from svea.controllers import pure_pursuit from svea.controllers.pure_pursuit import PurePursuitController +from svea.interfaces import ActuationInterface +from svea.sensors import Lidar +from svea.data import BasicDataHandler __license__ = "MIT" __maintainer__ = "Frank Jiang, Tobias Bolin" @@ -163,3 +166,62 @@ def send_accel(self, accel, dt): self.controller.target_velocity += accel * dt steering, velocity = self.compute_control() self.send_control(steering, velocity) + + + +class SVEAManagerMPC(SVEAManager): + """Extended SVEAManager class with an additional controller_config_path parameter used to ease MPC setup. + + This class inherits from SVEAManager and adds the capability to pass a + configuration path specifically to the controller. + + The __init__ method is redefined to delay the creation of the controller, + allowing the controller to be initialized with both vehicle_name and + controller_config_path, which is not supported in the base SVEAManager. + + :param localizer: A chosen localization interface class constructor + :type localizer: class + :param controller: A chosen controller class constructor + :type controller: class + :param actuation: A chosen actuation interface class constructor, + defaults to ActuationInterface + :type actuation: class + :param data_handler: A chosen data handler class constructor, + defaults to BasicDataHandler + :type data_handler: class + :param vehicle_name: Name of vehicle; used to initialize each + software module, defaults to '' + :type vehicle_name: str + :param controller_config_path: Path to the configuration file for the controller, + defaults to '' + :type controller_config_path: str + """ + + def __init__(self, localizer, controller, + actuation=ActuationInterface, + data_handler=BasicDataHandler, + vehicle_name='', + controller_config_path=''): + + # Manually initialize components from SVEAManager except for the controller + self.vehicle_name = vehicle_name + sub_namespace = vehicle_name + '/' if vehicle_name else '' + self._emergency_topic = '{}lli/emergency'.format(sub_namespace) + + # Initialize localizer, actuation, and data handler + self.localizer = localizer(vehicle_name) + self.actuation = actuation(vehicle_name) + self.data_handler = data_handler(vehicle_name) + + # Initialize the controller with the additional controller_config_path parameter + self.controller = controller(vehicle_name, controller_config_path) + + # Initialize additional components from SVEAManager + self.lidar = Lidar() + + # Bring localizer state out into manager + self.state = self.localizer.state + self.last_state_time = None + + # Set up automatic state logging + self.localizer.add_callback(self.data_handler.log_state) \ No newline at end of file diff --git a/src/svea_examples/launch/mpc_navigation.launch b/src/svea_examples/launch/mpc_navigation.launch new file mode 100644 index 00000000..66e4b51a --- /dev/null +++ b/src/svea_examples/launch/mpc_navigation.launch @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/svea_examples/scripts/mocap_examples/mpc.py b/src/svea_examples/scripts/mocap_examples/mpc.py new file mode 100755 index 00000000..ad8f5324 --- /dev/null +++ b/src/svea_examples/scripts/mocap_examples/mpc.py @@ -0,0 +1,331 @@ +#! /usr/bin/env python3 + +import numpy as np +import math +import rospy +import tf +from svea.models.bicycle import SimpleBicycleModel +from svea.states import VehicleState +from svea.simulators.sim_SVEA import SimSVEA +from svea.interfaces import LocalizationInterface +try: + from svea_mocap.mocap import MotionCaptureInterface +except ImportError: + pass +from svea.controllers.mpc import MPC_casadi +from svea.svea_managers.path_following_sveas import SVEAManagerMPC +from svea.data import TrajDataHandler, RVIZPathHandler +from std_msgs.msg import Float32 +from geometry_msgs.msg import PoseArray, PoseStamped +from svea.simulators.viz_utils import publish_pose_array + + +def load_param(name, value=None): + if value is None: + assert rospy.has_param(name), f'Missing parameter "{name}"' + return rospy.get_param(name, value) + +class mpc_navigation: + """ + ROS Node for controlling and simulating the SVEA vehicle autonomously. + """ + + def __init__(self,sim_dt,mpc): + self.dt = sim_dt + self.mpc = mpc + + ## ROS Parameters + self.USE_RVIZ = load_param('~use_rviz', False) + self.IS_SIM = load_param('~is_sim', False) + self.STATE = load_param('~state', [-3, 0, 0, 0]) # [x,y,yaw,v] wrt map frame. Initial state for simulator. + self.MPC_FREQ = load_param('~mpc_freq', 10) + self.SVEA_MOCAP_NAME = load_param('~svea_mocap_name') + self.DELTA_S = load_param('~delta_s', 5) # static path discretization lenght + self.mpc_config_ns = load_param('~mpc_config_ns') + self.initial_horizon = load_param(f'{self.mpc_config_ns}/prediction_horizon') + self.initial_Qf = load_param(f'{self.mpc_config_ns}/final_state_weight_matrix') + + ## MPC parameters + self.GOAL_REACHED_DIST = 0.2 # The distance threshold (in meters) within which the goal is considered reached. + self.GOAL_REACHED_YAW = 0.2 # The yaw angle threshold (in radians) within which the goal orientation is considered reached. + self.UPDATE_MPC_PARAM = True # A flag indicating if the MPC parameters can be updated when the system is approaching the target. + self.RESET_MPC_PARAM = False # A flag indicating if the MPC parameters should be reset when the system is moving away from the target. + self.predicted_state = None + self.mpc_last_time = rospy.get_time() + self.mpc_dt = 1.0 / self.MPC_FREQ + self.current_horizon = self.initial_horizon + + ## Static Planner parameters + self.APPROACH_TARGET_THR = 5 # The distance threshold (in meters) to define when the system is "approaching" the target. + self.NEW_REFERENCE_THR = 1 # The distance threshold (in meters) to update the next intermediate reference point. + self.goal_pose = None + self.static_path_plan = np.empty((3, 0)) + self.current_index_static_plan = 0 + self.is_last_point = False + + ## Other parameters + self.steering = 0 + self.velocity = 0 + self.state = [] + + if self.IS_SIM is False: + # add steering bias of sveas. {(svea0:28),(svea7:7)} + unitless_steering = 7 + PERC_TO_LLI_COEFF = 1.27 + MAX_STEERING_ANGLE = 40 * math.pi / 180 + steer_percent = unitless_steering / PERC_TO_LLI_COEFF + self.steering_bias = (steer_percent / 100.0) * MAX_STEERING_ANGLE + else: + self.steering_bias = 0 + + self.create_simulator_and_SVEAmanager() + self.init_publishers() + self.init_subscribers() + + def run(self): + while self.keep_alive(): + self.spin() + + def keep_alive(self): + return not rospy.is_shutdown() + + def spin(self): + # Retrieve current state from SVEA localization + state = self.svea.wait_for_state() + self.state = [state.x, state.y, state.yaw, state.v] + # If a static path plan has been computed, run the mpc. + if self.static_path_plan.size > 0 : + # If enough time has passed, run the MPC computation + current_time = rospy.get_time() + measured_dt = current_time - self.mpc_last_time + if measured_dt >= self.mpc_dt : + reference_trajectory, distance_to_next_point = self.get_mpc_current_reference() + if self.is_last_point and distance_to_next_point <= self.APPROACH_TARGET_THR and self.UPDATE_MPC_PARAM: + # Update the prediction horizon and final state weight matrix only once when approaching target + new_Qf = np.array([70, 0, 0, 0, + 0, 70, 0, 0, + 0, 0, 20, 0, + 0, 0, 0, 0]).reshape((4, 4)) + self.svea.controller.set_new_weight_matrix('Qf', new_Qf) + self.UPDATE_MPC_PARAM = False + self.RESET_MPC_PARAM = True # Allow resetting when moving away + + elif self.is_last_point and distance_to_next_point > self.APPROACH_TARGET_THR and self.RESET_MPC_PARAM: + # Reset to initial values only once when moving away from target + self.current_horizon = self.initial_horizon + self.svea.controller.set_new_prediction_horizon(self.initial_horizon) + self.svea.controller.set_new_weight_matrix('Qf', self.initial_Qf) + self.UPDATE_MPC_PARAM = True # Allow updating again when re-approaching + self.RESET_MPC_PARAM = False # Prevent repeated resetting + + if not self.is_goal_reached(distance_to_next_point): + # Run the MPC to compute control + steering_rate, acceleration = self.svea.controller.compute_control([self.state[0],self.state[1],self.state[2],self.velocity,self.steering], reference_trajectory) + self.steering += steering_rate * measured_dt + self.velocity += acceleration * measured_dt + self.predicted_state = self.svea.controller.get_optimal_states() + # Publish the predicted path + self.publish_trajectory(self.predicted_state[0:3, :self.current_horizon+1],self.predicted_trajectory_pub) + else: + # Stop the vehicle if the goal is reached + self.steering, self.velocity = 0, 0 + # Update the last time the MPC was computed + self.mpc_last_time = current_time + + # Publish the latest control target and the estimated(mocap) / measured(in/out loc.) speed. + self.publish_to_foxglove(self.steering, self.velocity, self.state[3]) + # Visualization data and send control + self.svea.send_control(self.steering + self.steering_bias, self.velocity) + self.svea.visualize_data() + + def init_publishers(self): + self.steering_pub = rospy.Publisher('/target_steering_angle', Float32, queue_size=1) + self.velocity_pub = rospy.Publisher('/target_speed', Float32, queue_size=1) + self.velocity_measured_pub = rospy.Publisher('/measured_speed', Float32, queue_size=1) # estimated/measured speed + self.predicted_trajectory_pub = rospy.Publisher('/predicted_path', PoseArray, queue_size=1) + self.static_trajectory_pub = rospy.Publisher('/static_path', PoseArray, queue_size=1) + + def init_subscribers(self): + self.mpc_target_sub = rospy.Subscriber('/mpc_target', PoseStamped, self.mpc_target_callback) + + def create_simulator_and_SVEAmanager(self): + # initial state for simulator. + state = VehicleState(*self.STATE) + # Create simulators, models, managers, etc. + if self.IS_SIM: + + # simulator need a model to simulate + self.sim_model = SimpleBicycleModel(state) + + # start the simulator immediately, but paused + self.simulator = SimSVEA(self.sim_model, + dt=self.dt, + run_lidar=True, + start_paused=True, + publish_odometry=True, + publish_pose=True).start() + + # start the SVEA manager (needed for both sim and real world) + self.svea = SVEAManagerMPC(localizer = LocalizationInterface if self.IS_SIM else MotionCaptureInterface, + controller = self.mpc, + data_handler=RVIZPathHandler if self.USE_RVIZ else TrajDataHandler, + vehicle_name='', + controller_config_path = self.mpc_config_ns) + if not self.IS_SIM: + self.svea.localizer.update_name(self.SVEA_MOCAP_NAME) + + self.svea.start(wait=True) + + # everything ready to go -> unpause simulator + if self.IS_SIM: + self.simulator.toggle_pause_simulation() + + def publish_to_foxglove(self,target_steering,target_speed,measured_speed): + self.steering_pub.publish(target_steering) + self.velocity_pub.publish(target_speed) + self.velocity_measured_pub.publish(measured_speed) + + def get_mpc_current_reference(self): + """ + Retrieves the current reference state for the MPC based on the SVEA's current position. + + Updates the current index in the static path plan if the robot is close to the next point. If at the last point, + it calculates the distance to that point instead. + + Returns: + tuple: (x_ref, distance_to_next_point), where x_ref is the reference state for the prediction horizon + (shape: [3, N+1]) and distance_to_next_point is the distance to the next or last reference point. + """ + if self.is_last_point is False: + distance_to_next_point = self.compute_distance(self.state,self.static_path_plan[:,self.current_index_static_plan]) + if self.current_index_static_plan == self.static_path_plan.shape[1] - 1: + self.is_last_point = True + if distance_to_next_point < self.NEW_REFERENCE_THR and not self.is_last_point: + self.current_index_static_plan += 1 + reference_state = self.static_path_plan[:,self.current_index_static_plan] + x_ref = np.tile(reference_state, (self.initial_horizon+1, 1)).T + return x_ref, distance_to_next_point + + else: + distance_to_last_point = self.compute_distance(self.state,self.static_path_plan[:,-1]) + reference_state = self.static_path_plan[:,-1] + x_ref = np.tile(reference_state, (self.initial_horizon+1, 1)).T + return x_ref, distance_to_last_point + + def mpc_target_callback(self, msg): + """ + Callback function that sets a new goal position and calculates a trajectory. + :param msg: PoseStamped message containing the goal position. + """ + # Set the goal position and log the new goal + self.goal_pose = msg + rospy.loginfo(f"New goal position received: ({self.goal_pose.pose.position.x}, {self.goal_pose.pose.position.y})") + # Compute trajectory (straight line from current position to goal) + self.compute_trajectory() + + def get_yaw_from_pose(self,pose_stamped): + """ + Extracts the yaw from a PoseStamped message. + """ + # Convert the quaternion to Euler angles + orientation = pose_stamped.pose.orientation + quaternion = (orientation.x, orientation.y, orientation.z, orientation.w) + + # Use tf to convert quaternion to Euler angles + euler = tf.transformations.euler_from_quaternion(quaternion) + + # Return the yaw + return euler[2] + + def compute_trajectory(self): + """ + Compute a straight-line trajectory from the current position to the goal using DELTA_S, + including the heading for each point, and publish the path. + """ + if not self.goal_pose or not self.state: + rospy.logwarn("Missing goal or current state for trajectory computation.") + return + # Reset previous trajectory related variables + self.current_index_static_plan = 0 + self.is_last_point = False + self.static_path_plan = np.empty((3, 0)) + # reset control actions + self.velocity = 0 + self.steering = 0 + # reset mpc parameters + self.UPDATE_MPC_PARAM = True + self.RESET_MPC_PARAM = False + self.mpc_last_time = rospy.get_time() + self.svea.controller.reset_parameters() + + # Calculate the straight-line trajectory between current state and goal position + start_x, start_y = self.state[0], self.state[1] + goal_x, goal_y = self.goal_pose.pose.position.x, self.goal_pose.pose.position.y + distance = self.compute_distance([start_x, start_y], [goal_x, goal_y]) + goal_yaw = self.get_yaw_from_pose(self.goal_pose) + + # Compute intermediate points at intervals of DELTA_S + num_points = int(distance // self.DELTA_S) + + for i in range(num_points): + ratio = ((i+1) * self.DELTA_S) / distance + x = start_x + ratio * (goal_x - start_x) + y = start_y + ratio * (goal_y - start_y) + + # Calculate the heading for this point + heading = math.atan2(goal_y - start_y, goal_x - start_x) + + # Stack the computed point as a new column in the array + new_point = np.array([[x], [y], [heading]]) + self.static_path_plan = np.hstack((self.static_path_plan, new_point)) + + # Calculate distance between the last appended point and the goal point. + if self.static_path_plan.size != 0: + last_appended_x = self.static_path_plan[0, -1] + last_appended_y = self.static_path_plan[1, -1] + distance = self.compute_distance([last_appended_x, last_appended_y], [goal_x, goal_y]) + + # If the distance to the goal is too small, replace the last point with the goal directly. + if distance < self.DELTA_S / 2: + # Replace the last appended point with the goal point + self.static_path_plan[:, -1] = np.array([goal_x, goal_y, goal_yaw]) + else: + # Otherwise, append the last point as usual + new_point = np.array([[goal_x], [goal_y], [goal_yaw]]) + self.static_path_plan = np.hstack((self.static_path_plan, new_point)) + else: + # Otherwise, append the last point as usual + new_point = np.array([[goal_x], [goal_y], [goal_yaw]]) + self.static_path_plan = np.hstack((self.static_path_plan, new_point)) + + # Publish the static path + self.publish_trajectory(self.static_path_plan,self.static_trajectory_pub) + + def is_goal_reached(self,distance): + if not self.is_last_point: + return False + elif distance < self.GOAL_REACHED_DIST: + yaw_error = self.state[2] - self.static_path_plan[2,-1] + if abs(yaw_error) < self.GOAL_REACHED_YAW: + return True + else: + return False + else: + return False + + def compute_distance(self,point1,point2): + return np.sqrt((point1[0]-point2[0])**2 + (point1[1]-point2[1])**2) + + def publish_trajectory(self, points, publisher): + pointx, pointy, pointyaw = [], [], [] + for i in range(points.shape[1]): # points should be [3, N] where N is the horizon length + pointx.append(points[0, i]) # x values + pointy.append(points[1, i]) # y values + pointyaw.append(points[2, i]) # yaw values + # Publish the trajectory as a PoseArray + publish_pose_array(publisher, pointx, pointy, pointyaw) + +if __name__ == '__main__': + rospy.init_node('mpc_navigation') + node = mpc_navigation(sim_dt = 0.01, mpc = MPC_casadi) + node.run() \ No newline at end of file