From 4cc9c18bed1a5124dc725dd31eef075d28572fd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quentin=20Gallou=C3=A9dec?= <45557362+qgallouedec@users.noreply.github.com> Date: Thu, 20 Jun 2024 18:46:02 +0200 Subject: [PATCH] teleoperate --- examples/teleoperate_simulated_robot.py | 56 ++++ gym_lowcostrobot/interface/__init__.py | 0 gym_lowcostrobot/interface/dynamixel.py | 311 ++++++++++++++++++ gym_lowcostrobot/interface/robot.py | 173 ++++++++++ gym_lowcostrobot/interface/simulated_robot.py | 255 ++++++++++++++ 5 files changed, 795 insertions(+) create mode 100644 examples/teleoperate_simulated_robot.py create mode 100644 gym_lowcostrobot/interface/__init__.py create mode 100644 gym_lowcostrobot/interface/dynamixel.py create mode 100644 gym_lowcostrobot/interface/robot.py create mode 100644 gym_lowcostrobot/interface/simulated_robot.py diff --git a/examples/teleoperate_simulated_robot.py b/examples/teleoperate_simulated_robot.py new file mode 100644 index 0000000..4e8cf7c --- /dev/null +++ b/examples/teleoperate_simulated_robot.py @@ -0,0 +1,56 @@ +import threading +import time + +import mujoco.viewer +import numpy as np + +from gym_lowcostrobot.interface.dynamixel import Dynamixel +from gym_lowcostrobot.interface.robot import Robot +from gym_lowcostrobot.interface.simulated_robot import SimulatedRobot + + +def read_leader_position(): + global target_pos + while True: + target_pos = np.array(leader.read_position()) + target_pos = (target_pos / 2048 - 1) * 3.14 + target_pos[1] = -target_pos[1] + target_pos[3] = -target_pos[3] + target_pos[4] = -target_pos[4] + + +if __name__ == "__main__": + leader_dynamixel = Dynamixel.Config(baudrate=1_000_000, device_name="/dev/tty.usbmodem57380045631").instantiate() + leader = Robot(leader_dynamixel, servo_ids=[1, 2, 3, 6, 7]) + leader.set_trigger_torque() + + m = mujoco.MjModel.from_xml_path("gym_lowcostrobot/assets/low_cost_robot_6dof/reach_cube.xml") + d = mujoco.MjData(m) + r = SimulatedRobot(m, d) + + target_pos = np.zeros(5) + + # Start the thread for reading leader position + leader_thread = threading.Thread(target=read_leader_position) + leader_thread.daemon = True + leader_thread.start() + + with mujoco.viewer.launch_passive(m, d) as viewer: + start = time.time() + while viewer.is_running(): + # Use the latest target_pos + step_start = time.time() + target_pos_local = target_pos.copy() + # print(f'target pos copy {time.time() - step_start}') + r.set_target_pos(target_pos_local) + # print(f'set targtee pos copy {time.time() - step_start}') + mujoco.mj_step(m, d) + # print(f'mjstep {time.time() - step_start}') + viewer.sync() + # print(f'viewer sync {time.time() - step_start}') + + # Rudimentary time keeping, will drift relative to wall clock. + time_until_next_step = m.opt.timestep - (time.time() - step_start) + # print(f'time until next step {time_until_next_step}') + if time_until_next_step > 0: + time.sleep(time_until_next_step) diff --git a/gym_lowcostrobot/interface/__init__.py b/gym_lowcostrobot/interface/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gym_lowcostrobot/interface/dynamixel.py b/gym_lowcostrobot/interface/dynamixel.py new file mode 100644 index 0000000..487659d --- /dev/null +++ b/gym_lowcostrobot/interface/dynamixel.py @@ -0,0 +1,311 @@ +from __future__ import annotations + +import enum +import math +import os +from dataclasses import dataclass + +from dynamixel_sdk import * # Uses Dynamixel SDK library + + +class ReadAttribute(enum.Enum): + TEMPERATURE = 146 + VOLTAGE = 145 + VELOCITY = 128 + POSITION = 132 + CURRENT = 126 + PWM = 124 + HARDWARE_ERROR_STATUS = 70 + HOMING_OFFSET = 20 + BAUDRATE = 8 + + +class OperatingMode(enum.Enum): + VELOCITY = 1 + POSITION = 3 + CURRENT_CONTROLLED_POSITION = 5 + PWM = 16 + UNKNOWN = -1 + + +class Dynamixel: + ADDR_TORQUE_ENABLE = 64 + ADDR_GOAL_POSITION = 116 + ADDR_VELOCITY_LIMIT = 44 + ADDR_GOAL_PWM = 100 + OPERATING_MODE_ADDR = 11 + POSITION_I = 82 + POSITION_P = 84 + ADDR_ID = 7 + + @dataclass + class Config: + def instantiate(self): + return Dynamixel(self) + + baudrate: int = 57600 + protocol_version: float = 2.0 + device_name: str = "" # /dev/tty.usbserial-1120' + dynamixel_id: int = 1 + + def __init__(self, config: Config): + self.config = config + self.connect() + + def connect(self): + if self.config.device_name == "": + for port_name in os.listdir("/dev"): + if "ttyUSB" in port_name or "ttyACM" in port_name: + self.config.device_name = "/dev/" + port_name + print(f"using device {self.config.device_name}") + self.portHandler = PortHandler(self.config.device_name) + # self.portHandler.LA + self.packetHandler = PacketHandler(self.config.protocol_version) + if not self.portHandler.openPort(): + raise Exception(f"Failed to open port {self.config.device_name}") + + if not self.portHandler.setBaudRate(self.config.baudrate): + raise Exception(f"failed to set baudrate to {self.config.baudrate}") + + # self.operating_mode = OperatingMode.UNKNOWN + # self.torque_enabled = False + # self._disable_torque() + + self.operating_modes = [None for _ in range(32)] + self.torque_enabled = [None for _ in range(32)] + return True + + def disconnect(self): + self.portHandler.closePort() + + def set_goal_position(self, motor_id, goal_position): + # if self.operating_modes[motor_id] is not OperatingMode.POSITION: + # self._disable_torque(motor_id) + # self.set_operating_mode(motor_id, OperatingMode.POSITION) + + # if not self.torque_enabled[motor_id]: + # self._enable_torque(motor_id) + + # self._enable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx( + self.portHandler, motor_id, self.ADDR_GOAL_POSITION, goal_position + ) + # self._process_response(dxl_comm_result, dxl_error) + # print(f'set position of motor {motor_id} to {goal_position}') + + def set_pwm_value(self, motor_id: int, pwm_value, tries=3): + if self.operating_modes[motor_id] is not OperatingMode.PWM: + self._disable_torque(motor_id) + self.set_operating_mode(motor_id, OperatingMode.PWM) + + if not self.torque_enabled[motor_id]: + self._enable_torque(motor_id) + # print(f'enabling torque') + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx( + self.portHandler, motor_id, self.ADDR_GOAL_PWM, pwm_value + ) + # self._process_response(dxl_comm_result, dxl_error) + # print(f'set pwm of motor {motor_id} to {pwm_value}') + if dxl_comm_result != COMM_SUCCESS: + if tries <= 1: + raise ConnectionError(f"dxl_comm_result: {self.packetHandler.getTxRxResult(dxl_comm_result)}") + else: + print(f"dynamixel pwm setting failure trying again with {tries - 1} tries") + self.set_pwm_value(motor_id, pwm_value, tries=tries - 1) + elif dxl_error != 0: + print(f"dxl error {dxl_error}") + raise ConnectionError(f"dynamixel error: {self.packetHandler.getTxRxResult(dxl_error)}") + + def read_temperature(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.TEMPERATURE, 1) + + def read_velocity(self, motor_id: int): + pos = self._read_value(motor_id, ReadAttribute.VELOCITY, 4) + if pos > 2**31: + pos -= 2**32 + # print(f'read position {pos} for motor {motor_id}') + return pos + + def read_position(self, motor_id: int): + pos = self._read_value(motor_id, ReadAttribute.POSITION, 4) + if pos > 2**31: + pos -= 2**32 + # print(f'read position {pos} for motor {motor_id}') + return pos + + def read_position_degrees(self, motor_id: int) -> float: + return (self.read_position(motor_id) / 4096) * 360 + + def read_position_radians(self, motor_id: int) -> float: + return (self.read_position(motor_id) / 4096) * 2 * math.pi + + def read_current(self, motor_id: int): + current = self._read_value(motor_id, ReadAttribute.CURRENT, 2) + if current > 2**15: + current -= 2**16 + return current + + def read_present_pwm(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.PWM, 2) + + def read_hardware_error_status(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.HARDWARE_ERROR_STATUS, 1) + + def disconnect(self): + self.portHandler.closePort() + + def set_id(self, old_id, new_id, use_broadcast_id: bool = False): + """ + sets the id of the dynamixel servo + @param old_id: current id of the servo + @param new_id: new id + @param use_broadcast_id: set ids of all connected dynamixels if True. + If False, change only servo with self.config.id + @return: + """ + if use_broadcast_id: + current_id = 254 + else: + current_id = old_id + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, current_id, self.ADDR_ID, new_id) + self._process_response(dxl_comm_result, dxl_error, old_id) + self.config.id = id + + def _enable_torque(self, motor_id): + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, self.ADDR_TORQUE_ENABLE, 1) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.torque_enabled[motor_id] = True + + def _disable_torque(self, motor_id): + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, self.ADDR_TORQUE_ENABLE, 0) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.torque_enabled[motor_id] = False + + def _process_response(self, dxl_comm_result: int, dxl_error: int, motor_id: int): + if dxl_comm_result != COMM_SUCCESS: + raise ConnectionError(f"dxl_comm_result for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_comm_result)}") + elif dxl_error != 0: + print(f"dxl error {dxl_error}") + raise ConnectionError(f"dynamixel error for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_error)}") + + def set_operating_mode(self, motor_id: int, operating_mode: OperatingMode): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx( + self.portHandler, motor_id, self.OPERATING_MODE_ADDR, operating_mode.value + ) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.operating_modes[motor_id] = operating_mode + + def set_pwm_limit(self, motor_id: int, limit: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, 36, limit) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_velocity_limit(self, motor_id: int, velocity_limit): + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx( + self.portHandler, motor_id, self.ADDR_VELOCITY_LIMIT, velocity_limit + ) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_P(self, motor_id: int, P: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, self.POSITION_P, P) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_I(self, motor_id: int, I: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, self.POSITION_I, I) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def read_home_offset(self, motor_id: int): + self._disable_torque(motor_id) + # dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + # ReadAttribute.HOMING_OFFSET.value, home_position) + home_offset = self._read_value(motor_id, ReadAttribute.HOMING_OFFSET, 4) + # self._process_response(dxl_comm_result, dxl_error) + self._enable_torque(motor_id) + return home_offset + + def set_home_offset(self, motor_id: int, home_position: int): + self._disable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx( + self.portHandler, motor_id, ReadAttribute.HOMING_OFFSET.value, home_position + ) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self._enable_torque(motor_id) + + def set_baudrate(self, motor_id: int, baudrate): + # translate baudrate into dynamixel baudrate setting id + if baudrate == 57600: + baudrate_id = 1 + elif baudrate == 1_000_000: + baudrate_id = 3 + elif baudrate == 2_000_000: + baudrate_id = 4 + elif baudrate == 3_000_000: + baudrate_id = 5 + elif baudrate == 4_000_000: + baudrate_id = 6 + else: + raise Exception("baudrate not implemented") + + self._disable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx( + self.portHandler, motor_id, ReadAttribute.BAUDRATE.value, baudrate_id + ) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def _read_value(self, motor_id, attribute: ReadAttribute, num_bytes: int, tries=10): + try: + if num_bytes == 1: + value, dxl_comm_result, dxl_error = self.packetHandler.read1ByteTxRx( + self.portHandler, motor_id, attribute.value + ) + elif num_bytes == 2: + value, dxl_comm_result, dxl_error = self.packetHandler.read2ByteTxRx( + self.portHandler, motor_id, attribute.value + ) + elif num_bytes == 4: + value, dxl_comm_result, dxl_error = self.packetHandler.read4ByteTxRx( + self.portHandler, motor_id, attribute.value + ) + except Exception: + if tries == 0: + raise Exception + else: + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + if dxl_comm_result != COMM_SUCCESS: + if tries <= 1: + # print("%s" % self.packetHandler.getTxRxResult(dxl_comm_result)) + raise ConnectionError(f"dxl_comm_result {dxl_comm_result} for servo {motor_id} value {value}") + else: + print(f"dynamixel read failure for servo {motor_id} trying again with {tries - 1} tries") + time.sleep(0.02) + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + elif dxl_error != 0: # # print("%s" % self.packetHandler.getRxPacketError(dxl_error)) + # raise ConnectionError(f'dxl_error {dxl_error} binary ' + "{0:b}".format(37)) + if tries == 0 and dxl_error != 128: + raise Exception(f"Failed to read value from motor {motor_id} error is {dxl_error}") + else: + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + return value + + def set_home_position(self, motor_id: int): + print(f"setting home position for motor {motor_id}") + self.set_home_offset(motor_id, 0) + current_position = self.read_position(motor_id) + print(f"position before {current_position}") + self.set_home_offset(motor_id, -current_position) + # dynamixel.set_home_offset(motor_id, -4096) + # dynamixel.set_home_offset(motor_id, -4294964109) + current_position = self.read_position(motor_id) + # print(f'signed position {current_position - 2** 32}') + print(f"position after {current_position}") + + +if __name__ == "__main__": + dynamixel = Dynamixel.Config(baudrate=1_000_000, device_name="/dev/tty.usbmodem57380045631").instantiate() + motor_id = 1 + pos = dynamixel.read_position(motor_id) + for i in range(10): + s = time.monotonic() + pos = dynamixel.read_position(motor_id) + delta = time.monotonic() - s + print(f"read position took {delta}") + print(f"position {pos}") diff --git a/gym_lowcostrobot/interface/robot.py b/gym_lowcostrobot/interface/robot.py new file mode 100644 index 0000000..aeda544 --- /dev/null +++ b/gym_lowcostrobot/interface/robot.py @@ -0,0 +1,173 @@ +import time +from enum import Enum, auto +from typing import Union + +import numpy as np +from dynamixel import OperatingMode, ReadAttribute +from dynamixel_sdk import DXL_HIBYTE, DXL_HIWORD, DXL_LOBYTE, DXL_LOWORD, GroupSyncRead, GroupSyncWrite + + +class MotorControlType(Enum): + PWM = auto() + POSITION_CONTROL = auto() + DISABLED = auto() + UNKNOWN = auto() + + +class Robot: + # def __init__(self, device_name: str, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + def __init__(self, dynamixel, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + self.servo_ids = servo_ids + self.dynamixel = dynamixel + # self.dynamixel = Dynamixel.Config(baudrate=baudrate, device_name=device_name).instantiate() + self.position_reader = GroupSyncRead( + self.dynamixel.portHandler, self.dynamixel.packetHandler, ReadAttribute.POSITION.value, 4 + ) + for id in self.servo_ids: + self.position_reader.addParam(id) + + self.velocity_reader = GroupSyncRead( + self.dynamixel.portHandler, self.dynamixel.packetHandler, ReadAttribute.VELOCITY.value, 4 + ) + for id in self.servo_ids: + self.velocity_reader.addParam(id) + + self.pos_writer = GroupSyncWrite( + self.dynamixel.portHandler, self.dynamixel.packetHandler, self.dynamixel.ADDR_GOAL_POSITION, 4 + ) + for id in self.servo_ids: + self.pos_writer.addParam(id, [2048]) + + self.pwm_writer = GroupSyncWrite( + self.dynamixel.portHandler, self.dynamixel.packetHandler, self.dynamixel.ADDR_GOAL_PWM, 2 + ) + for id in self.servo_ids: + self.pwm_writer.addParam(id, [2048]) + self._disable_torque() + self.motor_control_state = MotorControlType.DISABLED + + def read_position(self, tries=2): + """ + Reads the joint positions of the robot. 2048 is the center position. 0 and 4096 are 180 degrees in each direction. + :param tries: maximum number of tries to read the position + :return: list of joint positions in range [0, 4096] + """ + result = self.position_reader.txRxPacket() + if result != 0: + if tries > 0: + return self.read_position(tries=tries - 1) + else: + print("failed to read position!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + positions = [] + for id in self.servo_ids: + position = self.position_reader.getData(id, ReadAttribute.POSITION.value, 4) + if position > 2**31: + position -= 2**32 + positions.append(position) + return positions + + def read_velocity(self): + """ + Reads the joint velocities of the robot. + :return: list of joint velocities, + """ + self.velocity_reader.txRxPacket() + velocties = [] + for id in self.servo_ids: + velocity = self.velocity_reader.getData(id, ReadAttribute.VELOCITY.value, 4) + if velocity > 2**31: + velocity -= 2**32 + velocties.append(velocity) + return velocties + + def set_goal_pos(self, action): + """ + + :param action: list or numpy array of target joint positions in range [0, 4096] + """ + if self.motor_control_state is not MotorControlType.POSITION_CONTROL: + self._set_position_control() + for i, motor_id in enumerate(self.servo_ids): + data_write = [ + DXL_LOBYTE(DXL_LOWORD(action[i])), + DXL_HIBYTE(DXL_LOWORD(action[i])), + DXL_LOBYTE(DXL_HIWORD(action[i])), + DXL_HIBYTE(DXL_HIWORD(action[i])), + ] + self.pos_writer.changeParam(motor_id, data_write) + + self.pos_writer.txPacket() + + def set_pwm(self, action): + """ + Sets the pwm values for the servos. + :param action: list or numpy array of pwm values in range [0, 885] + """ + if self.motor_control_state is not MotorControlType.PWM: + self._set_pwm_control() + for i, motor_id in enumerate(self.servo_ids): + data_write = [ + DXL_LOBYTE(DXL_LOWORD(action[i])), + DXL_HIBYTE(DXL_LOWORD(action[i])), + ] + self.pwm_writer.changeParam(motor_id, data_write) + + self.pwm_writer.txPacket() + + def set_trigger_torque(self): + """ + Sets a constant torque torque for the last servo in the chain. This is useful for the trigger of the leader arm + """ + self.dynamixel._enable_torque(self.servo_ids[-1]) + self.dynamixel.set_pwm_value(self.servo_ids[-1], 200) + + def limit_pwm(self, limit: Union[int, list, np.ndarray]): + """ + Limits the pwm values for the servos in for position control + @param limit: 0 ~ 885 + @return: + """ + if isinstance(limit, int): + limits = [ + limit, + ] * 5 + else: + limits = limit + self._disable_torque() + for motor_id, limit in zip(self.servo_ids, limits): + self.dynamixel.set_pwm_limit(motor_id, limit) + self._enable_torque() + + def _disable_torque(self): + print(f"disabling torque for servos {self.servo_ids}") + for motor_id in self.servo_ids: + self.dynamixel._disable_torque(motor_id) + + def _enable_torque(self): + print(f"enabling torque for servos {self.servo_ids}") + for motor_id in self.servo_ids: + self.dynamixel._enable_torque(motor_id) + + def _set_pwm_control(self): + self._disable_torque() + for motor_id in self.servo_ids: + self.dynamixel.set_operating_mode(motor_id, OperatingMode.PWM) + self._enable_torque() + self.motor_control_state = MotorControlType.PWM + + def _set_position_control(self): + self._disable_torque() + for motor_id in self.servo_ids: + self.dynamixel.set_operating_mode(motor_id, OperatingMode.POSITION) + self._enable_torque() + self.motor_control_state = MotorControlType.POSITION_CONTROL + + +if __name__ == "__main__": + robot = Robot(device_name="/dev/tty.usbmodem57380045631") + robot._disable_torque() + for _ in range(10000): + s = time.time() + pos = robot.read_position() + elapsed = time.time() - s + print(f"read took {elapsed} pos {pos}") diff --git a/gym_lowcostrobot/interface/simulated_robot.py b/gym_lowcostrobot/interface/simulated_robot.py new file mode 100644 index 0000000..e9d06e0 --- /dev/null +++ b/gym_lowcostrobot/interface/simulated_robot.py @@ -0,0 +1,255 @@ +import mujoco +import numpy as np + + +## https://alefram.github.io/posts/Basic-inverse-kinematics-in-Mujoco +# Levenberg-Marquardt method +class LevenbegMarquardtIK: + def __init__(self, model, data, tol=0.04, step_size=0.5): + self.model = model + self.data = data + self.jacp = np.zeros((3, model.nv)) # translation jacobian + self.jacr = np.zeros((3, model.nv)) # rotational jacobian + self.step_size = step_size + self.tol = tol + self.alpha = 0.5 + self.damping = 0.15 + + def check_joint_limits(self, q): + """Check if the joints is under or above its limits""" + for i in range(len(q)): + q[i] = max(self.model.jnt_range[i][0], min(q[i], self.model.jnt_range[i][1])) + + # Levenberg-Marquardt pseudocode implementation + def calculate(self, goal, body_id, viewer): + """Calculate the desire joints angles for goal""" + current_pose = self.data.body(body_id).xpos + error = np.subtract(goal, current_pose) + + while np.linalg.norm(error) >= self.tol: + # calculate jacobian + mujoco.mj_jac(self.model, self.data, self.jacp, self.jacr, goal, body_id) + + # calculate delta of joint q + n = self.jacp.shape[1] + I = np.identity(n) + product = self.jacp.T @ self.jacp + self.damping * I + + if np.isclose(np.linalg.det(product), 0): + j_inv = np.linalg.pinv(product) @ self.jacp.T + else: + j_inv = np.linalg.inv(product) @ self.jacp.T + + delta_q = j_inv @ error + + # compute next step + self.data.qpos[:1] += self.step_size * delta_q + + # check limits + # self.check_joint_limits(self.data.qpos) + + # compute forward kinematics + mujoco.mj_forward(self.model, self.data) + viewer.sync() + + # calculate new error + error = np.subtract(goal, self.data.body(body_id).xpos) + print("Error: ", np.linalg.norm(error)) + + +class SimulatedRobot: + def __init__(self, m, d) -> None: + """ + :param m: mujoco model + :param d: mujoco data + """ + self.m = m + self.d = d + + def _pos2pwm(self, pos: np.ndarray) -> np.ndarray: + """ + :param pos: numpy array of joint positions in range [-pi, pi] + :return: numpy array of pwm values in range [0, 4096] + """ + return (pos / 3.14 + 1.0) * 4096 + + def _pwm2pos(self, pwm: np.ndarray) -> np.ndarray: + """ + :param pwm: numpy array of pwm values in range [0, 4096] + :return: numpy array of joint positions in range [-pi, pi] + """ + return (pwm / 2048 - 1) * 3.14 + + def _pwm2norm(self, x: np.ndarray) -> np.ndarray: + """ + :param x: numpy array of pwm values in range [0, 4096] + :return: numpy array of values in range [0, 1] + """ + return x / 4096 + + def _norm2pwm(self, x: np.ndarray) -> np.ndarray: + """ + :param x: numpy array of values in range [0, 1] + :return: numpy array of pwm values in range [0, 4096] + """ + return x * 4096 + + def read_position(self, nb_dof=5) -> np.ndarray: + """ + :return: numpy array of current joint positions in range [0, 4096] + """ + return self.d.qpos[:nb_dof] + + def read_velocity(self): + """ + Reads the joint velocities of the robot. + :return: list of joint velocities, + """ + return self.d.qvel + + def read_ee_pos(self, joint_name="end_effector"): + """ + :param joint_name: name of the end effector joint + :return: numpy array of end effector position + """ + joint_id = self.m.body(joint_name).id + return self.d.geom_xpos[joint_id] + + def inverse_kinematics(self, ee_target_pos, step=0.2, joint_name="end_effector", nb_dof=5): + """ + :param ee_target_pos: numpy array of target end effector position + :param joint_name: name of the end effector joint + """ + joint_id = self.m.body(joint_name).id + + # get the current end effector position + ee_pos = self.d.geom_xpos[joint_id] + + # compute the jacobian + jac = np.zeros((3, self.m.nv)) + mujoco.mj_jacBodyCom(self.m, self.d, jac, None, joint_id) + + # compute target joint velocities + qpos = self.read_position(nb_dof=nb_dof) + qdot = np.dot(np.linalg.pinv(jac[:, :nb_dof]), ee_target_pos - ee_pos) + + # apply the joint velocities + q_target_pos = qpos + qdot * step + return q_target_pos + + def inverse_kinematics_reg(self, ee_target_pos, step=0.2, joint_name="end_effector", nb_dof=5, regularization=1e-6): + """ + Computes the inverse kinematics for a robotic arm to reach the target end effector position. + + :param ee_target_pos: numpy array of target end effector position [x, y, z] + :param step: float, step size for the iteration + :param joint_name: str, name of the end effector joint + :param nb_dof: int, number of degrees of freedom + :param regularization: float, regularization factor for the pseudoinverse computation + :return: numpy array of target joint positions + """ + try: + # Get the joint ID from the name + joint_id = self.m.body(joint_name).id + except KeyError: + raise ValueError(f"Body name '{joint_name}' not found in the model.") + + # Get the current end effector position + # ee_pos = self.d.geom_xpos[joint_id] + ee_id = self.m.body(joint_name).id + ee_pos = self.d.geom_xpos[ee_id] + + # Compute the Jacobian + jac = np.zeros((3, self.m.nv)) + mujoco.mj_jacBodyCom(self.m, self.d, jac, None, joint_id) + + # Compute the difference between target and current end effector positions + delta_pos = ee_target_pos - ee_pos + + # Compute the pseudoinverse of the Jacobian with regularization + jac_reg = jac[:, :nb_dof].T @ jac[:, :nb_dof] + regularization * np.eye(nb_dof) + jac_pinv = np.linalg.inv(jac_reg) @ jac[:, :nb_dof].T + + # Compute target joint velocities + qdot = jac_pinv @ delta_pos + + # Normalize joint velocities to avoid excessive movements + qdot_norm = np.linalg.norm(qdot) + if qdot_norm > 1.0: + qdot /= qdot_norm + + # Read the current joint positions + qpos = self.read_position(nb_dof=nb_dof) + + # Compute the new joint positions + q_target_pos = qpos + qdot * step + + return q_target_pos + + def inverse_kinematics_null_reg( + self, + ee_target_pos, + step=0.2, + joint_name="moving_side", + nb_dof=6, + regularization=1e-6, + home_position=None, + nullspace_weight=0.1, + ): + """ + Computes the inverse kinematics for a robotic arm to reach the target end effector position. + + :param ee_target_pos: numpy array of target end effector position [x, y, z] + :param step: float, step size for the iteration + :param joint_name: str, name of the end effector joint + :param nb_dof: int, number of degrees of freedom + :param regularization: float, regularization factor for the pseudoinverse computation + :param home_position: numpy array of home joint positions to regularize towards + :param nullspace_weight: float, weight for the nullspace regularization + :return: numpy array of target joint positions + """ + if home_position is None: + home_position = np.zeros(nb_dof) # Default to zero if no home position is provided + + try: + # Get the joint ID from the name + joint_id = self.m.body(joint_name).id + except KeyError: + raise ValueError(f"Body name '{joint_name}' not found in the model.") + + # Get the current end effector position + ee_id = self.m.body(joint_name).id + ee_pos = self.d.geom_xpos[ee_id] + + # Compute the Jacobian + jac = np.zeros((3, self.m.nv)) + mujoco.mj_jacBodyCom(self.m, self.d, jac, None, joint_id) + print(jac) + + # Compute the difference between target and current end effector positions + delta_pos = ee_target_pos - ee_pos + + # Compute the pseudoinverse of the Jacobian with regularization + jac_reg = jac[:, :nb_dof].T @ jac[:, :nb_dof] + regularization * np.eye(nb_dof) + jac_pinv = np.linalg.inv(jac_reg) @ jac[:, :nb_dof].T + + # Compute target joint velocities + qdot = jac_pinv @ delta_pos + + # Add nullspace regularization to keep joint positions close to the home position + qpos = self.d.qpos[:nb_dof] + nullspace_reg = nullspace_weight * (home_position - qpos) + qdot += nullspace_reg + + # Normalize joint velocities to avoid excessive movements + qdot_norm = np.linalg.norm(qdot) + if qdot_norm > 1.0: + qdot /= qdot_norm + + # Compute the new joint positions + q_target_pos = qpos + qdot * step + + return q_target_pos + + def set_target_qpos(self, target_pos): + self.d.ctrl = target_pos