diff --git a/benchmarks/data_collection/atari.py b/benchmarks/data_collection/atari.py
new file mode 100644
index 00000000000..918eeaf2c58
--- /dev/null
+++ b/benchmarks/data_collection/atari.py
@@ -0,0 +1,129 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+Atari game data collection benchmark
+====================================
+
+Runs an Atari game with a random policy using a multiprocess async data collector.
+
+Image size: torch.Size([210, 160, 3])
+
+Performance results with default configuration:
++-------------------------------+--------------------------------------------------+
+| Machine specs                 | 3x A100 GPUs,                                    |
+|                               | Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz   |
+|                               |                                                  |
++===============================+==================================================+
+| Batched transforms            | 1775.2762 fps                                    |
++-------------------------------+--------------------------------------------------+
+| Single env transform          | 2593.7481 fps                                    |
++-------------------------------+--------------------------------------------------+
+
+"""
+import argparse
+import time
+
+import torch.cuda
+import tqdm
+
+from torchrl.collectors.collectors import MultiaSyncDataCollector, RandomPolicy
+from torchrl.envs import (
+    Compose,
+    EnvCreator,
+    GrayScale,
+    ParallelEnv,
+    Resize,
+    ToTensorImage,
+    TransformedEnv,
+)
+from torchrl.envs.libs.gym import GymEnv
+
+total_frames = 100000
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument(
+    "--batched",
+    action="store_true",
+    help="if True, the transforms will be applied on batches of images.",
+)
+parser.add_argument(
+    "--n_envs",
+    type=int,
+    default=16,
+    help="Number of environments to be run in parallel in each collector.",
+)
+parser.add_argument(
+    "--n_workers_collector",
+    type=int,
+    default=3,
+    help="Number of sub-collectors in the data collector.",
+)
+parser.add_argument(
+    "--n_frames",
+    type=int,
+    default=64,
+    help="Number of frames in each batch of data collected.",
+)
+
+if __name__ == "__main__":
+
+    def make_env():
+        return GymEnv("ALE/Pong-v5")
+
+    # print the raw env output
+    print(make_env().fake_tensordict())
+
+    def make_transformed_env(env):
+        return TransformedEnv(
+            env,
+            Compose(
+                ToTensorImage(),
+                GrayScale(),
+                Resize(84, 84),
+            ),
+        )
+
+    args = parser.parse_args()
+    if args.batched:
+        # transforms are applied once, on the batched output of ParallelEnv
+        parallel_env = make_transformed_env(
+            ParallelEnv(args.n_envs, EnvCreator(make_env))
+        )
+    else:
+        # each worker applies the transforms to its own frames
+        parallel_env = ParallelEnv(
+            args.n_envs, EnvCreator(lambda: make_transformed_env(make_env()))
+        )
+    devices = list(range(torch.cuda.device_count()))[: args.n_workers_collector]
+    if len(devices) == 1:
+        devices = devices[0]
+    elif len(devices) < args.n_workers_collector:
+        raise RuntimeError(
+            "This benchmark requires at least as many GPUs as the number of collector workers."
+        )
+    collector = MultiaSyncDataCollector(
+        [
+            parallel_env,
+        ]
+        * args.n_workers_collector,
+        RandomPolicy(parallel_env.action_spec),
+        total_frames=total_frames,
+        frames_per_batch=args.n_frames,
+        devices=devices,
+        passing_devices=devices,
+        split_trajs=False,
+    )
+    frames = 0
+    pbar = tqdm.tqdm(total=total_frames)
+    for i, data in enumerate(collector):
+        pbar.update(data.numel())
+        if i == 10:
+            # the first batches are treated as warm-up and excluded from the timing
+            t = time.time()
+        elif i > 10:
+            frames += data.numel()
+    t = time.time() - t
+    del collector
+    print(f"\n\nframes per sec: {frames/t: 4.4f} (frames={frames}, t={t})\n\n")
+    exit()
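For reference, a typical invocation of the script above (the flag values shown
are simply the argparse defaults):

    python benchmarks/data_collection/atari.py --n_envs 16 --n_workers_collector 3 --n_frames 64

Adding --batched switches the image transforms from per-worker to batched mode.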
diff --git a/benchmarks/data_collection/atari_sb.py b/benchmarks/data_collection/atari_sb.py
new file mode 100644
index 00000000000..f50af5af09a
--- /dev/null
+++ b/benchmarks/data_collection/atari_sb.py
@@ -0,0 +1,62 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+Atari game data collection benchmark with stable-baselines3
+===========================================================
+
+Runs an Atari game through stable-baselines3's vectorized environments.
+A single action is predicted once by an A2C CnnPolicy and replayed at every
+step, so the figure below measures raw environment throughput.
+
+Image size: torch.Size([210, 160, 3])
+
+Performance results with default configuration:
++-------------------------------+--------------------------------------------------+
+| Machine specs                 | 3x A100 GPUs,                                    |
+|                               | Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz   |
+|                               |                                                  |
++===============================+==================================================+
+|                               | 1176.7944 fps                                    |
++-------------------------------+--------------------------------------------------+
+
+"""
+
+import time
+
+import tqdm
+from stable_baselines3 import A2C
+from stable_baselines3.common.env_util import make_atari_env
+from stable_baselines3.common.vec_env import VecFrameStack
+
+# There already exists an environment generator
+# that will make and wrap atari environments correctly.
+# Here we also collect from multiple workers (n_envs=32 => 32 environments)
+n_envs = 32
+env = make_atari_env("PongNoFrameskip-v4", n_envs=n_envs, seed=0)
+# Frame-stacking with 4 frames
+env = VecFrameStack(env, n_stack=4)
+
+model = A2C("CnnPolicy", env, verbose=1)
+
+frames = 0
+total_frames = 100_000
+pbar = tqdm.tqdm(total=total_frames)
+obs = env.reset()
+action = None
+
+i = 0
+while True:
+    if i == 10:
+        # the first steps are treated as warm-up and excluded from the timing
+        t0 = time.time()
+    elif i > 10:
+        frames += n_envs
+        pbar.update(n_envs)
+    if action is None:
+        # predict a single action once and replay it at every step
+        action, _states = model.predict(obs)
+    obs, rewards, dones, info = env.step(action)
+    if frames > total_frames:
+        break
+    i += 1
+fps = frames / (time.time() - t0)
+print(f"fps: {fps}")
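All scripts in this directory share the same warm-up pattern: the first ten
batches are discarded before the clock starts. A minimal standalone sketch of
that harness (the batches argument is a hypothetical stand-in for any iterable
of tensordicts):

    import time

    def benchmark_fps(batches, warmup=10):
        """Return frames per second, ignoring the first `warmup` batches."""
        frames = 0
        t0 = None
        for i, data in enumerate(batches):
            if i == warmup:
                t0 = time.time()  # start the clock once warm-up is over
            elif i > warmup:
                frames += data.numel()
        return frames / (time.time() - t0)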
diff --git a/benchmarks/data_collection/dmc.py b/benchmarks/data_collection/dmc.py
new file mode 100644
index 00000000000..1592c6c119f
--- /dev/null
+++ b/benchmarks/data_collection/dmc.py
@@ -0,0 +1,129 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+DeepMind control suite data collection benchmark
+================================================
+
+Runs the dm-control "cheetah"-"run" task with a random policy using a multiprocess async data collector.
+
+Image size: torch.Size([240, 320, 3])
+
+Performance results with default configuration:
++-------------------------------+--------------------------------------------------+
+| Machine specs                 | 3x A100 GPUs,                                    |
+|                               | Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz   |
+|                               |                                                  |
++===============================+==================================================+
+| Batched transforms            | 1885.2913 fps                                    |
++-------------------------------+--------------------------------------------------+
+| Single env transform          | 1903.3575 fps                                    |
++-------------------------------+--------------------------------------------------+
+
+"""
+import argparse
+import time
+
+import torch.cuda
+import tqdm
+
+from torchrl.collectors.collectors import MultiaSyncDataCollector, RandomPolicy
+from torchrl.envs import (
+    Compose,
+    EnvCreator,
+    GrayScale,
+    ParallelEnv,
+    Resize,
+    ToTensorImage,
+    TransformedEnv,
+)
+from torchrl.envs.libs.dm_control import DMControlEnv
+
+total_frames = 100000
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument(
+    "--batched",
+    action="store_true",
+    help="if True, the transforms will be applied on batches of images.",
+)
+parser.add_argument(
+    "--n_envs",
+    type=int,
+    default=8,
+    help="Number of environments to be run in parallel in each collector.",
+)
+parser.add_argument(
+    "--n_workers_collector",
+    type=int,
+    default=4,
+    help="Number of sub-collectors in the data collector.",
+)
+parser.add_argument(
+    "--n_frames",
+    type=int,
+    default=64,
+    help="Number of frames in each batch of data collected.",
+)
+
+if __name__ == "__main__":
+
+    def make_env():
+        return DMControlEnv("cheetah", "run", from_pixels=True)
+
+    # print the raw env output
+    print(make_env().fake_tensordict())
+
+    def make_transformed_env(env):
+        return TransformedEnv(
+            env,
+            Compose(
+                ToTensorImage(),
+                GrayScale(),
+                Resize(84, 84),
+            ),
+        )
+
+    args = parser.parse_args()
+    if args.batched:
+        parallel_env = make_transformed_env(
+            ParallelEnv(args.n_envs, EnvCreator(make_env))
+        )
+    else:
+        parallel_env = ParallelEnv(
+            args.n_envs, EnvCreator(lambda: make_transformed_env(make_env()))
+        )
+    devices = list(range(torch.cuda.device_count()))[: args.n_workers_collector]
+    if len(devices) == 1:
+        devices = devices[0]
+    elif len(devices) < args.n_workers_collector:
+        raise RuntimeError(
+            "This benchmark requires at least as many GPUs as the number of collector workers."
+        )
+    collector = MultiaSyncDataCollector(
+        [
+            parallel_env,
+        ]
+        * args.n_workers_collector,
+        RandomPolicy(parallel_env.action_spec),
+        total_frames=total_frames,
+        frames_per_batch=args.n_frames,
+        devices=devices,
+        passing_devices=devices,
+        split_trajs=False,
+    )
+    frames = 0
+    pbar = tqdm.tqdm(total=total_frames)
+    for i, data in enumerate(collector):
+        pbar.update(data.numel())
+        if i == 10:
+            # the first batches are treated as warm-up and excluded from the timing
+            t = time.time()
+        elif i > 10:
+            frames += data.numel()
+    t = time.time() - t
+    del collector
+    print(f"\n\nframes per sec: {frames/t: 4.4f} (frames={frames}, t={t})\n\n")
+    exit()
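The two rows of the performance table above correspond to where the image
transforms sit in the pipeline; schematically, reusing the helpers defined in
the script (a sketch, not additional benchmark code):

    # "Batched transforms": one Compose applied to stacked [n_envs, ...] frames
    env_batched = make_transformed_env(ParallelEnv(8, EnvCreator(make_env)))

    # "Single env transform": each worker transforms its own frames
    env_single = ParallelEnv(8, EnvCreator(lambda: make_transformed_env(make_env())))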
+ +""" +DeepMind control suite data collection benchmark +================================================ + +Runs a "cheetah"-"run" dm-control task with a random policy using a multiprocess async data collector. + +Image size: torch.Size([240, 320, 3]) + +Performance results with default configuration: ++-------------------------------+--------------------------------------------------+ +| Machine specs | 3x A100 GPUs, | +| | Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz | +| | | ++===============================+==================================================+ +| Batched transforms | 843.6275 fps | ++-------------------------------+--------------------------------------------------+ +| Single env transform | 863.7805 fps | ++-------------------------------+--------------------------------------------------+ + +""" +import argparse +import time + +import torch.cuda +import tqdm + +from torchrl.collectors.collectors import MultiaSyncDataCollector, RandomPolicy +from torchrl.envs import ( + Compose, + EnvCreator, + GrayScale, + ParallelEnv, + Resize, + ToTensorImage, + TransformedEnv, +) +import gc +from torchrl.envs.libs.habitat import HabitatEnv + +total_frames = 100000 + +parser = argparse.ArgumentParser() + +parser.add_argument( + "--batched", + action="store_true", + help="if True, the transforms will be applied on batches of images.", +) +parser.add_argument( + "--n_envs", + type=int, + default=8, + help="Number of environments to be run in parallel in each collector.", +) +parser.add_argument( + "--n_workers_collector", + type=int, + default=4, + help="Number sub-collectors in the data collector.", +) +parser.add_argument( + "--n_frames", + type=int, + default=64, + help="Number of frames in each batch of data collected.", +) +parser.add_argument( + "--perf_mode", + action="store_true", + help="If True, the env are created in performance mode (lower rendering quality, higher throughput)", +) +if __name__ == "__main__": + args = parser.parse_args() + + def make_env(device=0): + return HabitatEnv("HabitatPick-v0" if args.perf_mode else "HabitatRenderPick-v0", from_pixels=True, device=device) + + # print the raw env output + env = make_env(3) + r = env.rollout(3) + env.close() + del env, r + gc.collect() + + def make_transformed_env(env): + return TransformedEnv( + env, + Compose( + ToTensorImage(), + GrayScale(), + Resize(84, 84), + ), + ) + + if args.batched: + parallel_env = make_transformed_env( + ParallelEnv(args.n_envs, EnvCreator(make_env)) + ) + else: + parallel_env = ParallelEnv( + args.n_envs, EnvCreator(lambda: make_transformed_env(make_env())) + ) + devices = list(range(torch.cuda.device_count()))[1:(args.n_workers_collector+1)] + if len(devices) == 1: + devices = devices[0] + elif len(devices) < args.n_workers_collector: + raise RuntimeError( + "This benchmark requires one more GPU than the number of collector workers." 
diff --git a/benchmarks/data_collection/r3m.py b/benchmarks/data_collection/r3m.py
new file mode 100644
index 00000000000..b2656e02e3c
--- /dev/null
+++ b/benchmarks/data_collection/r3m.py
@@ -0,0 +1,79 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+Data collection benchmark with visual representations (R3M)
+===========================================================
+
+"""
+
+import warnings
+
+warnings.filterwarnings("ignore")
+
+import time
+
+import tqdm
+
+from torchrl.collectors.collectors import MultiaSyncDataCollector, RandomPolicy
+from torchrl.envs import EnvCreator, ParallelEnv, R3MTransform, TransformedEnv
+from torchrl.envs.libs.dm_control import DMControlEnv
+
+total_frames = 10000
+if __name__ == "__main__":
+
+    def make_env():
+        return DMControlEnv("cheetah", "run", from_pixels=True)
+
+    def make_transformed_env(env):
+        return TransformedEnv(
+            env,
+            R3MTransform("resnet50", in_keys=["pixels"]),
+        )
+
+    for method in range(2):
+        if method == 0:
+            # batched: R3M embeds the stacked frames of the ParallelEnv
+            def parallel_env():
+                return make_transformed_env(ParallelEnv(16, EnvCreator(make_env)))
+
+            policy = RandomPolicy(make_env().action_spec)
+        else:
+            # per-env: each worker computes its own embeddings
+            parallel_env = ParallelEnv(
+                16, EnvCreator(lambda: make_transformed_env(make_env()))
+            )
+            policy = RandomPolicy(parallel_env.action_spec)
+        collector = MultiaSyncDataCollector(
+            [
+                parallel_env,
+                parallel_env,
+            ],
+            policy=policy,
+            total_frames=total_frames,
+            frames_per_batch=64,
+            devices=[
+                "cuda:0",
+                "cuda:1",
+            ],
+            passing_devices=[
+                "cuda:0",
+                "cuda:1",
+            ],
+            split_trajs=False,
+        )
+        frames = 0
+        pbar = tqdm.tqdm(total=total_frames)
+        for i, data in enumerate(collector):
+            pbar.update(data.numel())
+            if i == 10:
+                # the first batches are treated as warm-up and excluded from the timing
+                t = time.time()
+            elif i > 10:
+                frames += data.numel()
+        t = time.time() - t
+        print(
+            f"batched={method == 0}, frames per sec: {frames/t: 4.4f} (frames={frames}, t={t})"
+        )
+        del collector
+    exit()
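To inspect what the R3M transform adds to the collected tensordicts before
running the full benchmark, a short rollout on a single transformed env is
enough (a sketch reusing the helpers above; the exact output keys depend on
the installed torchrl version):

    env = make_transformed_env(make_env())
    td = env.rollout(3)
    print(td)  # pixels are replaced by a resnet50 feature vector
    env.close()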
+ +""" +Replay Buffer usage benchmark +============================= + +""" + +import warnings + +warnings.filterwarnings("ignore") + +import time + +import tqdm +from torchrl.collectors.collectors import MultiaSyncDataCollector, RandomPolicy + +from torchrl.data import LazyMemmapStorage, ListStorage, ReplayBuffer +from torchrl.envs import ( + Compose, + EnvCreator, + GrayScale, + ParallelEnv, + Resize, + ToTensorImage, + TransformedEnv, +) +from torchrl.envs.libs.dm_control import DMControlEnv + +total_frames = 100_000 +if __name__ == "__main__": + + def make_env(): + return DMControlEnv("cheetah", "run", from_pixels=True) + + def make_transformed_env(env): + return TransformedEnv( + env, + Compose( + ToTensorImage(), + GrayScale(), + Resize(84, 84), + ), + ) + + parallel_env = make_transformed_env(ParallelEnv(8, EnvCreator(make_env))) + + for method in range(2): + + def parallel_env(): + return make_transformed_env(ParallelEnv(16, EnvCreator(make_env))) + + policy = RandomPolicy(make_env().action_spec) + if method == 0: + replay_buffer = ReplayBuffer( + 1_000_000, storage=ListStorage(1_000_000), prefetch=10 + ) + else: + replay_buffer = ReplayBuffer( + 1_000_000, storage=LazyMemmapStorage(1_000_000), prefetch=10 + ) + collector = MultiaSyncDataCollector( + [ + parallel_env, + parallel_env, + ], + policy=policy, + total_frames=total_frames, + frames_per_batch=64, + devices=[ + "cuda:0", + "cuda:1", + ], + passing_devices=[ + "cuda:0", + "cuda:1", + ], + split_trajs=False, + ) + frames = 0 + pbar = tqdm.tqdm(total=total_frames) + for _data in collector: + break + data = _data + for i in range(total_frames // data.numel()): + pbar.update(data.numel()) + if i == 10: + t = time.time() + if i >= 10: + frames += data.numel() + replay_buffer.extend(data.view(-1).cpu()) + sample = replay_buffer.sample(128).contiguous() + print(sample) + data = sample["pixels"] + + t = time.time() - t + print( + f"memmap={method==1}, frames per sec: {frames/t: 4.4f} (frames={frames}, t={t})" + ) + del collector + exit() diff --git a/torchrl/envs/libs/dm_control.py b/torchrl/envs/libs/dm_control.py index aece9e33dda..c1de1c1b961 100644 --- a/torchrl/envs/libs/dm_control.py +++ b/torchrl/envs/libs/dm_control.py @@ -11,7 +11,7 @@ import numpy as np import torch -from torchrl.data import ( +from torchrl.data.tensor_specs import ( BoundedTensorSpec, CompositeSpec, TensorSpec, @@ -19,13 +19,12 @@ UnboundedDiscreteTensorSpec, ) -from ...data.utils import DEVICE_TYPING, numpy_to_torch_dtype_dict -from ..gym_like import GymLikeEnv +from torchrl.data.utils import DEVICE_TYPING, numpy_to_torch_dtype_dict +from torchrl.envs.gym_like import GymLikeEnv if torch.has_cuda and torch.cuda.device_count() > 1: n = torch.cuda.device_count() - 1 os.environ["EGL_DEVICE_ID"] = str(1 + (os.getpid() % n)) - print("EGL_DEVICE_ID: ", os.environ["EGL_DEVICE_ID"]) try: @@ -38,7 +37,7 @@ except ImportError as err: _has_dmc = False - IMPORT_ERR = str(err) + IMPORT_ERR = err __all__ = ["DMControlEnv", "DMControlWrapper"] @@ -299,10 +298,8 @@ class DMControlEnv(DMControlWrapper): def __init__(self, env_name, task_name, **kwargs): if not _has_dmc: raise ImportError( - f"""dm_control python package was not found. Please install this dependency. -(Got the error message: {IMPORT_ERR}). -""" - ) + """dm_control python package was not found. 
diff --git a/torchrl/envs/libs/dm_control.py b/torchrl/envs/libs/dm_control.py
index aece9e33dda..c1de1c1b961 100644
--- a/torchrl/envs/libs/dm_control.py
+++ b/torchrl/envs/libs/dm_control.py
@@ -11,7 +11,7 @@
 import numpy as np
 import torch
 
-from torchrl.data import (
+from torchrl.data.tensor_specs import (
     BoundedTensorSpec,
     CompositeSpec,
     TensorSpec,
@@ -19,13 +19,12 @@
     UnboundedDiscreteTensorSpec,
 )
 
-from ...data.utils import DEVICE_TYPING, numpy_to_torch_dtype_dict
-from ..gym_like import GymLikeEnv
+from torchrl.data.utils import DEVICE_TYPING, numpy_to_torch_dtype_dict
+from torchrl.envs.gym_like import GymLikeEnv
 
 if torch.has_cuda and torch.cuda.device_count() > 1:
     n = torch.cuda.device_count() - 1
     os.environ["EGL_DEVICE_ID"] = str(1 + (os.getpid() % n))
-    print("EGL_DEVICE_ID: ", os.environ["EGL_DEVICE_ID"])
 
 try:
@@ -38,7 +37,7 @@
 except ImportError as err:
     _has_dmc = False
-    IMPORT_ERR = str(err)
+    IMPORT_ERR = err
 
 __all__ = ["DMControlEnv", "DMControlWrapper"]
@@ -299,10 +298,8 @@ class DMControlEnv(DMControlWrapper):
     def __init__(self, env_name, task_name, **kwargs):
         if not _has_dmc:
             raise ImportError(
-                f"""dm_control python package was not found. Please install this dependency.
-(Got the error message: {IMPORT_ERR}).
-"""
-            )
+                """dm_control python package was not found.
+Please install this dependency."""
+            ) from IMPORT_ERR
         kwargs["env_name"] = env_name
         kwargs["task_name"] = task_name
         super().__init__(**kwargs)
diff --git a/torchrl/envs/libs/gym.py b/torchrl/envs/libs/gym.py
index 088e7eefbae..8a4a514cb17 100644
--- a/torchrl/envs/libs/gym.py
+++ b/torchrl/envs/libs/gym.py
@@ -8,7 +8,9 @@
 from warnings import warn
 
 import torch
-from torchrl.data import (
+
+from torchrl._utils import implement_for
+from torchrl.data.tensor_specs import (
     BinaryDiscreteTensorSpec,
     BoundedTensorSpec,
     CompositeSpec,
@@ -19,12 +21,10 @@
     TensorSpec,
     UnboundedContinuousTensorSpec,
 )
+from torchrl.data.utils import numpy_to_torch_dtype_dict
 
-from ..._utils import implement_for
-from ...data.utils import numpy_to_torch_dtype_dict
-
-from ..gym_like import default_info_dict_reader, GymLikeEnv
-from ..utils import _classproperty
+from torchrl.envs.gym_like import default_info_dict_reader, GymLikeEnv
+from torchrl.envs.utils import _classproperty
 
 try:
     import gym
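The change from storing str(err) to storing the exception object is what makes
`raise ... from IMPORT_ERR` possible: exception chaining needs the exception
itself, not its message. The pattern in isolation (with a stand-in module name):

    try:
        import not_a_real_module  # stand-in for dm_control
    except ImportError as err:
        _has_dep = False
        IMPORT_ERR = err
    else:
        _has_dep = True
        IMPORT_ERR = None

    if not _has_dep:
        # the original ImportError is attached as __cause__
        raise ImportError("dependency not found") from IMPORT_ERR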