Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions dev/mar_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import time
import numpy as np
import mavcore
import mavcore.messages as messages
import mavcore.protocols as protocols


device = mavcore.MAVDevice("udp:127.0.0.1:14550")

boot_time_ms = int(time.time() * 1000)

local_pos = messages.LocalPositionNED()
device.add_listener(local_pos)

request_local_pos = protocols.RequestMessageProtocol(messages.IntervalMessageID.LOCAL_POSITION_NED, rate_hz=50.0)
device.run_protocol(request_local_pos)

imu = messages.RawIMU()
device.add_listener(imu)

request_imu = protocols.RequestMessageProtocol(messages.IntervalMessageID.RAW_IMU, rate_hz=50.0)
device.run_protocol(request_imu)

time.sleep(1)

gs = np.linalg.norm([imu.xac, imu.yac, imu.zac])
highest = gs
print(f"G: {highest}", flush=True)

time.sleep(1)

request_arm = protocols.ArmProtocol()
device.run_protocol(request_arm)

request_guided = protocols.SetModeProtocol(messages.FlightMode.GUIDED)
device.run_protocol(request_guided)

takeoff = protocols.TakeoffProtocol(300.0)
device.run_protocol(takeoff)

while local_pos.get_pos_ned()[2] > -295.0:
print(f"Altitude: {local_pos.get_pos_ned()[2]} m", flush=True)
time.sleep(1)

dive = protocols.AttitudeSetpointProtocol(local_pos, imu, boot_time_ms)
device.run_protocol(dive)

request_brake = protocols.SetModeProtocol(messages.FlightMode.BRAKE)
device.run_protocol(request_brake)

highest = 0.0
i = 1000

while i > 0:
gs = np.linalg.norm([imu.xac, imu.yac, imu.zac])
if gs > highest:
highest = gs
i -= 1
time.sleep(0.01)
print(f"highest G: {highest}", flush=True)
2 changes: 2 additions & 0 deletions messages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from mavcore.messages.heartbeat_msg import MAVState as MAVState
from mavcore.messages.takeoff_msg import MAVFrame as MAVFrame
from mavcore.messages.status_text_msg import MAVSeverity as MAVSeverity
from mavcore.messages.raw_imu_msg import RawIMU as RawIMU
from mavcore.messages.rc_channels_msg import RCChannels as RCChannels
from mavcore.messages.rc_override_msg import RCOverride as RCOverride
from mavcore.messages.reboot_msg import RebootMsg as RebootMsg
Expand All @@ -45,6 +46,7 @@
)
from mavcore.messages.set_home_msg import SetHome as SetHome
from mavcore.messages.set_mode_msg import SetMode as SetMode
from mavcore.messages.attitude_target_msg import SetpointAttitude as SetpointAttitude
from mavcore.messages.setpoint_local_msg import SetpointLocal as SetpointLocal
from mavcore.messages.setpoint_velocity_msg import SetpointVelocity as SetpointVelocity
from mavcore.messages.status_text_msg import StatusText as StatusText
Expand Down
109 changes: 109 additions & 0 deletions messages/attitude_target_msg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import pymavlink.dialects.v20.all as dialect
import time
import numpy as np
from mavcore.mav_message import MAVMessage


class SetpointAttitude(MAVMessage):
"""
Attitude + thrust setpoint using MAVLink SET_ATTITUDE_TARGET.

Quaternion is (w, x, y, z).
Body rates are in rad/s.
Thrust is typically normalized [0.0, 1.0] on ArduCopter, depending on GUID_OPTIONS.
"""

def __init__(
self,
target_system: int,
target_component: int,
boot_time_ms: int,
q: np.ndarray,
thrust: float,
body_roll_rate: float = 0.0,
body_pitch_rate: float = 0.0,
body_yaw_rate: float = 0.0,
type_mask: int = -1,
):
super().__init__("CUSTOM_SETPOINT_ATTITUDE")

self.target_system = target_system
self.target_component = target_component

self.boot_time_ms = boot_time_ms # time since system boot in milliseconds (for sync)

# quaternion (w, x, y, z)
self.q = np.array(q, dtype=float)
if self.q.shape != (4,):
raise ValueError("q must be a 4-element array: [w, x, y, z]")

# thrust 0..1
self.thrust = float(thrust)

# body rates (rad/s)
self.body_roll_rate = float(body_roll_rate)
self.body_pitch_rate = float(body_pitch_rate)
self.body_yaw_rate = float(body_yaw_rate)

# If not provided, default to "use quaternion + thrust, ignore all body rates"
# MAVLink SET_ATTITUDE_TARGET type_mask bits:
# 0 ignore body roll rate
# 1 ignore body pitch rate
# 2 ignore body yaw rate
# 6 ignore attitude (quaternion)
# 7 ignore thrust
self.type_mask = int(type_mask) if type_mask != -1 else int(0b00000111)

def encode(self, system_id, component_id):
return dialect.MAVLink_set_attitude_target_message(
time_boot_ms=int(time.time() * 1000 - self.boot_time_ms),
target_system=int(self.target_system),
target_component=int(self.target_component),
type_mask=int(self.type_mask),
q=[float(self.q[0]), float(self.q[1]), float(self.q[2]), float(self.q[3])],
body_roll_rate=float(self.body_roll_rate),
body_pitch_rate=float(self.body_pitch_rate),
body_yaw_rate=float(self.body_yaw_rate),
thrust=float(self.thrust),
)

