diff --git a/lerobot/configs/robot/so100.yaml b/lerobot/configs/robot/so100.yaml
index ec6f3e3fe..0978de64e 100644
--- a/lerobot/configs/robot/so100.yaml
+++ b/lerobot/configs/robot/so100.yaml
@@ -18,7 +18,7 @@ max_relative_target: null
 leader_arms:
   main:
     _target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
-    port: /dev/tty.usbmodem585A0077581
+    port: /dev/tty.usbmodem58760433331
     motors:
       # name: (index, model)
       shoulder_pan: [1, "sts3215"]
diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_robot.py
new file mode 100644
index 000000000..c467b902e
--- /dev/null
+++ b/lerobot/scripts/eval_robot.py
@@ -0,0 +1,637 @@
+#!/usr/bin/env python
+
+# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Evaluate a policy on a real robot by running rollouts and computing metrics.
+
+Usage examples:
+
+You want to evaluate a model from the hub (eg: https://huggingface.co/lerobot/diffusion_pusht)
+for 10 episodes.
+
+```
+python lerobot/scripts/eval_robot.py -p lerobot/diffusion_pusht eval.n_episodes=10
+```
+
+OR, you want to evaluate a model checkpoint from the LeRobot training script for 10 episodes.
+
+```
+python lerobot/scripts/eval_robot.py \
+    -p outputs/train/diffusion_pusht/checkpoints/005000/pretrained_model \
+    eval.n_episodes=10
+```
+
+Note that in both examples, the repo/folder should contain at least `config.json`, `config.yaml` and
+`model.safetensors`.
+
+Note the formatting for providing the number of episodes. Generally, you may provide any number of arguments
+with `qualified.parameter.name=value`. In this case, the parameter `eval.n_episodes` appears as `n_episodes`
+nested under `eval` in the `config.yaml` found at
+https://huggingface.co/lerobot/diffusion_pusht/tree/main.
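+
+Since this script drives a real robot, you also need to point it at a robot config. A hypothetical
+invocation using the SO-100 config from this repo (the checkpoint path is a placeholder):
+
+```
+python lerobot/scripts/eval_robot.py \
+    --robot-path lerobot/configs/robot/so100.yaml \
+    -p outputs/train/act_so100/checkpoints/last/pretrained_model
+```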
+""" + +import argparse +import json +import logging +import threading +import time +from contextlib import nullcontext +from copy import deepcopy +from datetime import datetime as dt +from pathlib import Path +from typing import Callable + +import einops +import gymnasium as gym +import numpy as np +import torch +from huggingface_hub import snapshot_download +from huggingface_hub.errors import RepositoryNotFoundError +from huggingface_hub.utils._validators import HFValidationError +from torch import Tensor, nn +from tqdm import trange + +from lerobot.common.datasets.factory import make_dataset +from lerobot.common.logger import log_output_dir +from lerobot.common.policies.factory import make_policy +from lerobot.common.policies.policy_protocol import Policy +from lerobot.common.policies.utils import get_device_from_parameters +from lerobot.common.utils.io_utils import write_video +from lerobot.common.utils.utils import ( + get_safe_torch_device, + init_hydra_config, + init_logging, + inside_slurm, + set_global_seed, +) + +from lerobot.common.robot_devices.robots.factory import make_robot, Robot +from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot +from lerobot.scripts.eval import get_pretrained_policy_path +from lerobot.common.utils.utils import log_say +from lerobot.common.robot_devices.control_utils import is_headless, predict_action, busy_wait + + +def rollout( + robot: Robot, + policy: Policy, + fps: int, + control_time_s: float = 20, + num_rollouts: int = 2, + use_amp: bool = True + +) -> dict: + """Run a batched policy rollout once through a batch of environments. + + Note that all environments in the batch are run until the last environment is done. This means some + data will probably need to be discarded (for environments that aren't the first one to be done). + + The return dictionary contains: + (optional) "observation": A a dictionary of (batch, sequence + 1, *) tensors mapped to observation + keys. NOTE the that this has an extra sequence element relative to the other keys in the + dictionary. This is because an extra observation is included for after the environment is + terminated or truncated. + "action": A (batch, sequence, action_dim) tensor of actions applied based on the observations (not + including the last observations). + "reward": A (batch, sequence) tensor of rewards received for applying the actions. + "success": A (batch, sequence) tensor of success conditions (the only time this can be True is upon + environment termination/truncation). + "done": A (batch, sequence) tensor of **cumulative** done conditions. For any given batch element, + the first True is followed by True's all the way till the end. This can be used for masking + extraneous elements from the sequences above. + + Args: + robot: + policy: The policy. Must be a PyTorch nn module. + + Returns: + The dictionary described above. + """ + #assert isinstance(policy, nn.Module), "Policy must be a PyTorch nn module." + #device = get_device_from_parameters(policy) + + # define keyboard listener + listener, events = init_keyboard_listener() + + # Reset the policy. 
+
+
+def eval_policy(
+    robot: Robot,
+    policy: torch.nn.Module,
+    n_episodes: int,
+    max_episodes_rendered: int = 0,
+    videos_dir: Path | None = None,
+    return_episode_data: bool = False,
+    start_seed: int | None = None,
+) -> dict:
+    """
+    Args:
+        robot: The robot to evaluate on. TODO: the body of this function is still written against a
+            batched gym vector env (it was copied from `eval.py`) and has not yet been adapted to
+            the real-robot `rollout` above.
+        policy: The policy.
+        n_episodes: The number of episodes to evaluate.
+        max_episodes_rendered: Maximum number of episodes to render into videos.
+        videos_dir: Where to save rendered videos.
+        return_episode_data: Whether to return episode data for online training. Incorporates the data into
+            the "episodes" key of the returned dictionary.
+        start_seed: The first seed to use for the first individual rollout. For all subsequent rollouts the
+            seed is incremented by 1. If not provided, the environments are not manually seeded.
+    Returns:
+        Dictionary with metrics and data regarding the rollouts.
+    """
+    if max_episodes_rendered > 0 and not videos_dir:
+        raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.")
+
+    assert isinstance(policy, Policy)
+    start = time.time()
+    policy.eval()
+
+    # Determine how many batched rollouts we need to get n_episodes. Note that if n_episodes is not evenly
+    # divisible by env.num_envs we end up discarding some data in the last batch.
+    n_batches = n_episodes // env.num_envs + int((n_episodes % env.num_envs) != 0)
+
+    # Keep track of some metrics.
+    sum_rewards = []
+    max_rewards = []
+    all_successes = []
+    all_seeds = []
+    threads = []  # for video saving threads
+    n_episodes_rendered = 0  # for saving the correct number of videos
+
+    # Callback for visualization.
+    def render_frame(env: gym.vector.VectorEnv):
+        # noqa: B023
+        if n_episodes_rendered >= max_episodes_rendered:
+            return
+        n_to_render_now = min(max_episodes_rendered - n_episodes_rendered, env.num_envs)
+        if isinstance(env, gym.vector.SyncVectorEnv):
+            ep_frames.append(np.stack([env.envs[i].render() for i in range(n_to_render_now)]))  # noqa: B023
+        elif isinstance(env, gym.vector.AsyncVectorEnv):
+            # Here we must render all frames and discard any we don't need.
+            ep_frames.append(np.stack(env.call("render")[:n_to_render_now]))
+
+    if max_episodes_rendered > 0:
+        video_paths: list[str] = []
+
+    # Initialize unconditionally so the episode-data branches below can rely on it existing.
+    episode_data: dict | None = None
+
+    # We don't want a progress bar when we use slurm, since it clutters the logs.
+    progbar = trange(n_batches, desc="Stepping through eval batches", disable=inside_slurm())
+    for batch_ix in progbar:
+        # Cache frames for rendering videos. Each item will be (b, h, w, c), and the list indexes the rollout
+        # step.
+        if max_episodes_rendered > 0:
+            ep_frames: list[np.ndarray] = []
+
+        if start_seed is None:
+            seeds = None
+        else:
+            seeds = range(
+                start_seed + (batch_ix * env.num_envs), start_seed + ((batch_ix + 1) * env.num_envs)
+            )
+        # TODO: this call still uses the `eval.py` signature; the real-robot `rollout` above takes
+        # (robot, policy, fps, control_time_s, num_rollouts) and does not support seeding or rendering.
+        rollout_data = rollout(
+            env,
+            policy,
+            seeds=list(seeds) if seeds else None,
+            return_observations=return_episode_data,
+            render_callback=render_frame if max_episodes_rendered > 0 else None,
+        )
+
+        # Figure out where in each rollout sequence the first done condition was encountered (results after
+        # this won't be included).
+        n_steps = rollout_data["done"].shape[1]
+        # Note: this relies on a property of argmax: that it returns the first occurrence as a tiebreaker.
+        done_indices = torch.argmax(rollout_data["done"].to(int), dim=1)
+
+        # Make a mask with shape (batch, n_steps) to mask out rollout data after the first done
+        # (batch-element-wise). Note the `done_indices + 1` to make sure to keep the data from the done step.
+        mask = (torch.arange(n_steps) <= einops.repeat(done_indices + 1, "b -> b s", s=n_steps)).int()
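+        # Worked example (illustrative): for done = [0, 0, 1, 1, 1], argmax of the int-cast tensor
+        # returns 2 (the first occurrence), so the mask keeps steps 0..3: everything up to and
+        # including the first done step, plus one extra step for the final observation.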
+        # Extend metrics.
+        batch_sum_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "sum")
+        sum_rewards.extend(batch_sum_rewards.tolist())
+        batch_max_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "max")
+        max_rewards.extend(batch_max_rewards.tolist())
+        batch_successes = einops.reduce((rollout_data["success"] * mask), "b n -> b", "any")
+        all_successes.extend(batch_successes.tolist())
+        if seeds:
+            all_seeds.extend(seeds)
+        else:
+            all_seeds.append(None)
+
+        if return_episode_data:
+            this_episode_data = _compile_episode_data(
+                rollout_data,
+                done_indices,
+                start_episode_index=batch_ix * env.num_envs,
+                start_data_index=(0 if episode_data is None else (episode_data["index"][-1].item() + 1)),
+                fps=env.unwrapped.metadata["render_fps"],
+            )
+            if episode_data is None:
+                episode_data = this_episode_data
+            else:
+                # Some sanity checks to make sure we are correctly compiling the data.
+                assert episode_data["episode_index"][-1] + 1 == this_episode_data["episode_index"][0]
+                assert episode_data["index"][-1] + 1 == this_episode_data["index"][0]
+                # Concatenate the episode data.
+                episode_data = {k: torch.cat([episode_data[k], this_episode_data[k]]) for k in episode_data}
+
+        # Maybe render video for visualization.
+        if max_episodes_rendered > 0 and len(ep_frames) > 0:
+            batch_stacked_frames = np.stack(ep_frames, axis=1)  # (b, t, *)
+            for stacked_frames, done_index in zip(
+                batch_stacked_frames, done_indices.flatten().tolist(), strict=False
+            ):
+                if n_episodes_rendered >= max_episodes_rendered:
+                    break
+
+                videos_dir.mkdir(parents=True, exist_ok=True)
+                video_path = videos_dir / f"eval_episode_{n_episodes_rendered}.mp4"
+                video_paths.append(str(video_path))
+                thread = threading.Thread(
+                    target=write_video,
+                    args=(
+                        str(video_path),
+                        stacked_frames[: done_index + 1],  # + 1 to capture the last observation
+                        env.unwrapped.metadata["render_fps"],
+                    ),
+                )
+                thread.start()
+                threads.append(thread)
+                n_episodes_rendered += 1
+
+        progbar.set_postfix(
+            {"running_success_rate": f"{np.mean(all_successes[:n_episodes]).item() * 100:.1f}%"}
+        )
+
+    # Wait till all video rendering threads are done.
+    for thread in threads:
+        thread.join()
+
+    # Compile eval info.
+    info = {
+        "per_episode": [
+            {
+                "episode_ix": i,
+                "sum_reward": sum_reward,
+                "max_reward": max_reward,
+                "success": success,
+                "seed": seed,
+            }
+            for i, (sum_reward, max_reward, success, seed) in enumerate(
+                zip(
+                    sum_rewards[:n_episodes],
+                    max_rewards[:n_episodes],
+                    all_successes[:n_episodes],
+                    all_seeds[:n_episodes],
+                    strict=True,
+                )
+            )
+        ],
+        "aggregated": {
+            "avg_sum_reward": float(np.nanmean(sum_rewards[:n_episodes])),
+            "avg_max_reward": float(np.nanmean(max_rewards[:n_episodes])),
+            "pc_success": float(np.nanmean(all_successes[:n_episodes]) * 100),
+            "eval_s": time.time() - start,
+            "eval_ep_s": (time.time() - start) / n_episodes,
+        },
+    }
+
+    if return_episode_data:
+        info["episodes"] = episode_data
+
+    if max_episodes_rendered > 0:
+        info["video_paths"] = video_paths
+
+    return info
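+
+# Example (values illustrative) of the aggregated metrics produced above:
+#
+#   info["aggregated"] == {
+#       "avg_sum_reward": 123.4,
+#       "avg_max_reward": 4.5,
+#       "pc_success": 80.0,
+#       "eval_s": 312.0,    # total wall-clock time
+#       "eval_ep_s": 31.2,  # wall-clock time per episode
+#   }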
+
+
+def _compile_episode_data(
+    rollout_data: dict, done_indices: Tensor, start_episode_index: int, start_data_index: int, fps: float
+) -> dict:
+    """Convenience function for `eval_policy(return_episode_data=True)`.
+
+    Compiles all the rollout data into a Hugging Face dataset.
+
+    Similar logic is implemented when datasets are pushed to the hub (see: `push_to_hub`).
+    """
+    ep_dicts = []
+    total_frames = 0
+    for ep_ix in range(rollout_data["action"].shape[0]):
+        # + 2 to include the first done frame and the last observation frame.
+        num_frames = done_indices[ep_ix].item() + 2
+        total_frames += num_frames
+
+        # Here we do `num_frames - 1` as we don't want to include the last observation frame just yet.
+        ep_dict = {
+            "action": rollout_data["action"][ep_ix, : num_frames - 1],
+            "episode_index": torch.tensor([start_episode_index + ep_ix] * (num_frames - 1)),
+            "frame_index": torch.arange(0, num_frames - 1, 1),
+            "timestamp": torch.arange(0, num_frames - 1, 1) / fps,
+            "next.done": rollout_data["done"][ep_ix, : num_frames - 1],
+            "next.success": rollout_data["success"][ep_ix, : num_frames - 1],
+            "next.reward": rollout_data["reward"][ep_ix, : num_frames - 1].type(torch.float32),
+        }
+
+        # For the last observation frame, all other keys will just be copy padded.
+        for k in ep_dict:
+            ep_dict[k] = torch.cat([ep_dict[k], ep_dict[k][-1:]])
+
+        for key in rollout_data["observation"]:
+            ep_dict[key] = rollout_data["observation"][key][ep_ix, :num_frames]
+
+        ep_dicts.append(ep_dict)
+
+    data_dict = {}
+    for key in ep_dicts[0]:
+        data_dict[key] = torch.cat([x[key] for x in ep_dicts])
+
+    data_dict["index"] = torch.arange(start_data_index, start_data_index + total_frames, 1)
+
+    return data_dict
+
+
+def main(
+    robot_path,
+    robot_overrides,
+    pretrained_policy_path: Path | None = None,
+    hydra_cfg_path: str | None = None,
+    out_dir: str | None = None,
+    config_overrides: list[str] | None = None,
+):
+    assert (pretrained_policy_path is None) ^ (hydra_cfg_path is None)
+    if pretrained_policy_path is not None:
+        hydra_cfg = init_hydra_config(str(pretrained_policy_path / "config.yaml"), config_overrides)
+    else:
+        hydra_cfg = init_hydra_config(hydra_cfg_path, config_overrides)
+
+    if hydra_cfg.eval.batch_size > hydra_cfg.eval.n_episodes:
+        raise ValueError(
+            "The eval batch size is greater than the number of eval episodes "
+            f"({hydra_cfg.eval.batch_size} > {hydra_cfg.eval.n_episodes}). As a result, {hydra_cfg.eval.batch_size} "
+            f"eval environments will be instantiated, but only {hydra_cfg.eval.n_episodes} will be used. "
+            "This might significantly slow down evaluation. To fix this, you should update your command "
+            f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={hydra_cfg.eval.batch_size}`), "
+            f"or lower the batch size (e.g. `eval.batch_size={hydra_cfg.eval.n_episodes}`)."
+        )
+
+    if out_dir is None:
+        out_dir = f"outputs/eval/{dt.now().strftime('%Y-%m-%d/%H-%M-%S')}_{hydra_cfg.env.name}_{hydra_cfg.policy.name}"
+
+    # Check device is available
+    device = get_safe_torch_device(hydra_cfg.device, log=True)
+
+    torch.backends.cudnn.benchmark = True
+    torch.backends.cuda.matmul.allow_tf32 = True
+    set_global_seed(hydra_cfg.seed)
+
+    log_output_dir(out_dir)
+
+    logging.info("Making robot.")
+    robot_cfg = init_hydra_config(robot_path, robot_overrides)
+    robot = make_robot(robot_cfg)
+    if not robot.is_connected:
+        robot.connect()
+
+    logging.info("Making policy.")
+    if hydra_cfg_path is None:
+        policy = make_policy(hydra_cfg=hydra_cfg, pretrained_policy_name_or_path=str(pretrained_policy_path))
+    else:
+        # Note: We need the dataset stats to pass to the policy's normalization modules.
+        policy = make_policy(hydra_cfg=hydra_cfg, dataset_stats=make_dataset(hydra_cfg).meta.stats)
+
+    assert isinstance(policy, nn.Module)
+    policy.eval()
+
+    with torch.no_grad(), torch.autocast(device_type=device.type) if hydra_cfg.use_amp else nullcontext():
+        info = eval_policy(
+            robot,
+            policy,
+            hydra_cfg.eval.n_episodes,
+            max_episodes_rendered=10,
+            videos_dir=Path(out_dir) / "videos",
+            # No manual seeding on a real robot.
+        )
+    print(info["aggregated"])
+
+    # Save info
+    with open(Path(out_dir) / "eval_info.json", "w") as f:
+        json.dump(info, f, indent=2)
+
+    logging.info("End of eval")
+    if robot.is_connected:
+        robot.disconnect()
+
+
+def calculate_reward(observation):
+    """Placeholder reward function.
+
+    In HIL-SERL the reward is produced by a learned reward classifier over observations; here we
+    simply return 0 until such a classifier is plugged in.
+    """
+    # reward = reward_classifier(observation)
+    return 0.0
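+
+# A minimal sketch (hypothetical, not part of this PR) of the reward classifier HIL-SERL would use:
+# a small binary success classifier over observation features. Names and shapes are assumptions.
+#
+#   class RewardClassifier(nn.Module):
+#       def __init__(self, num_features: int = 512):
+#           super().__init__()
+#           self.head = nn.Sequential(nn.Linear(num_features, 256), nn.ReLU(), nn.Linear(256, 1))
+#
+#       def forward(self, features: Tensor) -> Tensor:
+#           # Returns the probability that the observation corresponds to task success.
+#           return torch.sigmoid(self.head(features))
+#
+#   reward_classifier = RewardClassifier()
+#   reward = reward_classifier(encode(observation)).item()  # `encode` is a placeholder featurizer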
+
+
+def init_keyboard_listener():
+    # Allow exiting early while recording an episode or resetting the environment
+    # by tapping the right arrow key '->'. This might require sudo permissions
+    # to allow your terminal to monitor keyboard events.
+    events = {}
+    events["exit_early"] = False
+    events["rerecord_episode"] = False
+    events["stop_recording"] = False
+    events["pause_policy"] = False
+    events["human_intervention_step"] = False
+
+    if is_headless():
+        logging.warning(
+            "Headless environment detected. On-screen camera display and keyboard inputs will not be available."
+        )
+        listener = None
+        return listener, events
+
+    # Only import pynput if not in a headless environment.
+    from pynput import keyboard
+
+    def on_press(key):
+        try:
+            if key == keyboard.Key.right:
+                print("Right arrow key pressed. Exiting loop...")
+                events["exit_early"] = True
+            elif key == keyboard.Key.left:
+                print("Left arrow key pressed. Exiting loop and rerecording the last episode...")
+                events["rerecord_episode"] = True
+                events["exit_early"] = True
+            elif key == keyboard.Key.esc:
+                print("Escape key pressed. Stopping data recording...")
+                events["stop_recording"] = True
+                events["exit_early"] = True
+            elif key == keyboard.Key.space:
+                # On the first space press, pause the policy so the user can get ready.
+                # On the second space press, the user is ready to start the intervention.
+                if not events["pause_policy"]:
+                    print(
+                        "Space key pressed. Human intervention required.\n"
+                        "Place the leader in a similar pose to the follower and press space again."
+                    )
+                    events["pause_policy"] = True
+                    log_say("Get ready to take over.", play_sounds=True)
+                else:
+                    events["human_intervention_step"] = True
+                    log_say("Starting human intervention.", play_sounds=True)
+        except Exception as e:
+            print(f"Error handling key press: {e}")
+
+    listener = keyboard.Listener(on_press=on_press)
+    listener.start()
+
+    return listener, events
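+
+# Summary (for reference) of the intervention state machine driven by these events:
+#
+#   space (first press)  -> events["pause_policy"] = True: the rollout loop busy-waits while the
+#                           operator aligns the leader arm with the follower.
+#   space (second press) -> events["human_intervention_step"] = True: teleop actions are recorded
+#                           instead of policy actions.
+#   right arrow          -> events["exit_early"] = True: ends the episode and clears both flags.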
+
+
+if __name__ == "__main__":
+    init_logging()
+
+    parser = argparse.ArgumentParser(
+        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument(
+        "--robot-path",
+        type=str,
+        default="lerobot/configs/robot/koch.yaml",
+        help="Path to robot yaml file used to instantiate the robot using the `make_robot` factory function.",
+    )
+    parser.add_argument(
+        "--robot-overrides",
+        type=str,
+        nargs="*",
+        help="Any key=value arguments to override config values (use dots for nested overrides).",
+    )
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument(
+        "-p",
+        "--pretrained-policy-name-or-path",
+        help=(
+            "Either the repo ID of a model hosted on the Hub or a path to a directory containing weights "
+            "saved using `Policy.save_pretrained`. If not provided, the policy is initialized from scratch "
+            "(useful for debugging). This argument is mutually exclusive with `--config`."
+        ),
+    )
+    group.add_argument(
+        "--config",
+        help=(
+            "Path to a yaml config you want to use for initializing a policy from scratch (useful for "
+            "debugging). This argument is mutually exclusive with `--pretrained-policy-name-or-path` (`-p`)."
+        ),
+    )
+    parser.add_argument("--revision", help="Optionally provide the Hugging Face Hub revision ID.")
+    parser.add_argument(
+        "--out-dir",
+        help=(
+            "Where to save the evaluation outputs. If not provided, outputs are saved in "
+            "outputs/eval/{timestamp}_{env_name}_{policy_name}"
+        ),
+    )
+    parser.add_argument(
+        "overrides",
+        nargs="*",
+        help="Any key=value arguments to override config values (use dots for nested overrides).",
+    )
+    args = parser.parse_args()
+
+    robot_cfg = init_hydra_config(args.robot_path, args.robot_overrides)
+    robot = make_robot(robot_cfg)
+    if not robot.is_connected:
+        robot.connect()
+
+    # For now, run policy-free rollouts to exercise the control loop and human intervention.
+    rollout(robot, None, fps=40, control_time_s=100, num_rollouts=10)
+    # if args.pretrained_policy_name_or_path is None:
+    #     main(
+    #         robot_path=args.robot_path,
+    #         robot_overrides=args.robot_overrides,
+    #         hydra_cfg_path=args.config,
+    #         out_dir=args.out_dir,
+    #         config_overrides=args.overrides,
+    #     )
+    # else:
+    #     pretrained_policy_path = get_pretrained_policy_path(
+    #         args.pretrained_policy_name_or_path, revision=args.revision
+    #     )
+    #
+    #     main(
+    #         robot_path=args.robot_path,
+    #         robot_overrides=args.robot_overrides,
+    #         pretrained_policy_path=pretrained_policy_path,
+    #         out_dir=args.out_dir,
+    #         config_overrides=args.overrides,
+    #     )