def load_quat_thrust(self, q: np.ndarray, thrust: float):
q = np.array(q, dtype=float)
if q.shape != (4,):
raise ValueError("q must be a 4-element array: [w, x, y, z]")
self.q = q
self.thrust = float(thrust)

def load_body_rates(self, pqr: np.ndarray):
pqr = np.array(pqr, dtype=float)
if pqr.shape != (3,):
raise ValueError("pqr must be 3-element array: [p, q, r] rad/s")
self.body_roll_rate = float(pqr[0])
self.body_pitch_rate = float(pqr[1])
self.body_yaw_rate = float(pqr[2])

def set_ignore_body_rates(self, ignore: bool = True):
if ignore:
self.type_mask |= (1 << 0) | (1 << 1) | (1 << 2)
else:
self.type_mask &= ~((1 << 0) | (1 << 1) | (1 << 2))

def set_ignore_attitude(self, ignore: bool = True):
if ignore:
self.type_mask |= (1 << 6)
else:
self.type_mask &= ~(1 << 6)

def set_ignore_thrust(self, ignore: bool = True):
if ignore:
self.type_mask |= (1 << 7)
else:
self.type_mask &= ~(1 << 7)

def __repr__(self):
return (
super().__repr__()
+ f", boot: {self.boot_time_ms}, q(wxyz): [{self.q[0]:.4f}, {self.q[1]:.4f}, {self.q[2]:.4f}, {self.q[3]:.4f}]"
+ f", thrust: {self.thrust:.3f}, pqr(rad/s): [{self.body_roll_rate:.3f}, {self.body_pitch_rate:.3f}, {self.body_yaw_rate:.3f}]"
+ f", type_mask: {bin(self.type_mask)}"
)
16 changes: 16 additions & 0 deletions messages/raw_imu_msg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import numpy as np
from mavcore.mav_message import MAVMessage, thread_safe


class RawIMU(MAVMessage):

def __init__(self):
super().__init__("RAW_IMU")
self.xac = 0.0
self.yac = 0.0
self.zac = 0.0

def decode(self, msg):
self.xac = msg.xacc
self.yac = msg.yacc
self.zac = msg.zacc
1 change: 1 addition & 0 deletions messages/request_msg_interval_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class IntervalMessageID(Enum):
SYS_STATUS = 1
SYSTEM_TIME = 2
GPS_RAW_INT = 24
RAW_IMU = 27
ATTITUDE = 30
ATTITUDE_QUATERNION = 31
LOCAL_POSITION_NED = 32
Expand Down
1 change: 1 addition & 0 deletions protocols/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from mavcore.protocols.attitude_setpoint_protocol import AttitudeSetpointProtocol as AttitudeSetpointProtocol
from mavcore.protocols.battery_update_protocol import (
UpdateBatteryProtocol as UpdateBatteryProtocol,
)
Expand Down
43 changes: 43 additions & 0 deletions protocols/attitude_setpoint_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from mavcore.mav_protocol import MAVProtocol
from mavcore.messages.local_position_msg import LocalPosition as LocalPositionNED
from mavcore.messages.raw_imu_msg import RawIMU
from mavcore.messages.attitude_target_msg import SetpointAttitude
import time
import numpy as np


class AttitudeSetpointProtocol(MAVProtocol):

def __init__(
self,
current_pos: LocalPositionNED,
imu: RawIMU,
boot_time_ms: int,
target_system: int = 1,
target_component: int = 0,
):
super().__init__()
self.current_pos = current_pos
self.imu = imu
self.boot_time_ms = boot_time_ms
self.target_system = target_system
self.target_component = target_component

self.q = np.array([0.0, 1.0, 0.0, 0.0])
self.thrust = 0.1

self.setpoint_msg = SetpointAttitude(
self.target_system, self.target_component, self.boot_time_ms, self.q, self.thrust
)

def run(self, sender, receiver):
highest = 0.0
while self.current_pos.get_pos_ned()[2] < -200.0:
sender.send_msg(self.setpoint_msg)
time.sleep(0.02)
gs = np.linalg.norm([self.imu.xac, self.imu.yac, self.imu.zac])
if gs > highest:
highest = gs
#print(f"highest G: {highest}", flush=True)


5 changes: 4 additions & 1 deletion protocols/request_msg_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ def __init__(
target_system: int = 1,
target_component: int = 0,
rate_hz: float = 4.0,
wait_for_ack: bool = True
):
super().__init__()
self.msg_id = msg_id
self.target_system = target_system
self.target_component = target_component
self.wait_for_ack = wait_for_ack

self.mode_msg = RequestMessageInterval(
self.target_system, self.target_component, self.msg_id, rate_hz
Expand All @@ -29,4 +31,5 @@ def __init__(
def run(self, sender, receiver):
future_ack = receiver.wait_for_msg(self.ack_msg, blocking=False)
sender.send_msg(self.mode_msg)
future_ack.wait_until_finished()
if self.wait_for_ack:
future_ack.wait_until_finished()
Loading