diff --git a/mettagrid/config/room/box_world.py b/mettagrid/config/room/box_world.py new file mode 100644 index 00000000..bce407e0 --- /dev/null +++ b/mettagrid/config/room/box_world.py @@ -0,0 +1,147 @@ +""" +BoxWorld +======== + +Densely fills the map with rectangular *boxes*: + +* Interior rectangle; one side is guaranteed ≥ 8 cells. +* Walls 1‑cell thick. +* A single *entrance* (one gap) sits at a randomly chosen corner. +* A single heart **altar** sits in the *diagonally opposite* corner. +* Boxes are stamped until 10 consecutive placement failures, then agents + are spawned in the remaining open corridors. + +Parameters (override in YAML `room:` block if desired): + + short_range – inclusive (min, max) for interior short side + elong_range – inclusive (min_add, max_add) added to short → long side +""" + +from typing import List, Optional, Tuple + +import numpy as np +from mettagrid.config.room.room import Room + + +class BoxWorld(Room): + STYLE_PARAMETERS = {"box_world": {"hearts_count": 0}} + + # ------------------------------------------------------------------ # + # Init + # ------------------------------------------------------------------ # + def __init__( + self, + width: int = 120, + height: int = 120, + agents: int | dict = 0, + short_range: Tuple[int, int] = (4, 6), # interior short side + elong_range: Tuple[int, int] = (4, 8), # + add to short → long + seed: Optional[int] = 42, + border_width: int = 2, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width, self._height = np.random.randint(40,100), np.random.randint(40,100) + self._agents = agents + self._occ = np.zeros((height, width), dtype=bool) + self.short_min, self.short_max = short_range + self.elong_min, self.elong_max = elong_range + + # ------------------------------------------------------------------ # + # Public build + # ------------------------------------------------------------------ # + def _build(self): + grid = np.full((self._height, self._width), "empty", dtype=object) + fails = 0 + while fails < 10: # pack until space gone + if self._place_box(grid, clearance=1): + fails = 0 + else: + fails += 1 + return self._place_agents(grid) + + # ------------------------------------------------------------------ # + # Box helpers + # ------------------------------------------------------------------ # + def _place_box(self, grid, clearance: int) -> bool: + pattern = self._generate_box() + return self._place_region(grid, pattern, clearance) + + def _generate_box(self) -> np.ndarray: + # --- choose interior dimensions -------------------------------- # + short = int(self._rng.integers(self.short_min, self.short_max + 1)) + long_add = int(self._rng.integers(self.elong_min, self.elong_max + 1)) + long_side = max(short + long_add, 8) # ensure ≥ 8 somewhere + + horiz = self._rng.random() < 0.5 + h_int, w_int = (short, long_side) if horiz else (long_side, short) + + h, w = h_int + 2, w_int + 2 # +2 for walls + pat = np.full((h, w), "wall", dtype=object) + pat[1:-1, 1:-1] = "empty" + + # pick entrance corner, altar opposite + corners = [(1, 1), (1, w - 2), (h - 2, 1), (h - 2, w - 2)] + entrance = corners[self._rng.integers(4)] + altar = corners[(corners.index(entrance) + 2) % 4] + er, ec = entrance + + pat[altar] = "altar" + + # carve a single gap adjacent to the entrance corner + gaps = [] + if er == 1: gaps.append((0, ec)) # top wall + if er == h - 2: gaps.append((h - 1, ec)) # bottom wall + if ec == 1: gaps.append((er, 0)) # left wall + if ec == w - 2: gaps.append((er, w - 1)) # right wall + gap = gaps[self._rng.integers(len(gaps))] + pat[gap] = "empty" + return pat + + # ------------------------------------------------------------------ # + # Agent placement + # ------------------------------------------------------------------ # + def _place_agents(self, grid): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + n for n, k in self._agents.items() for _ in range(k)] + ) + for t in tags: + pos = self._rand_empty() + if pos: + grid[pos] = t + self._occ[pos] = True + return grid + + # ------------------------------------------------------------------ # + # Generic helpers (same as LabyrinthWorld) + # ------------------------------------------------------------------ # + def _place_region(self, grid, pat, clearance: int) -> bool: + ph, pw = pat.shape + for r, c in self._free_windows((ph + 2 * clearance, pw + 2 * clearance)): + grid[r + clearance : r + clearance + ph, + c + clearance : c + clearance + pw] = pat + self._occ[r : r + ph + 2 * clearance, + c : c + pw + 2 * clearance] = True + return True + return False + + def _free_windows(self, shape) -> List[Tuple[int, int]]: + h, w = shape + H, W = self._occ.shape + if h > H or w > W: + return [] + view = np.lib.stride_tricks.as_strided( + self._occ, (H - h + 1, W - w + 1, h, w), self._occ.strides * 2 + ) + coords = np.argwhere(view.sum(axis=(2, 3)) == 0) + self._rng.shuffle(coords) + return [tuple(t) for t in coords] + + def _rand_empty(self): + empties = np.flatnonzero(~self._occ) + return None if empties.size == 0 else tuple( + np.unravel_index(self._rng.integers(empties.size), self._occ.shape) + ) \ No newline at end of file diff --git a/mettagrid/config/room/cubicle_world.py b/mettagrid/config/room/cubicle_world.py new file mode 100644 index 00000000..632392e8 --- /dev/null +++ b/mettagrid/config/room/cubicle_world.py @@ -0,0 +1,126 @@ +""" +CubicleWorld +============= + +Creates a cubicle of full‑length walls: vertical lines from top‑to‑bottom and +horizontal lines from left‑to‑right. Interior rectangles of empty space are the +“cubicles.” Door‑sized openings are carved in each interior wall. + +The outermost perimeter is always a solid wall, ensuring the arena is fully enclosed. + +Episode‑time random sampling +---------------------------- +* env_width, env_height ← uniform int samples from width_range / height_range +* gap_x, gap_y ← uniform int samples from gap_range (4‑14) + +Wall coordinates: + vertical walls at 0, gap_x+1, 2*(gap_x+1), …, width‑1 + horizontal walls at 0, gap_y+1, 2*(gap_y+1), …, height‑1 +""" + +from __future__ import annotations +from typing import Sequence +import numpy as np +from mettagrid.config.room.room import Room + + + +class CubicleWorld(Room): + STYLE_PARAMETERS = {"cubicle": {"hearts_count": 0}} + + def __init__( + self, + width_range: Sequence[int] = (120, 320), + height_range: Sequence[int] = (120, 320), + gap_range: Sequence[int] = (4, 14), + altars_count: int = 50, + agents: int | dict = 20, + seed: int | None = 42, + border_object: str = "wall", + border_width: int = 6 + ): + """ + Initialize a CubicleWorld environment. + """ + super().__init__(border_width = border_width, border_object=border_object) + rng = np.random.default_rng(seed) + + self.width = np.random.randint(width_range[0], width_range[1]) + self.height = np.random.randint(height_range[0], height_range[1]) + self.gap_x = np.random.randint(gap_range[0], gap_range[1]) + self.gap_y = np.random.randint(gap_range[0], gap_range[1]) + + self.altars_count = altars_count + self._agents = agents + self._rng = rng + + # --------------------------------------------------------------- # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # --------------------------------------------------------------- # + def _generate_grid(self) -> np.ndarray: + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Draw vertical full‑height walls + col = 0 + while col < W: + grid[:, col] = "wall" + col += self.gap_x + 1 + + # Draw horizontal full‑width walls + row = 0 + while row < H: + grid[row, :] = "wall" + row += self.gap_y + 1 + + # Carve door‑sized breaks in every interior wall + # Vertical walls: skip borders (col == 0 or col == W‑1) + for col in range(self.gap_x + 1, W - 1, self.gap_x + 1): + for row_start in range(0, H, self.gap_y + 1): + if row_start + self.gap_y < H: + door_row = row_start + 1 + self._rng.integers(self.gap_y) + grid[door_row, col] = "empty" + + # Horizontal walls: skip borders (row == 0 or row == H‑1) + for row in range(self.gap_y + 1, H - 1, self.gap_y + 1): + for col_start in range(0, W, self.gap_x + 1): + if col_start + self.gap_x < W: + door_col = col_start + 1 + self._rng.integers(self.gap_x) + grid[row, door_col] = "empty" + + # Ensure the outer perimeter is a solid wall + grid[0, :] = "wall" + grid[-1, :] = "wall" + grid[:, 0] = "wall" + grid[:, -1] = "wall" + + # Occupancy mask for placement + self._occ = np.zeros((H, W), dtype=bool) + return grid + + # --------------------------------------------------------------- # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(grid == "empty"))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + # build tag list + if isinstance(self._agents, int): + tags = ["agent.agent"] * self._agents + else: + tags = ["agent." + t for t, n in self._agents.items() for _ in range(n)] + + empties = np.flatnonzero((grid == "empty") & (~self._occ)) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/cubicle_world_uniform.py b/mettagrid/config/room/cubicle_world_uniform.py new file mode 100644 index 00000000..79786b76 --- /dev/null +++ b/mettagrid/config/room/cubicle_world_uniform.py @@ -0,0 +1,111 @@ +""" +CubicleUniformWorld +=================== + +Variant of *CubicleWorld* in which every interior wall has a single door +**exactly at the midpoint** of the wall segment, producing perfectly aligned +corridors between adjacent cubicles. +""" + +from __future__ import annotations +from typing import Sequence +import numpy as np + +from mettagrid.config.room.room import Room + + +class CubicleUniformWorld(Room): + STYLE_PARAMETERS = {"cubicle_uniform": {"hearts_count": 0}} + + # ------------------------------------------------------------------ # + def __init__( + self, + width_range: Sequence[int] = (120, 320), + height_range: Sequence[int] = (120, 320), + gap_range: Sequence[int] = (4, 14), + altars_count: int = 50, + agents: int | dict = 20, + seed: int | None = 42, + border_object: str = "wall", + ): + super().__init__(border_object=border_object) + rng = np.random.default_rng(seed) + + self.width = rng.integers(width_range[0], width_range[1]) + self.height = rng.integers(height_range[0], height_range[1]) + self.gap_x = rng.integers(gap_range[0], gap_range[1]) + self.gap_y = rng.integers(gap_range[0], gap_range[1]) + + self.altars_count = altars_count + self._agents = agents + self._rng = rng + + # ------------------------------------------------------------------ # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # ------------------------------------------------------------------ # + def _generate_grid(self) -> np.ndarray: + """Create the cubicle lattice with centred doors.""" + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Draw vertical full‑height walls + col = 0 + while col < W: + grid[:, col] = "wall" + col += self.gap_x + 1 + + # Draw horizontal full‑width walls + row = 0 + while row < H: + grid[row, :] = "wall" + row += self.gap_y + 1 + + # Carve centred doors in vertical walls (skip outer perimeter) + for col in range(self.gap_x + 1, W - 1, self.gap_x + 1): + row_start = 0 + while row_start + self.gap_y < H: + door_row = row_start + 1 + self.gap_y // 2 + grid[door_row, col] = "empty" + row_start += self.gap_y + 1 + + # Carve centred doors in horizontal walls + for row in range(self.gap_y + 1, H - 1, self.gap_y + 1): + col_start = 0 + while col_start + self.gap_x < W: + door_col = col_start + 1 + self.gap_x // 2 + grid[row, door_col] = "empty" + col_start += self.gap_x + 1 + + # Ensure solid outer perimeter + grid[0, :] = grid[-1, :] = "wall" + grid[:, 0] = grid[:, -1] = "wall" + + # Occupancy mask + self._occ = np.zeros((H, W), dtype=bool) + return grid + + # ------------------------------------------------------------------ # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(grid == "empty"))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + empties = np.flatnonzero((grid == "empty") & (~self._occ)) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/cylinder_world.py b/mettagrid/config/room/cylinder_world.py new file mode 100644 index 00000000..026055db --- /dev/null +++ b/mettagrid/config/room/cylinder_world.py @@ -0,0 +1,135 @@ +from typing import List, Optional, Tuple + +import numpy as np + +from mettagrid.config.room.room import Room + + +class CylinderWorld(Room): + STYLE_PARAMETERS = { + "cylinder_world": { + "hearts_count": 0, # altars are inside cylinders + "cylinders": {"count": 999}, # ignored; we fill until no room + }, + } + + def __init__( + self, + width: int, + height: int, + agents: int | dict = 0, + seed: Optional[int] = 42, + border_width: int = 0, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + width, height = np.random.randint(40, 100), np.random.randint(40, 100) + self._width, self._height = width, height + self._agents = agents + + # occupancy mask: False = empty + self._occ = np.zeros((height, width), dtype=bool) + + # ------------------------------------------------------------------ # + # Public build + # ------------------------------------------------------------------ # + def _build(self) -> np.ndarray: + return self._build_cylinder_world() + + # ------------------------------------------------------------------ # + # Cylinder‑only build + # ------------------------------------------------------------------ # + def _build_cylinder_world(self) -> np.ndarray: + """ + Keep adding cylinders until *no* size/orientation fits anywhere. + + Strategy: restart the attempt with a fresh random cylinder after every + successful placement. Stop only after ``max_consecutive_fail`` failed + attempts *in a row* (i.e. we tried many random sizes/orientations + without success), which strongly suggests the map is packed. + """ + grid = np.full((self._height, self._width), "empty", dtype=object) + self._occ[:, :] = False + + max_consecutive_fail = 2 + fails = 0 + while fails < max_consecutive_fail: + placed = self._place_cylinder_once(grid, clearance=1) + if placed: + fails = 0 # reset – we still found room + else: + fails += 1 # try a different size/orientation + + + # Finally, spawn any requested agents on leftover empty cells + grid = self._place_agents(grid) + return grid + + # ------------------------------------------------------------------ # + # Cylinder placement helpers + # ------------------------------------------------------------------ # + def _place_cylinder_once(self, grid: np.ndarray, clearance: int = 1) -> bool: + pat = self._generate_cylinder_pattern() + return self._place_region(grid, pat, clearance) + + def _generate_cylinder_pattern(self) -> np.ndarray: + length = int(self._rng.integers(2, 30)) + gap = int(self._rng.integers(1, 4)) + vertical = bool(self._rng.integers(0, 2)) + if vertical: + h, w = length, gap + 2 + pat = np.full((h, w), "empty", dtype=object) + pat[:, 0] = pat[:, -1] = "wall" + pat[h // 2, 1 + gap // 2] = "altar" + else: + h, w = gap + 2, length + pat = np.full((h, w), "empty", dtype=object) + pat[0, :] = pat[-1, :] = "wall" + pat[1 + gap // 2, w // 2] = "altar" + return pat + + # ------------------------------------------------------------------ # + # Agents placement (simplified) + # ------------------------------------------------------------------ # + def _place_agents(self, grid): + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + else: + agents = ["agent." + a for a, n in self._agents.items() for _ in range(n)] + for a in agents: + pos = self._rand_empty() + if pos: + grid[pos] = a + self._occ[pos] = True + return grid + + # ------------------------------------------------------------------ # + # Region placement utilities + # ------------------------------------------------------------------ # + def _place_region(self, grid, pattern: np.ndarray, clearance: int) -> bool: + ph, pw = pattern.shape + for r, c in self._candidate_positions((ph + 2 * clearance, pw + 2 * clearance)): + grid[r + clearance : r + clearance + ph, c + clearance : c + clearance + pw] = pattern + self._occ[r + clearance : r + clearance + ph, c + clearance : c + clearance + pw] |= pattern != "empty" + return True + return False + + def _candidate_positions(self, shape: Tuple[int, int]) -> List[Tuple[int, int]]: + h, w = shape + H, W = self._occ.shape + if H < h or W < w: + return [] + view_shape = (H - h + 1, W - w + 1, h, w) + sub = np.lib.stride_tricks.as_strided(self._occ, view_shape, self._occ.strides * 2) + sums = sub.sum(axis=(2, 3)) + coords = np.argwhere(sums == 0) + self._rng.shuffle(coords) + return [tuple(x) for x in coords] + + def _rand_empty(self) -> Optional[Tuple[int, int]]: + empties = np.flatnonzero(~self._occ) + if not len(empties): + return None + idx = self._rng.integers(0, len(empties)) + return tuple(np.unravel_index(empties[idx], self._occ.shape)) diff --git a/mettagrid/config/room/empty_world.py b/mettagrid/config/room/empty_world.py new file mode 100644 index 00000000..78861e33 --- /dev/null +++ b/mettagrid/config/room/empty_world.py @@ -0,0 +1,87 @@ +""" +EmptyWorld +========== + +Creates an empty rectangular arena—just a solid perimeter wall—with a sampled +number of altars and agents scattered inside. + +Episode‑time sampling +--------------------- +* **width, height** ← uniform integers in `width_range`, `height_range` +* **altars_count** ← uniform integer in `altars_count_range` (default 14 – 40) +""" + +from __future__ import annotations +from typing import Sequence +import numpy as np + +from mettagrid.config.room.room import Room + + +class EmptyWorld(Room): + STYLE_PARAMETERS = {"empty": {"hearts_count": 0}} + + # ------------------------------------------------------------------ # + def __init__( + self, + *, + width_range: Sequence[int] = (60, 130), + height_range: Sequence[int] = (60, 130), + altars_count_range: Sequence[int] = (14, 40), + agents: int | dict = 20, + seed: int | None = None, + border_object: str = "wall", + ): + super().__init__(border_object=border_object) + rng = np.random.default_rng(seed) + + self.width = int(rng.integers(width_range[0], width_range[1] + 1)) + self.height = int(rng.integers(height_range[0], height_range[1] + 1)) + self.altars_count = int( + rng.integers(altars_count_range[0], altars_count_range[1] + 1) + ) + + self._agents = agents + self._rng = rng + + # ------------------------------------------------------------------ # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # ------------------------------------------------------------------ # + def _generate_grid(self) -> np.ndarray: + """Return an object grid with a solid perimeter and empty interior.""" + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Solid outer wall + grid[0, :] = grid[-1, :] = "wall" + grid[:, 0] = grid[:, -1] = "wall" + + self._occ = np.zeros((H, W), dtype=bool) # occupancy mask + self._occ[grid == "wall"] = True + return grid + + # ------------------------------------------------------------------ # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(~self._occ))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + empties = np.flatnonzero(~self._occ) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/giant_maze_world.py b/mettagrid/config/room/giant_maze_world.py new file mode 100644 index 00000000..aaa1836e --- /dev/null +++ b/mettagrid/config/room/giant_maze_world.py @@ -0,0 +1,158 @@ +""" +GiantMazeWorld (v3, continuous) +=============================== + +Builds one perfect maze, then up‑scales it so corridors and walls have +variable thickness **while preserving full connectivity**. + +User‑tunable ranges (sampled each episode): + +* corridor_range – inclusive [min,max] corridor width (≥ 1) +* wall_range – inclusive [min,max] wall thickness (≥ 1) +* altars_count – number of heart altars sprinkled on corridor tiles +""" + +from typing import Optional, Tuple, List +import numpy as np +from mettagrid.config.room.room import Room + + +class GiantMazeWorld(Room): + STYLE_PARAMETERS = {"giant_maze": {"hearts_count": 0}} + + # --------------------------------------------------------------- # + def __init__( + self, + width: int = 121, + height: int = 121, + corridor_range: Tuple[int, int] = (1, 10), + wall_range: Tuple[int, int] = (1, 5), + altars_count: int = 50, + agents: int | dict = 0, + seed: Optional[int] = 42, + border_width: int = 2, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + + width = np.random.randint(width-75,width+15) + height = np.random.randint(height-75,height+15) + + # force odd dims (good for maze generation) + self.base_w = width | 1 + self.base_h = height | 1 + + self.corridor_range = corridor_range + self.wall_range = wall_range + self.altars_count = altars_count + self._agents = agents + + # --------------------------------------------------------------- # + # Public builder + # --------------------------------------------------------------- # + def _build(self): + # sample widths + self.cw = int(self._rng.integers(self.corridor_range[0], + self.corridor_range[1] + 1)) + self.wt = int(self._rng.integers(self.wall_range[0], + self.wall_range[1] + 1)) + + grid = self._generate_maze() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # --------------------------------------------------------------- # + # Maze generation with connectivity‑preserving upscale + # --------------------------------------------------------------- # + def _generate_maze(self) -> np.ndarray: + cw, wt = self.cw, self.wt + cell = cw + wt + # coarse grid size (odd) + small_h = ((self.base_h - 1) // cell) | 1 + small_w = ((self.base_w - 1) // cell) | 1 + + # perfect maze on coarse grid (True=corridor) + coarse = np.full((small_h, small_w), False, dtype=bool) + dirs = [(0, 1), (1, 0), (0, -1), (-1, 0)] + stack = [(1, 1)] + coarse[1, 1] = True + last_dir = None + rng = self._rng + while stack: + r, c = stack[-1] + opts = [ + (dr, dc) + for dr, dc in dirs + if 0 < r + dr * 2 < small_h - 1 + and 0 < c + dc * 2 < small_w - 1 + and not coarse[r + dr * 2, c + dc * 2] + ] + if opts: + if last_dir in opts and rng.random() > 0.3: + dr, dc = last_dir + else: + dr, dc = opts[rng.integers(len(opts))] + coarse[r + dr, c + dc] = True + coarse[r + dr * 2, c + dc * 2] = True + stack.append((r + dr * 2, c + dc * 2)) + last_dir = (dr, dc) + else: + stack.pop() + last_dir = None + + # upscale to full resolution with passages through walls + big_h, big_w = small_h * cell + 1, small_w * cell + 1 + grid = np.full((big_h, big_w), "wall", dtype=object) + + # carve blocks + for sr in range(small_h): + for sc in range(small_w): + if coarse[sr, sc]: + r0, c0 = sr * cell + wt, sc * cell + wt + grid[r0 : r0 + cw, c0 : c0 + cw] = "empty" + + # carve connectors through walls + for sr in range(small_h): + for sc in range(small_w): + if not coarse[sr, sc]: + continue + # right neighbour + if sc + 1 < small_w and coarse[sr, sc + 1]: + r0 = sr * cell + wt + c_bridge = (sc + 1) * cell + grid[r0 : r0 + cw, c_bridge : c_bridge + wt] = "empty" + # bottom neighbour + if sr + 1 < small_h and coarse[sr + 1, sc]: + c0 = sc * cell + wt + r_bridge = (sr + 1) * cell + grid[r_bridge : r_bridge + wt, c0 : c0 + cw] = "empty" + + # occupancy mask + self._height, self._width = big_h, big_w + self._occ = np.zeros((big_h, big_w), dtype=bool) + return grid + + # --------------------------------------------------------------- # + # Altars & agents + # --------------------------------------------------------------- # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(grid == "empty"))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags: List[str] = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + empties = np.flatnonzero((grid == "empty") & (~self._occ)) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/giant_maze_world_varied.py b/mettagrid/config/room/giant_maze_world_varied.py new file mode 100644 index 00000000..c79aa42d --- /dev/null +++ b/mettagrid/config/room/giant_maze_world_varied.py @@ -0,0 +1,132 @@ +""" +GiantMazeWorldVaried +==================== + +Variant of GiantMazeWorld with: + +• Map dimensions sampled between 200 – 400 tiles. +• Main corridor width sampled from corridor_range. +• Fractal‑style side branches carved off main corridors. + +Extra parameters +---------------- +branch_prob – chance an empty tile spawns a branch (0–1) +branch_scale – branch width = int(main_width × branch_scale) +max_branch_length – upper bound on branch length (tiles) +""" + +from __future__ import annotations + +from typing import Sequence + +import numpy as np +from mettagrid.config.room.giant_maze_world import GiantMazeWorld + + +def _sample_int(v: int | Sequence[int], rng: np.random.Generator) -> int: + """Return v if int, else uniformly sample an int in [v[0], v[1]].""" + if isinstance(v, Sequence): + lo, hi = int(v[0]), int(v[1]) + return int(rng.integers(lo, hi + 1)) + return int(v) + + +def _sample_num(v, rng): + """Sample float or int ranges transparently.""" + if isinstance(v, Sequence): + lo, hi = v[0], v[1] + if isinstance(lo, int) and isinstance(hi, int): + return int(rng.integers(int(lo), int(hi) + 1)) + return float(rng.uniform(float(lo), float(hi))) + return v + + +class GiantMazeWorldVaried(GiantMazeWorld): + STYLE_PARAMETERS = {"giant_maze_varied": {"hearts_count": 0}} + + def __init__( + self, + width: int | Sequence[int] = (200, 400), + height: int | Sequence[int] = (200, 400), + corridor_range: Sequence[int] = (1, 10), + wall_range: Sequence[int] = (1, 8), + branch_prob: float = 0.3, + branch_scale: float = 0.5, + max_branch_length: int = 40, + altars_count: int = 50, + agents: int | dict = 0, + seed: int | None = 42, + border_width: int = 2, + border_object: str = "wall", + ): + rng_ext = np.random.default_rng(seed) + width = _sample_int(width, rng_ext) + height = _sample_int(height, rng_ext) + + super().__init__( + width=width, + height=height, + corridor_range=tuple(corridor_range), + wall_range=tuple(wall_range), + altars_count=altars_count, + agents=agents, + seed=int(rng_ext.integers(2**32)), + border_width=border_width, + border_object=border_object, + ) + + self.branch_prob = float(_sample_num(branch_prob, rng_ext)) + self.branch_scale = float(_sample_num(branch_scale, rng_ext)) + self.max_branch_length = int(_sample_num(max_branch_length, rng_ext)) + + # ------------------------------------------------------------------ + # Maze generation with fractal branches + # ------------------------------------------------------------------ + def _generate_maze(self) -> np.ndarray: + grid = super()._generate_maze() # base perfect maze + + branch_w = max(1, int(self.cw * self.branch_scale)) + directions = [(0, 1), (1, 0), (0, -1), (-1, 0)] + H, W = grid.shape + rng = self._rng # inherited RNG + + def wall(r, c): + return 0 <= r < H and 0 <= c < W and grid[r, c] == "wall" + + for r in range(1, H - 1): + for c in range(1, W - 1): + if grid[r, c] != "empty" or rng.random() > self.branch_prob: + continue + + dr, dc = directions[rng.integers(4)] + length = int(rng.integers(3, self.max_branch_length + 1)) + rr, cc = r + dr, c + dc + + for _ in range(length): + if not wall(rr, cc): + break + + grid[rr, cc] = "empty" + + # widen perpendicular + for off in range(1, branch_w): + if dr == 0: # horizontal branch -> widen vertically + for s in (-1, 1): + rw = rr + s * off + if wall(rw, cc): + grid[rw, cc] = "empty" + else: # vertical branch -> widen horizontally + for s in (-1, 1): + cw = cc + s * off + if wall(rr, cw): + grid[rr, cw] = "empty" + + rr += dr + cc += dc + + # 30 % chance to turn + if rng.random() < 0.3: + dr, dc = directions[rng.integers(4)] + + self._occ = np.zeros_like(grid, dtype=bool) + return grid \ No newline at end of file diff --git a/mettagrid/config/room/hex_world.py b/mettagrid/config/room/hex_world.py new file mode 100644 index 00000000..1332d9ca --- /dev/null +++ b/mettagrid/config/room/hex_world.py @@ -0,0 +1,154 @@ +""" +HexWorld +======== + +Generates a *regular tiling* of flat‑top hexagons carved out of a square grid. + +Key properties +-------------- +* **hex_size ∈ [5 .. 10]** (inclusive, forced odd) – edge length of each hex. +* **gap** – fixed corridor width between neighbouring hexes (default 3). +* Every hex is *hollow* with a single doorway at the centre of its **east** side. +* A solid perimeter wall encloses the arena. +* Altars and agents are dropped uniformly in the empty cells. + +Coordinate conventions assume (row, col) indexing. +""" + +from __future__ import annotations +from typing import Sequence +import numpy as np + +from mettagrid.config.room.room import Room + + +# --------------------------------------------------------------------------- # +# Helper: draw a single flat‑top hexagon outline at (r0, c0) with side length s +# --------------------------------------------------------------------------- # +def _draw_hex(grid: np.ndarray, r0: int, c0: int, s: int): + """Draws walls for a hexagon; leaves interior empty. Adds an east‑side door.""" + H, W = grid.shape + + def safe_set(r: int, c: int): + if 0 <= r < H and 0 <= c < W: + grid[r, c] = "wall" + + # Upper half (rows 0 .. s‑1) + for dr in range(s): + row = r0 + dr + span = s + dr + col_start = c0 - dr + col_end = col_start + span - 1 + safe_set(row, col_start) # west edge + safe_set(row, col_end) # east edge + + # Lower half (rows s .. 2s‑1) + for dr in range(s, 2 * s): + row = r0 + dr + span = s + (2 * s - dr - 1) + col_start = c0 - (2 * s - dr - 1) + col_end = col_start + span - 1 + safe_set(row, col_start) + safe_set(row, col_end) + + # Door at centre of east side (upper middle row) + door_r = r0 + s - 1 + door_c = c0 + s + if 0 <= door_r < H and 0 <= door_c < W: + grid[door_r, door_c] = "empty" + + +# --------------------------------------------------------------------------- # +class HexWorld(Room): + """Room builder that tiles the arena with uniform hexagonal cells.""" + + STYLE_PARAMETERS = {"hex_grid": {"hearts_count": 0}} + + # --------------------------------------------------------------- # + def __init__( + self, + *, + width_range: Sequence[int] = (60, 140), + height_range: Sequence[int] = (60, 140), + hex_size_range: Sequence[int] = (5, 10), + gap: int = 3, + altars_count: int = 50, + agents: int | dict = 20, + seed: int | None = None, + border_object: str = "wall", + ): + super().__init__(border_object=border_object) + rng = np.random.default_rng(seed) + + # Sample odd side length for centred doorway + s = rng.integers(hex_size_range[0], hex_size_range[1] + 1) + self.hex_size = s if s % 2 == 1 else (s + 1 if s < hex_size_range[1] else s - 1) + self.gap = gap + + self.width = rng.integers(width_range[0], width_range[1] + 1) + self.height = rng.integers(height_range[0], height_range[1] + 1) + + self.altars_count = altars_count + self._agents = agents + self._rng = rng + + # --------------------------------------------------------------- # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # --------------------------------------------------------------- # + def _generate_grid(self) -> np.ndarray: + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Perimeter walls + grid[0, :] = grid[-1, :] = "wall" + grid[:, 0] = grid[:, -1] = "wall" + + s = self.hex_size + w = 3 * s - 1 # bounding‑box width + h = 2 * s # bounding‑box height + stride_x = w - (s - 1) # horizontal step between hex centres + stride_y = h # vertical step + + offset_y = 1 + s # leave gap from top border + offset_x = 1 + s + + row = offset_y + parity = 0 + while row + s < H - 1: + col = offset_x + parity * (stride_x // 2) + while col + s < W - 1: + _draw_hex(grid, row - (s - 1), col, s) + col += stride_x + row += stride_y + parity ^= 1 # stagger every second row + + # Occupancy mask tracks filled cells for later placement + self._occ = np.zeros((H, W), dtype=bool) + self._occ[grid == "wall"] = True + return grid + + # --------------------------------------------------------------- # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(~self._occ))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + empties = np.flatnonzero(~self._occ) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/homogenous_tile.py b/mettagrid/config/room/homogenous_tile.py new file mode 100644 index 00000000..783ed165 --- /dev/null +++ b/mettagrid/config/room/homogenous_tile.py @@ -0,0 +1,157 @@ +""" +HomogenousTile +============== + +Tiles the arena with identical rooms separated by *single‑cell walls* and +one‑cell corridors. Each episode: + +* Samples **tile_width , tile_height** ∈ [5 .. 10] +* Samples **arena width , height** ∈ [60 .. 130] +* Samples **altars_count** ∈ [14 .. 40] +* Picks a **shape** for every tile interior from + {rectangle, cross, hollow, diagonal}. + +Doors are carved at the centre of every interior wall so the agent can move +freely between tiles. +""" +from __future__ import annotations +from typing import Sequence +import numpy as np +import random + +from mettagrid.config.room.room import Room + + +class HomogenousTile(Room): + STYLE_PARAMETERS = {"homogenous_tile": {"hearts_count": 0}} + + # ------------------------------------------------------------------ # + def __init__( + self, + *, + width_range: Sequence[int] = (60, 130), + height_range: Sequence[int] = (60, 130), + tile_width_range: Sequence[int] = (5, 10), + tile_height_range: Sequence[int] = (5, 10), + altars_count_range: Sequence[int] = (14, 40), + agents: int | dict = 20, + seed: int | None = None, + border_object: str = "wall", + ): + super().__init__(border_object=border_object) + rng = np.random.default_rng(seed) + + self.width = int(rng.integers(width_range[0], width_range[1] + 1)) + self.height = int(rng.integers(height_range[0], height_range[1] + 1)) + + self.tile_w = int(rng.integers(tile_width_range[0], tile_width_range[1] + 1)) + self.tile_h = int(rng.integers(tile_height_range[0], tile_height_range[1] + 1)) + + self.altars_count = int( + rng.integers(altars_count_range[0], altars_count_range[1] + 1) + ) + + self.shape = random.choice(["rectangle", "cross", "hollow", "diagonal"]) + self.gap = 1 # single‑cell corridor + self._agents = agents + self._rng = rng + + # ------------------------------------------------------------------ # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # ------------------------------------------------------------------ # + def _generate_grid(self) -> np.ndarray: + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Perimeter + grid[0, :] = grid[-1, :] = "wall" + grid[:, 0] = grid[:, -1] = "wall" + + tw, th, gap = self.tile_w, self.tile_h, self.gap + stride_x = tw + gap + 1 # tile + corridor + wall + stride_y = th + gap + 1 + + # Draw lattice walls + for c in range(0, W, stride_x): + grid[:, c] = "wall" + for r in range(0, H, stride_y): + grid[r, :] = "wall" + + # Carve interior patterns + for r0 in range(0, H, stride_y): + for c0 in range(0, W, stride_x): + r_start = r0 + 1 # first cell inside north wall + c_start = c0 + 1 + r_end = min(r_start + th, H - 1) + c_end = min(c_start + tw, W - 1) + + if r_end >= H - 1 or c_end >= W - 1: + continue # skip partial tile at border + + # Fill interior with walls, then carve shape + grid[r_start:r_end, c_start:c_end] = "wall" + mid_r = r_start + th // 2 + mid_c = c_start + tw // 2 + + if self.shape == "rectangle": + grid[r_start:r_end, c_start:c_end] = "empty" + + elif self.shape == "cross": + grid[r_start:r_end, mid_c] = "empty" + grid[mid_r, c_start:c_end] = "empty" + + elif self.shape == "hollow": + grid[r_start, c_start:c_end] = "empty" + grid[r_end-1, c_start:c_end] = "empty" + grid[r_start:r_end, c_start] = "empty" + grid[r_start:r_end, c_end-1] = "empty" + + elif self.shape == "diagonal": + for d in range(min(th, tw)): + rr, cc = r_start + d, c_start + d + if rr < r_end and cc < c_end: + grid[rr, cc] = "empty" + + # Carve doors (centred) in vertical walls + for c in range(stride_x, W - 1, stride_x): + for r0 in range(0, H, stride_y): + door_r = r0 + 1 + th // 2 + if door_r < H - 1 and grid[door_r, c] == "wall": + grid[door_r, c] = "empty" + + # Carve doors (centred) in horizontal walls + for r in range(stride_y, H - 1, stride_y): + for c0 in range(0, W, stride_x): + door_c = c0 + 1 + tw // 2 + if door_c < W - 1 and grid[r, door_c] == "wall": + grid[r, door_c] = "empty" + + self._occ = np.zeros((H, W), dtype=bool) + self._occ[grid == "wall"] = True + return grid + + # ------------------------------------------------------------------ # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(~self._occ))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + empties = np.flatnonzero(~self._occ) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/labyrinth_world.py b/mettagrid/config/room/labyrinth_world.py new file mode 100644 index 00000000..b509a108 --- /dev/null +++ b/mettagrid/config/room/labyrinth_world.py @@ -0,0 +1,174 @@ +""" +LabyrinthWorld +============== + +A map densely tiled with elongated mini‑mazes. Each maze … + +* is rectangular and elongated (long side = short side + 3–7 cells). +* has one entrance on a short edge. +* hides a single heart **altar** in the diagonally opposite corner. +* is marked fully “occupied”, so agents are spawned only in the open + corridors **between** labyrinths. + +The builder keeps dropping labyrinths until it records 10 consecutive +placement failures, which means the map is essentially full. +""" + +from typing import List, Optional, Tuple + +import numpy as np + +from mettagrid.config.room.room import Room + + +class LabyrinthWorld(Room): + STYLE_PARAMETERS = { + "labyrinth_world": { + "hearts_count": 0, # altars live inside labyrinths + "labyrinths": {"count": 999}, + }, + } + + # ------------------------------------------------------------------ # + # Initialise + # ------------------------------------------------------------------ # + def __init__( + self, + width: int, + height: int, + agents: int | dict = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + # overall map size chosen randomly if not overridden by YAML + width, height = np.random.randint(80,120), np.random.randint(80,120) + self._width = width + self._height = height + self._agents = agents + self._occ = np.zeros((height, width), dtype=bool) + + # ------------------------------------------------------------------ # + # Public build entry + # ------------------------------------------------------------------ # + def _build(self) -> np.ndarray: + return self._build_world() + + # ------------------------------------------------------------------ # + # World construction + # ------------------------------------------------------------------ # + def _build_world(self) -> np.ndarray: + grid = np.full((self._height, self._width), "empty", dtype=object) + + consecutive_failures = 0 + while consecutive_failures < 10: # pack the map + if self._place_labyrinth(grid, clearance=1): + consecutive_failures = 0 + else: + consecutive_failures += 1 + + return self._place_agents(grid) + + # ------------------------------------------------------------------ # + # Single labyrinth generation / placement + # ------------------------------------------------------------------ # + def _place_labyrinth(self, grid: np.ndarray, clearance: int) -> bool: + pattern = self._generate_labyrinth() + return self._place_region(grid, pattern, clearance) + + def _generate_labyrinth(self) -> np.ndarray: + # --- choose elongated size (smaller) -------------------------- # + short = int(self._rng.integers(4, 8)) # 4–7 + long = int(self._rng.integers(short + 3, short + 12)) # +3–7 + if self._rng.random() < 0.5: # horizontal + h, w, entrance = short, long, (1, 0) + else: # vertical + h, w, entrance = long, short, (0, 1) + altar = (h - 2, w - 2) + + # keep dimensions odd for maze carving + h += h % 2 == 0 + w += w % 2 == 0 + + wind = float(self._rng.random()) # 0 = straight, 1 = twisty + pat = np.full((h, w), "wall", dtype=object) + dirs = [(0, 1), (1, 0), (0, -1), (-1, 0)] + + start = (1, 1) + pat[start] = "empty" + stack = [start] + last_dir: Optional[Tuple[int, int]] = None + + while stack: + r, c = stack[-1] + options = [ + (dr, dc) + for dr, dc in dirs + if 0 < r + dr * 2 < h - 1 and 0 < c + dc * 2 < w - 1 and pat[r + dr * 2, c + dc * 2] == "wall" + ] + if options: + if last_dir in options and self._rng.random() > wind: + dr, dc = last_dir + else: + dr, dc = options[self._rng.integers(len(options))] + pat[r + dr, c + dc] = "empty" + pat[r + dr * 2, c + dc * 2] = "empty" + stack.append((r + dr * 2, c + dc * 2)) + last_dir = (dr, dc) + else: + stack.pop() + last_dir = None + + # carve entrance and place altar + pat[entrance] = "empty" + pat[altar] = "altar" + return pat + + # ------------------------------------------------------------------ # + # Agent placement (outside labyrinths) + # ------------------------------------------------------------------ # + def _place_agents(self, grid: np.ndarray): + agent_tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + for tag in agent_tags: + pos = self._random_empty() + if pos: + grid[pos] = tag + self._occ[pos] = True + return grid + + # ------------------------------------------------------------------ # + # Region placement utilities + # ------------------------------------------------------------------ # + def _place_region(self, grid, pattern: np.ndarray, clearance: int) -> bool: + ph, pw = pattern.shape + for r, c in self._free_windows((ph + 2 * clearance, pw + 2 * clearance)): + # stamp the pattern + grid[r + clearance : r + clearance + ph, c + clearance : c + clearance + pw] = pattern + # mark the *entire rectangle* as occupied so agents stay outside + self._occ[r : r + ph + 2 * clearance, c : c + pw + 2 * clearance] = True + return True + return False + + def _free_windows(self, shape: Tuple[int, int]) -> List[Tuple[int, int]]: + h, w = shape + H, W = self._occ.shape + if h > H or w > W: + return [] + view_shape = (H - h + 1, W - w + 1, h, w) + sub = np.lib.stride_tricks.as_strided(self._occ, view_shape, self._occ.strides * 2) + coords = np.argwhere(sub.sum(axis=(2, 3)) == 0) + self._rng.shuffle(coords) + return [tuple(t) for t in coords] + + def _random_empty(self) -> Optional[Tuple[int, int]]: + empties = np.flatnonzero(~self._occ) + if empties.size == 0: + return None + idx = self._rng.integers(empties.size) + return tuple(np.unravel_index(empties[idx], self._occ.shape)) diff --git a/mettagrid/config/room/memory_box_world.py b/mettagrid/config/room/memory_box_world.py new file mode 100644 index 00000000..ae9d728b --- /dev/null +++ b/mettagrid/config/room/memory_box_world.py @@ -0,0 +1,185 @@ +""" +MemoryBoxWorld (random sizes) +============================= + +• Interior width/height sampled from `size_range` (default 9‑14). +• Entrance gap (3 cells) on a random side. +• Two corridor walls whose length is randomly sampled from + `corridor_range` **but clipped so they always fit** and always ≥ 6. +• A single altar sits in the corner on the same side as the entrance, + opposite the gap. +• Boxes are stamped until 10 consecutive placement failures; agents spawn + outside the boxes. +• Exactly 20 agents are spawned in every episode. +""" + +from typing import List, Optional, Tuple + +import numpy as np +from mettagrid.config.room.room import Room + + +class MemoryBoxWorld(Room): + STYLE_PARAMETERS = {"memory_box": {"hearts_count": 0}} + + def __init__( + self, + width: int = 120, + height: int = 120, + agents: int | dict = 20, + size_range: Tuple[int, int] = (9, 14), # interior side length + corridor_range: Tuple[int, int] = (6, 10), # sampled per box + seed: Optional[int] = None, + border_width: int = 2, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + width, height = np.random.randint(80,120), np.random.randint(80,120) + self._width, self._height = width, height + # Always spawn exactly 20 agents, ignoring caller‑supplied values + self._agents = agents + self._occ = np.zeros((height, width), dtype=bool) + + self.smin, self.smax = size_range + self.cmin, self.cmax = corridor_range + + # ------------------------------------------------------------------ # + @property + def agents(self) -> int: + """Always returns 20 — the fixed number of agents in MemoryBoxWorld.""" + return 20 + + # -------------------- build world -------------------- # + def _build(self): + grid = np.full((self._height, self._width), "empty", dtype=object) + fails = 0 + while fails < 10: + if self._place_box(grid, clearance=1): + fails = 0 + else: + fails += 1 + return self._place_agents(grid) + + # -------------------- box helpers ------------------- # + def _place_box(self, grid, clearance: int) -> bool: + pat = self._generate_box() + return self._place_region(grid, pat, clearance) + + def _generate_box(self) -> np.ndarray: + # interior size + h_int = int(self._rng.integers(self.smin, self.smax + 1)) + w_int = int(self._rng.integers(self.smin, self.smax + 1)) + h, w = h_int + 2, w_int + 2 + pat = np.full((h, w), "wall", dtype=object) + pat[1:-1, 1:-1] = "empty" + + # choose entrance side + side = self._rng.choice(["top", "bottom", "left", "right"]) + + # sample corridor length, clip so it fits and ≥6 + L_max_vert = h_int - 2 + L_max_horz = w_int - 2 + max_len = L_max_vert if side in ("top", "bottom") else L_max_horz + L = int(self._rng.integers(self.cmin, min(self.cmax, max_len) + 1)) + + if side in ("top", "bottom"): + gap_c = int(self._rng.integers(3, w - 3)) + wall_row = 0 if side == "top" else h - 1 + inner_r = 1 if side == "top" else h - 2 + step = 1 if side == "top" else -1 + + # entrance + for dc in (-1, 0, 1): + pat[wall_row, gap_c + dc] = "empty" + # corridor walls + for k in range(L): + r = inner_r + k * step + pat[r, gap_c - 2] = "wall" + pat[r, gap_c + 2] = "wall" + # altar + altar_c = 1 if gap_c > w // 2 else w - 2 + altar = (1, altar_c) if side == "top" else (h - 2, altar_c) + else: # left / right + gap_r = int(self._rng.integers(3, h - 3)) + wall_col = 0 if side == "left" else w - 1 + inner_c = 1 if side == "left" else w - 2 + step = 1 if side == "left" else -1 + + for dr in (-1, 0, 1): + pat[gap_r + dr, wall_col] = "empty" + for k in range(L): + c = inner_c + k * step + pat[gap_r - 2, c] = "wall" + pat[gap_r + 2, c] = "wall" + altar_r = 1 if gap_r > h // 2 else h - 2 + altar = (altar_r, 1) if side == "left" else (altar_r, w - 2) + + pat[altar] = "altar" + return pat + + # ---------------- agent placement -------------- # + def _place_agents(self, grid): + """Place agents so they are roughly uniformly spread out. + + A greedy Poisson‑disk sampler is used: each new agent spawn + must be at least *min_dist* Manhattan units away from every + already‑placed agent. If the constraint cannot be satisfied + after scanning all empties once, any remaining agents are + placed randomly. + """ + min_dist = 10 + empties = np.flatnonzero(~self._occ) + self._rng.shuffle(empties) + + chosen: List[Tuple[int, int]] = [] + H, W = self._occ.shape + for idx in empties.tolist(): + if len(chosen) == self._agents: + break + r, c = divmod(idx, W) + if all(abs(r - rr) + abs(c - cc) >= min_dist for rr, cc in chosen): + chosen.append((r, c)) + + # Fallback: fill any remaining agents without the distance constraint + if len(chosen) < self._agents: + remaining = [divmod(i, W) for i in empties.tolist() + if divmod(i, W) not in chosen] + for pos in remaining[: self._agents - len(chosen)]: + chosen.append(pos) + + # Stamp agents into the grid / occupancy mask + for pos in chosen: + grid[pos] = "agent.agent" + self._occ[pos] = True + + return grid + + # -------------- generic helpers --------------- # + def _place_region(self, grid, pat, clearance: int): + ph, pw = pat.shape + for r, c in self._free_windows((ph + 2 * clearance, pw + 2 * clearance)): + grid[r + clearance : r + clearance + ph, + c + clearance : c + clearance + pw] = pat + self._occ[r : r + ph + 2 * clearance, + c : c + pw + 2 * clearance] = True + return True + return False + + def _free_windows(self, shape): + h, w = shape + H, W = self._occ.shape + if h > H or w > W: + return [] + view = np.lib.stride_tricks.as_strided( + self._occ, (H - h + 1, W - w + 1, h, w), self._occ.strides * 2 + ) + coords = np.argwhere(view.sum(axis=(2, 3)) == 0) + self._rng.shuffle(coords) + return [tuple(t) for t in coords] + + def _rand_empty(self): + empties = np.flatnonzero(~self._occ) + return None if empties.size == 0 else tuple( + np.unravel_index(self._rng.integers(empties.size), self._occ.shape) + ) \ No newline at end of file diff --git a/mettagrid/config/room/square_world.py b/mettagrid/config/room/square_world.py new file mode 100644 index 00000000..1c4b0992 --- /dev/null +++ b/mettagrid/config/room/square_world.py @@ -0,0 +1,107 @@ +""" +SquareWorld +=========== + +Creates a regular lattice of *solid‑wall squares* separated by uniform +corridors: + +* **square_size ∈ [3 .. 7]** ─ edge length of every solid square block +* **gap ∈ [3 .. 6]** ─ corridor width between squares (and between + the squares and the outer perimeter) + +Both parameters are sampled independently each episode. +A solid perimeter wall encloses the arena. +""" + +from __future__ import annotations + +from typing import Sequence +import numpy as np + +from mettagrid.config.room.room import Room + + +class SquareWorld(Room): + STYLE_PARAMETERS = {"square_grid": {"hearts_count": 0}} + + # ------------------------------------------------------------------ # + def __init__( + self, + *, + width_range: Sequence[int] = (60, 140), + height_range: Sequence[int] = (60, 140), + square_size_range: Sequence[int] = (3, 7), + gap_range: Sequence[int] = (3, 6), + altars_count: int = 50, + agents: int | dict = 20, + seed: int | None = 42, + border_object: str = "wall", + ): + super().__init__(border_object=border_object) + rng = np.random.default_rng(seed) + + self.square_size = rng.integers(square_size_range[0], square_size_range[1] + 1) + self.gap = rng.integers(gap_range[0], gap_range[1] + 1) + + self.width = rng.integers(width_range[0], width_range[1] + 1) + self.height = rng.integers(height_range[0], height_range[1] + 1) + + self.altars_count = altars_count + self._agents = agents + self._rng = rng + + # ------------------------------------------------------------------ # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # ------------------------------------------------------------------ # + def _generate_grid(self) -> np.ndarray: + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Solid perimeter + grid[0, :] = "wall" + grid[-1, :] = "wall" + grid[:, 0] = "wall" + grid[:, -1] = "wall" + + s = self.square_size + g = self.gap + + # Tile solid squares across the arena + row = g + 1 + while row + s < H - 1: + col = g + 1 + while col + s < W - 1: + grid[row : row + s, col : col + s] = "wall" + col += s + g + row += s + g + + # Occupancy mask for later placement + self._occ = np.zeros((H, W), dtype=bool) + return grid + + # ------------------------------------------------------------------ # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(grid == "empty"))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + + empties = np.flatnonzero((grid == "empty") & (~self._occ)) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/terrain_from_numpy.py b/mettagrid/config/room/terrain_from_numpy.py new file mode 100644 index 00000000..bbf2b0a3 --- /dev/null +++ b/mettagrid/config/room/terrain_from_numpy.py @@ -0,0 +1,129 @@ +import os +import random +import time +import zipfile + +import boto3 +import numpy as np +from botocore.exceptions import NoCredentialsError +from filelock import FileLock + +from mettagrid.config.room.room import Room + + +def safe_load(path, retries=5, delay=1.0): + for attempt in range(retries): + try: + return np.load(path, allow_pickle=True) + except ValueError: + if attempt < retries - 1: + time.sleep(delay) + continue + raise + + +def download_from_s3(s3_path: str, save_path: str, location: str = "us-east-1"): + if not s3_path.startswith("s3://"): + raise ValueError(f"Invalid S3 path: {s3_path}. Must start with s3://") + + s3_parts = s3_path[5:].split("/", 1) + if len(s3_parts) < 2: + raise ValueError(f"Invalid S3 path: {s3_path}. Must be in format s3://bucket/path") + + bucket = s3_parts[0] + key = s3_parts[1] + + try: + # Create directory if it doesn't exist + os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True) + # Download the file directly to disk + s3_client = boto3.client("s3") + s3_client.download_file(Bucket=bucket, Key=key, Filename=save_path) + print(f"Successfully downloaded s3://{bucket}/{key} to {save_path}") + + except NoCredentialsError as e: + raise e + except Exception as e: + raise e + + +class TerrainFromNumpy(Room): + """ + This class is used to load a terrain environment from numpy arrays on s3 + + These maps each have 10 agents in them . + """ + + def __init__( + self, dir, border_width: int = 0, border_object: str = "wall", num_agents: int = 10, generators: bool = False + ): + zipped_dir = dir + ".zip" + lock_path = zipped_dir + ".lock" + # Only one process can hold this lock at a time: + with FileLock(lock_path): + if not os.path.exists(dir) and not os.path.exists(zipped_dir): + s3_path = f"s3://softmax-public/maps/{zipped_dir}" + download_from_s3(s3_path, zipped_dir) + if not os.path.exists(dir) and os.path.exists(zipped_dir): + with zipfile.ZipFile(zipped_dir, "r") as zip_ref: + zip_ref.extractall(os.path.dirname(dir)) + + # if not os.path.exists(dir) and not os.path.exists(zipped_dir): + # s3_path = f"s3://softmax-public/maps/{zipped_dir}" + # download_from_s3(s3_path, zipped_dir) + # if not os.path.exists(dir) and os.path.exists(zipped_dir): + # with zipfile.ZipFile(zipped_dir, "r") as zip_ref: + # zip_ref.extractall(os.path.dirname(dir)) + self.files = os.listdir(dir) + self.dir = dir + self.num_agents = num_agents + self.generators = generators + super().__init__(border_width=border_width, border_object=border_object) + + def get_valid_positions(self, level): + valid_positions = [] + for i in range(1, level.shape[0] - 1): + for j in range(1, level.shape[1] - 1): + if level[i, j] == "empty": + # Check if position is accessible from at least one direction + if ( + level[i - 1, j] == "empty" + or level[i + 1, j] == "empty" + or level[i, j - 1] == "empty" + or level[i, j + 1] == "empty" + ): + valid_positions.append((i, j)) + return valid_positions + + def _build(self): + # TODO: add some way of sampling + uri = np.random.choice(self.files) + level = safe_load(f"{self.dir}/{uri}") + + # remove agents to then repopulate + agents = level == "agent.agent" + level[agents] = "empty" + + valid_positions = self.get_valid_positions(level) + positions = random.sample(valid_positions, self.num_agents) + for pos in positions: + level[pos] = "agent.agent" + + area = level.shape[0] * level.shape[1] + num_hearts = area // random.randint(66, 180) + # Find valid empty spaces surrounded by empty + valid_positions = self.get_valid_positions(level) + + # Randomly place hearts in valid positions + positions = random.sample(valid_positions, min(num_hearts, len(valid_positions))) + for pos in positions: + level[pos] = "altar" + + if self.generators: + num_mines = area // random.randint(66, 180) + valid_positions = self.get_valid_positions(level) + positions = random.sample(valid_positions, min(num_mines, len(valid_positions))) + for pos in positions: + level[pos] = "generator" + self._level = level + return self._level diff --git a/mettagrid/config/room/varied_obstacle_shapes.py b/mettagrid/config/room/varied_obstacle_shapes.py new file mode 100644 index 00000000..db14891f --- /dev/null +++ b/mettagrid/config/room/varied_obstacle_shapes.py @@ -0,0 +1,130 @@ +from typing import Optional +import numpy as np +from omegaconf import DictConfig + +from mettagrid.config.room.room import Room + +class VariedObstacleShapes(Room): + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall" + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + + # Define a list of obstacle patterns with varying numbers of wall blocks (from 1 up to 7) + self.obstacle_patterns = [ + np.array([["wall"]]), # 1 block + np.array([["wall", "wall"]]), # 2 blocks (horizontal line) + np.array([["wall", "wall", "wall"]]), # 3 blocks (horizontal line) + np.array([["wall", "wall"], + ["wall", "wall"]]), # 4 blocks (2x2 square) + np.array([["empty", "wall", "empty"], + ["wall", "wall", "wall"], + ["empty", "wall", "empty"]]), # 5 blocks (cross) + np.array([["wall", "wall", "wall"], + ["wall", "empty", "empty"], + ["wall", "wall", "empty"]]), # 6 blocks (L-shaped variant) + np.array([["wall", "wall", "empty"], + ["empty", "wall", "wall"], + ["wall", "wall", "wall"]]) # 7 blocks (zigzag) + ] + + def _build(self): + # Prepare agent symbols + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + # Adjust object counts if total exceeds 2/3 of room area + total_objects = sum(self._objects.values()) + len(agents) + area = self._width * self._height + while total_objects > 2 * area / 3: + for obj_name in self._objects: + self._objects[obj_name] = max(1, self._objects[obj_name] // 2) + total_objects = sum(self._objects.values()) + len(agents) + + # Create an empty grid + grid = np.full((self._height, self._width), "empty", dtype=object) + + # Use the "wall" count to determine the number of obstacles to place + obstacle_count = self._objects.get("wall", 0) + grid = self._place_obstacles(grid, obstacle_count) + + # Place remaining objects (excluding walls) in random empty cells + for obj, count in self._objects.items(): + if obj == "wall": + continue + for _ in range(count): + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + raise ValueError("No empty space available for object placement.") + pos_idx = self._rng.integers(0, len(empty_positions)) + pos = empty_positions[pos_idx] + grid[pos[0], pos[1]] = obj + + # Place agents in remaining empty cells + for agent in agents: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + raise ValueError("No empty space available for agent placement.") + pos_idx = self._rng.integers(0, len(empty_positions)) + pos = empty_positions[pos_idx] + grid[pos[0], pos[1]] = agent + + return grid + + def _place_obstacles(self, grid, obstacle_count: int): + """ + Places composite obstacle objects in the grid. For each obstacle, a pattern is chosen randomly from a list + of shapes (each with a different number of wall blocks, from 1 to 7). The pattern is then placed with a one-cell + clearance around it to ensure traversability. + """ + placed = 0 + attempts = 0 + height, width = grid.shape + + while placed < obstacle_count and attempts < obstacle_count * 100: + attempts += 1 + # Randomly select one of the obstacle patterns + pattern = self.obstacle_patterns[self._rng.integers(0, len(self.obstacle_patterns))] + p_h, p_w = pattern.shape + + # Set clearance (one cell on each side) + clearance = 1 + effective_h = p_h + 2 * clearance + effective_w = p_w + 2 * clearance + + if height - effective_h < 0 or width - effective_w < 0: + break + + # Choose a random top-left coordinate for the effective region + r = self._rng.integers(0, height - effective_h + 1) + c = self._rng.integers(0, width - effective_w + 1) + + # Check if the effective region is completely empty + region = grid[r:r+effective_h, c:c+effective_w] + if np.all(region == "empty"): + # Place the pattern in the center of the effective region + start_r = r + clearance + start_c = c + clearance + grid[start_r:start_r+p_h, start_c:start_c+p_w] = pattern + placed += 1 + + if placed < obstacle_count: + print(f"Warning: Only placed {placed} out of {obstacle_count} obstacles after {attempts} attempts.") + + return grid \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain.py b/mettagrid/config/room/varied_terrain.py index 376234fc..6e9c6e15 100644 --- a/mettagrid/config/room/varied_terrain.py +++ b/mettagrid/config/room/varied_terrain.py @@ -77,6 +77,17 @@ class VariedTerrain(Room): "blocks": {"count": 8}, "clumpiness": 5, }, + # New style: maze-like with predominant labyrinth features. + "maze": { + "hearts_count": 25, # Altars placed after obstacles; keeps the grid sparse for maze corridors. + "large_obstacles": {"size_range": [10, 25], "count": 0}, # Disable large obstacles. + "small_obstacles": {"size_range": [3, 6], "count": 0}, # Disable small obstacles. + "crosses": {"count": 0}, # No cross obstacles. + "labyrinths": {"count": 10}, # Increase labyrinth count to generate more maze segments. + "scattered_walls": {"count": 0}, # Avoid adding extra walls that could break up maze consistency. + "blocks": {"count": 0}, # No rectangular blocks. + "clumpiness": 0, # Clumpiness is not necessary when only labyrinths are used. + }, } def __init__( @@ -91,6 +102,9 @@ def __init__( style: str = "balanced", ): super().__init__(border_width=border_width, border_object=border_object) + + width = np.random.randint(40, 100) + height = np.random.randint(40, 100) self._rng = np.random.default_rng(seed) self._width = width self._height = height @@ -342,10 +356,8 @@ def _generate_cross_pattern(self) -> np.ndarray: def _generate_labyrinth_pattern(self) -> np.ndarray: # Choose dimensions between 11 and 13, then clamp to 11 and force odd. - h = int(self._rng.integers(11, 14)) - w = int(self._rng.integers(11, 14)) - h = 11 if h > 11 else h - w = 11 if w > 11 else w + h = int(self._rng.integers(11, 26)) + w = int(self._rng.integers(11, 26)) if h % 2 == 0: h -= 1 if w % 2 == 0: @@ -373,18 +385,6 @@ def _generate_labyrinth_pattern(self) -> np.ndarray: else: stack.pop() - # Apply thickening based on a random probability between 0.3 and 1.0. - thick_prob = 0.3 + 0.7 * self._rng.random() - maze_thick = maze.copy() - for i in range(1, h - 1): - for j in range(1, w - 1): - if maze[i, j] == "empty": - if self._rng.random() < thick_prob and j + 1 < w: - maze_thick[i, j + 1] = "empty" - if self._rng.random() < thick_prob and i + 1 < h: - maze_thick[i + 1, j] = "empty" - maze = maze_thick - # Ensure each border has at least two contiguous empty cells. if w > 3 and not self._has_gap(maze[0, 1 : w - 1]): maze[0, 1:3] = "empty" @@ -398,8 +398,20 @@ def _generate_labyrinth_pattern(self) -> np.ndarray: # Scatter hearts in empty cells with 30% probability. for i in range(h): for j in range(w): - if maze[i, j] == "empty" and self._rng.random() < 0.3: - maze[i, j] = "heart" + if maze[i, j] == "empty" and self._rng.random() < 0.05: + maze[i, j] = "altar" + + # Apply thickening based on a random probability between 0.3 and 1.0. + thick_prob = 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j + 1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i + 1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick return maze def _has_gap(self, line: np.ndarray) -> bool: diff --git a/mettagrid/config/room/varied_terrain_cylinder.py b/mettagrid/config/room/varied_terrain_cylinder.py new file mode 100644 index 00000000..abff5f63 --- /dev/null +++ b/mettagrid/config/room/varied_terrain_cylinder.py @@ -0,0 +1,138 @@ +""" +VariedTerrainCylinder +--------------------- +A room builder that fills the map with disjoint *cylinders*. + +A *cylinder* is two parallel walls (length 4‑14) separated by a 1‑ or 2‑cell gap. +An altar sits dead‑centre in that gap. Cylinders may be vertical or horizontal. +By default they’re vertical so you can visually verify things; set +orientation="horizontal" or "random" when you like. +""" + +from typing import Tuple, List, Optional +import numpy as np +from mettagrid.config.room.room import Room + + +class VariedTerrainCylinder(Room): + """Scatter as many cylinders as will fit, then drop agents.""" + + def __init__( + self, + width: int, + height: int, + agents: int | dict = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + orientation: str = "vertical", # "vertical", "horizontal", or "random" + style: str = "cylinder_world", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._agents = agents + self._orientation = orientation + + if style != "cylinder_world": + raise ValueError( + f"Only style 'cylinder_world' is implemented here (got '{style}')." + ) + + # Tracks which cells are busy. + self._occupancy = np.zeros((height, width), dtype=bool) + + # ------------------------------------------------------------------ # + # Public build + # ------------------------------------------------------------------ # + def _build(self) -> np.ndarray: + grid = np.full((self._height, self._width), "empty", dtype=object) + + # 1. Scatter cylinders until no room remains. + while True: + pattern = self._generate_cylinder_pattern() + if not self._place_candidate_region(grid, pattern, clearance=1): + break + + # 2. Drop agents. + for agent in self._make_agent_list(): + pos = self._choose_random_empty() + if pos is None: + break + r, c = pos + grid[r, c] = agent + self._occupancy[r, c] = True + + return grid + + # ------------------------------------------------------------------ # + # Cylinder generation + # ------------------------------------------------------------------ # + def _generate_cylinder_pattern(self) -> np.ndarray: + length = int(self._rng.integers(4, 15)) # 4‑14 + gap = int(self._rng.integers(1, 3)) # 1‑2 + + ori = self._orientation + if ori == "random": + ori = "vertical" if self._rng.random() < 0.5 else "horizontal" + + if ori == "vertical": + h, w = length, gap + 2 + pattern = np.full((h, w), "empty", dtype=object) + pattern[:, 0] = pattern[:, -1] = "wall" + pattern[h // 2, 1 + gap // 2] = "altar" + else: # horizontal + h, w = gap + 2, length + pattern = np.full((h, w), "empty", dtype=object) + pattern[0, :] = pattern[-1, :] = "wall" + pattern[1 + gap // 2, w // 2] = "altar" + + return pattern + + # ------------------------------------------------------------------ # + # Helpers (fast occupancy ops) + # ------------------------------------------------------------------ # + def _make_agent_list(self) -> List[str]: + if isinstance(self._agents, int): + return ["agent.agent"] * self._agents + return [ + "agent." + name + for name, n in self._agents.items() + for _ in range(n) + ] + + def _update_occupancy(self, top_left: Tuple[int, int], pattern: np.ndarray) -> None: + r, c = top_left + ph, pw = pattern.shape + self._occupancy[r : r + ph, c : c + pw] |= (pattern != "empty") + + def _find_candidates(self, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + rh, rw = region_shape + H, W = self._occupancy.shape + if rh > H or rw > W: + return [] + shape = (H - rh + 1, W - rw + 1, rh, rw) + strides = self._occupancy.strides * 2 + sub = np.lib.stride_tricks.as_strided(self._occupancy, shape=shape, strides=strides) + sums = sub.sum(axis=(2, 3)) + return [tuple(idx) for idx in np.argwhere(sums == 0)] + + def _place_candidate_region( + self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0 + ) -> bool: + ph, pw = pattern.shape + cand = self._find_candidates((ph + 2 * clearance, pw + 2 * clearance)) + if not cand: + return False + r, c = cand[self._rng.integers(0, len(cand))] + grid[r + clearance : r + clearance + ph, c + clearance : c + clearance + pw] = pattern + self._update_occupancy((r + clearance, c + clearance), pattern) + return True + + def _choose_random_empty(self) -> Optional[Tuple[int, int]]: + empties = np.flatnonzero(~self._occupancy) + if not len(empties): + return None + idx = self._rng.integers(0, len(empties)) + return np.unravel_index(empties[idx], self._occupancy.shape) \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_diverse.py b/mettagrid/config/room/varied_terrain_diverse.py new file mode 100644 index 00000000..ff015fad --- /dev/null +++ b/mettagrid/config/room/varied_terrain_diverse.py @@ -0,0 +1,417 @@ +""" +This file defines the VariedTerrainDiverse environment. +It creates a grid world with configurable features including: + - Large obstacles and small obstacles: randomly generated, connected shapes. + - Cross obstacles: cross-shaped patterns. + - Mini labyrinths: maze-like structures (≈11×11) with passages thickened probabilistically, + and with at least two-cell gaps along the borders; empty cells may be replaced with "heart". + - Scattered single walls: individual wall cells placed at random empty cells. + - Blocks: new rectangular objects whose width and height are sampled uniformly between 2 and 14. + The number of blocks (block count) is sampled from 0 to 10. + - Altars: objects are placed with a configurable number of hearts. + - A clumpiness factor that biases object placement. +All objects are placed with at least a one-cell clearance and if no space is found for a new object, placement is skipped. +The build order is: + mini labyrinths → obstacles (large, small, crosses) → scattered walls → blocks → remaining objects → agents. +""" + +from typing import Optional, Tuple, List +import numpy as np +from omegaconf import DictConfig +from mettagrid.config.room.room import Room + +class VariedTerrainDiverse(Room): + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + occupancy_threshold: float = 0.66, # maximum fraction of grid cells to occupy + **kwargs # Extra parameters + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + self._occupancy_threshold = occupancy_threshold + + # Obstacle parameters. + self._large_obstacles = kwargs.pop("large_obstacles", {"size_range": [10, 25], "count": 0}) + self._small_obstacles = kwargs.pop("small_obstacles", {"size_range": [3, 6], "count": 0}) + self._crosses = kwargs.pop("crosses", {"count": 0}) + # Altars: hearts_count parameter overrides the count for altars. + self._hearts_count = kwargs.pop("hearts_count", 50) + self._clumpiness = kwargs.pop("clumpiness", 0) + # Labyrinths: number of mini labyrinths. + self._labyrinths = kwargs.pop("labyrinths", {"count": 0}) + # Scattered single walls. + self._scattered_walls = kwargs.pop("scattered_walls", {"count": 0}) + # NEW: Blocks – rectangular objects. + self._blocks = kwargs.pop("blocks", {"count": 0}) + + def _build(self) -> np.ndarray: + # Prepare agent symbols. + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, dict) or isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + # Pre-scale objects if overall occupancy might exceed threshold. + area = self._width * self._height + total_objects = sum(self._objects.values()) + len(agents) + if total_objects > self._occupancy_threshold * area: + scale = (self._occupancy_threshold * area) / total_objects + for obj in self._objects: + if self._objects[obj] > 0: + self._objects[obj] = max(1, int(self._objects[obj] * scale)) + + # Create an empty grid. + grid = np.full((self._height, self._width), "empty", dtype=object) + + # Place features in order. + grid = self._place_labyrinths(grid) + grid = self._place_all_obstacles(grid) + grid = self._place_scattered_walls(grid) + grid = self._place_blocks(grid) + for obj, count in self._objects.items(): + if obj == "altar": + count = self._hearts_count + for _ in range(count): + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for object placement.") + grid[pos[0], pos[1]] = obj + for agent in agents: + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for agent placement.") + grid[pos[0], pos[1]] = agent + + return grid + + # --------------------------- + # Helper Functions + # --------------------------- + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + p_h, p_w = pattern.shape + eff_h, eff_w = p_h + 2*clearance, p_w + 2*clearance + candidates = self._find_candidates(grid, (eff_h, eff_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance:r+clearance+p_h, c+clearance:c+clearance+p_w] = pattern + return True + return False + + # --------------------------- + # Placement Routines + # --------------------------- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Place large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Place small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Place cross obstacles (no extra clearance). + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + """ + Places rectangular blocks on the grid. + Each block's width and height are sampled uniformly between 2 and 14. + The number of blocks is determined by self._blocks["count"] (0 to 10). + A candidate region of the sampled dimensions must be completely empty. + If no such region exists, placement of that block is skipped. + """ + block_count = self._blocks.get("count", 0) + h, w = grid.shape + for _ in range(block_count): + block_w = self._rng.integers(2, 15) # [2,14] + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + # --------------------------- + # Pattern Generation Functions + # --------------------------- + def _generate_random_shape(self, num_blocks: int) -> np.ndarray: + shape_cells = {(0, 0)} + while len(shape_cells) < num_blocks: + candidates = [] + for (r, c) in shape_cells: + for dr, dc in [(1, 0), (-1, 0), (0, 1), (0, -1)]: + candidate = (r + dr, c + dc) + if candidate not in shape_cells: + candidates.append(candidate) + if not candidates: + break + new_cell = candidates[self._rng.integers(0, len(candidates))] + shape_cells.add(new_cell) + min_r = min(r for r, _ in shape_cells) + min_c = min(c for _, c in shape_cells) + max_r = max(r for r, _ in shape_cells) + max_c = max(c for _, c in shape_cells) + pattern = np.full((max_r - min_r + 1, max_c - min_c + 1), "empty", dtype=object) + for r, c in shape_cells: + pattern[r - min_r, c - min_c] = "wall" + return pattern + + def _generate_cross_pattern(self) -> np.ndarray: + cross_w = self._rng.integers(1, 9) + cross_h = self._rng.integers(1, 9) + pattern = np.full((cross_h, cross_w), "empty", dtype=object) + center_row = cross_h // 2 + center_col = cross_w // 2 + pattern[center_row, :] = "wall" + pattern[:, center_col] = "wall" + return pattern + + def _generate_labyrinth_pattern(self) -> np.ndarray: + # Choose dimensions between 11 and 13, then clamp to 11 and force odd. + h = int(self._rng.integers(11, 14)) + w = int(self._rng.integers(11, 14)) + h = 11 if h > 11 else h + w = 11 if w > 11 else w + if h % 2 == 0: h -= 1 + if w % 2 == 0: w -= 1 + + maze = np.full((h, w), "wall", dtype=object) + start = (1, 1) + maze[start] = "empty" + stack = [start] + directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] + while stack: + r, c = stack[-1] + neighbors = [] + for dr, dc in directions: + nr, nc = r + dr, c + dc + if 0 <= nr < h and 0 <= nc < w and maze[nr, nc] == "wall": + neighbors.append((nr, nc)) + if neighbors: + next_cell = neighbors[self._rng.integers(0, len(neighbors))] + nr, nc = next_cell + wall_r, wall_c = r + (nr - r) // 2, c + (nc - c) // 2 + maze[wall_r, wall_c] = "empty" + maze[nr, nc] = "empty" + stack.append(next_cell) + else: + stack.pop() + + # Apply thickening based on a random probability between 0.3 and 1.0. + thick_prob = 0.3 + 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j + 1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i + 1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick + + # Ensure each border has at least two contiguous empty cells. + if w > 3 and not self._has_gap(maze[0, 1:w-1]): + maze[0, 1:3] = "empty" + if w > 3 and not self._has_gap(maze[h - 1, 1:w-1]): + maze[h - 1, 1:3] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, 0]): + maze[1:3, 0] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, w - 1]): + maze[1:3, w - 1] = "empty" + + # Scatter hearts in empty cells with 30% probability. + for i in range(h): + for j in range(w): + if maze[i, j] == "empty" and self._rng.random() < 0.3: + maze[i, j] = "heart" + return maze + + def _has_gap(self, line: np.ndarray) -> bool: + contiguous = 0 + for cell in line: + contiguous = contiguous + 1 if cell == "empty" else 0 + if contiguous >= 2: + return True + return False + + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Place large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Place small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Place cross obstacles. + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + """ + Places rectangular block objects on the grid. + For each block, the width and height are sampled uniformly between 2 and 14. + The number of blocks is determined by self._blocks["count"]. + The block is placed in a candidate region that is completely empty. + """ + block_count = self._blocks.get("count", 0) + for _ in range(block_count): + block_w = self._rng.integers(2, 15) # 2 to 14 inclusive. + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + p_h, p_w = pattern.shape + eff_shape = (p_h + 2 * clearance, p_w + 2 * clearance) + candidates = self._find_candidates(grid, eff_shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance:r+clearance+p_h, c+clearance:c+clearance+p_w] = pattern + return True + return False + + def _choose_random_empty_region(self, grid: np.ndarray, region_shape: Tuple[int,int]) -> Optional[Tuple[int,int]]: + candidates = self._find_candidates(grid, region_shape) + if candidates: + return candidates[self._rng.integers(0, len(candidates))] + return None + +# End of VariedTerrainDiverse class implementation \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_diverse_style.py b/mettagrid/config/room/varied_terrain_diverse_style.py new file mode 100644 index 00000000..d577bbc0 --- /dev/null +++ b/mettagrid/config/room/varied_terrain_diverse_style.py @@ -0,0 +1,474 @@ +""" +This file defines the VariedTerrainDiverseStyle environment. +It creates a grid world with a variety of obstacles and objects: + - Large obstacles and small obstacles: random connected shapes. + - Cross obstacles: cross-shaped patterns. + - Mini labyrinths: maze-like structures (~11×11) generated via recursive backtracking, + with passages thickened probabilistically and with border gaps of at least two cells; + empty cells may be replaced with "heart" with ~30% chance. + - Scattered single walls: randomly scattered wall cells. + - Blocks: new rectangular objects with width and height sampled uniformly between 2 and 14. + - Altars: objects with a reward count, overridden by hearts_count. + - A clumpiness factor that biases placement. +Extra style parameters, in particular "domain", are accepted to override distributions. +The build order is: + mini labyrinths → obstacles (large, small, crosses) → scattered walls → blocks → remaining objects → agents. +If no space is found for an object, placement is skipped (for agents, a fallback places them in the center). +""" + +from typing import Optional, Tuple, List +import numpy as np +from omegaconf import DictConfig +from mettagrid.config.room.room import Room + +class VariedTerrainDiverseStyle(Room): + # Predefined style overrides keyed by domain. + _STYLE_OVERRIDES = { + "sparse_cityscape": { + "blocks": {"count": (25, 35)}, + "large_obstacles": {"count": (1, 3)}, + "small_obstacles": {"count": (1, 3)}, + "crosses": {"count": (1, 3)}, + "labyrinths": {"count": 0}, + "scattered_walls": {"count": (1, 6)}, + "hearts_count": (80, 100), + "clumpiness": (0, 1) + }, + "ancient_ruins": { + "blocks": {"count": (0, 3)}, + "large_obstacles": {"count": (3, 6)}, + "small_obstacles": {"count": (3, 6)}, + "crosses": {"count": (1, 3)}, + "labyrinths": {"count": (1, 2)}, + "scattered_walls": {"count": (3, 6)}, + "hearts_count": (70, 90), + "clumpiness": (2, 3) + }, + "cross_forest": { + "blocks": {"count": (3, 8)}, + "large_obstacles": {"count": (1, 3)}, + "small_obstacles": {"count": (1, 3)}, + "crosses": {"count": (5, 8)}, # many crosses + "labyrinths": {"count": (0, 1)}, + "scattered_walls": {"count": (1, 4)}, + "hearts_count": (60, 80), + "clumpiness": (1, 2) + }, + "jungle": { + "blocks": {"count": (0, 10)}, + "large_obstacles": {"count": (0, 10)}, + "small_obstacles": {"count": (0, 10)}, + "crosses": {"count": (0, 10)}, + "labyrinths": {"count": (0, 10)}, + "scattered_walls": {"count": (0, 10)}, + "hearts_count": (0, 150), + "clumpiness": (0, 5) + }, + "desert_adventure": { + "blocks": {"count": (0, 3)}, + "large_obstacles": {"count": (0, 1)}, + "small_obstacles": {"count": (0, 1)}, + "crosses": {"count": (0, 1)}, + "labyrinths": {"count": (8, 12)}, + "scattered_walls": {"count": (0, 3)}, + "hearts_count": (40, 60), + "clumpiness": (0, 2) + }, + "foreign_planet": { + "blocks": {"count": (5, 10)}, + "large_obstacles": {"count": (3, 7)}, + "small_obstacles": {"count": (3, 7)}, + "crosses": {"count": (3, 7)}, + "labyrinths": {"count": (2, 4)}, + "scattered_walls": {"count": (3, 7)}, + "hearts_count": (50, 70), + "clumpiness": (1, 3) + } + } + + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + occupancy_threshold: float = 0.66, + **kwargs # Extra parameters including domain + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + self._occupancy_threshold = occupancy_threshold + + # Optional style parameter: domain. + self._domain = kwargs.pop("domain", None) + + # Obstacle parameters. + self._large_obstacles = kwargs.pop("large_obstacles", {"size_range": [10, 25], "count": 0}) + self._small_obstacles = kwargs.pop("small_obstacles", {"size_range": [3, 6], "count": 0}) + self._crosses = kwargs.pop("crosses", {"count": 0}) + self._hearts_count = kwargs.pop("hearts_count", 50) + self._clumpiness = kwargs.pop("clumpiness", 0) + self._labyrinths = kwargs.pop("labyrinths", {"count": 0}) + self._scattered_walls = kwargs.pop("scattered_walls", {"count": 0}) + self._blocks = kwargs.pop("blocks", {"count": 0}) + + self._apply_style_overrides() + + def _apply_style_overrides(self): + if self._domain: + style = self._STYLE_OVERRIDES.get(self._domain) + if style: + # Override each parameter if provided as a range (tuple) or a constant. + if "blocks" in style: + val = style["blocks"]["count"] + self._blocks["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "large_obstacles" in style: + val = style["large_obstacles"]["count"] + self._large_obstacles["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "small_obstacles" in style: + val = style["small_obstacles"]["count"] + self._small_obstacles["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "crosses" in style: + val = style["crosses"]["count"] + self._crosses["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "labyrinths" in style: + self._labyrinths["count"] = style["labyrinths"]["count"] if isinstance(style["labyrinths"]["count"], int) else int(style["labyrinths"]["count"]) + if "scattered_walls" in style: + val = style["scattered_walls"]["count"] + self._scattered_walls["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "hearts_count" in style: + val = style["hearts_count"] + self._hearts_count = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "clumpiness" in style: + val = style["clumpiness"] + self._clumpiness = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + + def _build(self) -> np.ndarray: + # Ensure one agent per room (map builder should create one agent per room). + if isinstance(self._agents, int) and self._agents != 1: + self._agents = 1 + elif isinstance(self._agents, DictConfig): + # If agent counts are provided as a dict, override with 1 per room. + for key in self._agents: + self._agents[key] = 1 + + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + area = self._width * self._height + total_objects = sum(self._objects.values()) + len(agents) + if total_objects > self._occupancy_threshold * area: + scale = (self._occupancy_threshold * area) / total_objects + for obj in self._objects: + if self._objects[obj] > 0: + self._objects[obj] = max(1, int(self._objects[obj] * scale)) + grid = np.full((self._height, self._width), "empty", dtype=object) + + grid = self._place_labyrinths(grid) + grid = self._place_all_obstacles(grid) + grid = self._place_scattered_walls(grid) + grid = self._place_blocks(grid) + for obj, count in self._objects.items(): + if obj == "altar": + count = self._hearts_count + for _ in range(count): + pos = self._choose_random_empty(grid) + if pos is not None: + grid[pos[0], pos[1]] = obj + else: + print(f"Warning: No empty space available for object {obj}. Skipping placement.") + for agent in agents: + pos = self._choose_random_empty(grid) + if pos is not None: + grid[pos[0], pos[1]] = agent + else: + # Fallback placement: center of grid. + center = (self._height // 2, self._width // 2) + grid[center[0], center[1]] = agent + print("Warning: No empty space found for agent; forced placement at center.") + return grid + + # --- Helper Functions --- + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + p_h, p_w = pattern.shape + eff_shape = (p_h + 2 * clearance, p_w + 2 * clearance) + candidates = self._find_candidates(grid, eff_shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance:r+clearance+p_h, c+clearance:c+clearance+p_w] = pattern + return True + return False + + # --- Placement Routines --- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Cross obstacles. + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + block_count = self._blocks.get("count", 0) + for _ in range(block_count): + block_w = self._rng.integers(2, 15) + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + # --- Pattern Generation Functions --- + def _generate_random_shape(self, num_blocks: int) -> np.ndarray: + shape_cells = {(0, 0)} + while len(shape_cells) < num_blocks: + candidates = [] + for (r, c) in shape_cells: + for dr, dc in [(1, 0), (-1, 0), (0, 1), (0, -1)]: + candidate = (r + dr, c + dc) + if candidate not in shape_cells: + candidates.append(candidate) + if not candidates: + break + new_cell = candidates[self._rng.integers(0, len(candidates))] + shape_cells.add(new_cell) + min_r = min(r for r, _ in shape_cells) + min_c = min(c for _, c in shape_cells) + max_r = max(r for r, _ in shape_cells) + max_c = max(c for _, c in shape_cells) + pattern = np.full((max_r - min_r + 1, max_c - min_c + 1), "empty", dtype=object) + for r, c in shape_cells: + pattern[r - min_r, c - min_c] = "wall" + return pattern + + def _generate_cross_pattern(self) -> np.ndarray: + cross_w = self._rng.integers(1, 9) + cross_h = self._rng.integers(1, 9) + pattern = np.full((cross_h, cross_w), "empty", dtype=object) + center_row = cross_h // 2 + center_col = cross_w // 2 + pattern[center_row, :] = "wall" + pattern[:, center_col] = "wall" + return pattern + + def _generate_labyrinth_pattern(self) -> np.ndarray: + # Choose dimensions between 11 and 13, clamp to 11, force odd. + h = int(self._rng.integers(11, 14)) + w = int(self._rng.integers(11, 14)) + h = 11 if h > 11 else h + w = 11 if w > 11 else w + if h % 2 == 0: h -= 1 + if w % 2 == 0: w -= 1 + + maze = np.full((h, w), "wall", dtype=object) + start = (1, 1) + maze[start] = "empty" + stack = [start] + directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] + while stack: + r, c = stack[-1] + neighbors = [] + for dr, dc in directions: + nr, nc = r + dr, c + dc + if 0 <= nr < h and 0 <= nc < w and maze[nr, nc] == "wall": + neighbors.append((nr, nc)) + if neighbors: + next_cell = neighbors[self._rng.integers(0, len(neighbors))] + nr, nc = next_cell + wall_r, wall_c = r + (nr - r) // 2, c + (nc - c) // 2 + maze[wall_r, wall_c] = "empty" + maze[nr, nc] = "empty" + stack.append(next_cell) + else: + stack.pop() + + # Apply thickening with a random probability to vary density. + thick_prob = 0.3 + 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j + 1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i + 1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick + + # Enforce at least two contiguous empty cells along each border. + if w > 3 and not self._has_gap(maze[0, 1:w-1]): + maze[0, 1:3] = "empty" + if w > 3 and not self._has_gap(maze[h - 1, 1:w-1]): + maze[h - 1, 1:3] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, 0]): + maze[1:3, 0] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, w - 1]): + maze[1:3, w - 1] = "empty" + + # Scatter hearts in empty cells with a 30% probability. + for i in range(h): + for j in range(w): + if maze[i, j] == "empty" and self._rng.random() < 0.3: + maze[i, j] = "heart" + return maze + + def _has_gap(self, line: np.ndarray) -> bool: + contiguous = 0 + for cell in line: + contiguous = contiguous + 1 if cell == "empty" else 0 + if contiguous >= 2: + return True + return False + + # --- Placement Routines for Obstacles and Blocks --- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Cross obstacles. + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + block_count = self._blocks.get("count", 0) + for _ in range(block_count): + block_w = self._rng.integers(2, 15) + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + +# End of VariedTerrainDiverseStyle class implementation \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_labyrinths.py b/mettagrid/config/room/varied_terrain_labyrinths.py new file mode 100644 index 00000000..fd85ce1e --- /dev/null +++ b/mettagrid/config/room/varied_terrain_labyrinths.py @@ -0,0 +1,323 @@ +from typing import Optional, Tuple, List +import numpy as np +from omegaconf import DictConfig +from mettagrid.config.room.room import Room + +class VariedTerrainLabyrinths(Room): + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + occupancy_threshold: float = 0.66, # maximum fraction of grid cells to occupy + **kwargs # Accept extra parameters + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + self._occupancy_threshold = occupancy_threshold + + # Obstacle parameters. + self._large_obstacles = kwargs.pop("large_obstacles", {"size_range": [10, 25], "count": 0}) + self._small_obstacles = kwargs.pop("small_obstacles", {"size_range": [3, 6], "count": 0}) + self._crosses = kwargs.pop("crosses", {"count": 0}) + # Altars: hearts_count overrides number of altars. + self._hearts_count = kwargs.pop("hearts_count", 50) + self._clumpiness = kwargs.pop("clumpiness", 0) + # Labyrinths: count of mini labyrinths (mazes). + self._labyrinths = kwargs.pop("labyrinths", {"count": 0}) + # Scattered single walls. + self._scattered_walls = kwargs.pop("scattered_walls", {"count": 0}) + + def _build(self) -> np.ndarray: + # Prepare agent symbols. + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, dict) or isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + # --- Pre-scale objects by estimated occupancy --- + area = self._width * self._height + total_objects = sum(self._objects.values()) + len(agents) + if total_objects > self._occupancy_threshold * area: + scale = (self._occupancy_threshold * area) / total_objects + for obj in self._objects: + if self._objects[obj] > 0: + # Ensure at least one instance if originally nonzero. + self._objects[obj] = max(1, int(self._objects[obj] * scale)) + + # Create an empty grid. + grid = np.full((self._height, self._width), "empty", dtype=object) + + # Place features in order: + grid = self._place_labyrinths(grid) + grid = self._place_all_obstacles(grid) + grid = self._place_scattered_walls(grid) + + # --- Place remaining objects (e.g., altars, other objects) --- + for obj, count in self._objects.items(): + if obj == "altar": + count = self._hearts_count + for _ in range(count): + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for object placement.") + grid[pos[0], pos[1]] = obj + + # --- Place agents --- + for agent in agents: + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for agent placement.") + grid[pos[0], pos[1]] = agent + + return grid + + # -------------------------------------------------------------------------- + # Helper Functions for Candidate Search and Placement + # -------------------------------------------------------------------------- + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + """ + Returns list of top-left indices where a sub-region of given shape is completely "empty". + """ + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + """ + Returns a random empty cell (as a tuple) from the grid. + """ + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + """ + Attempts to place a given pattern on grid. If clearance > 0, an effective region is + computed by padding the pattern dimensions with the clearance. + """ + p_h, p_w = pattern.shape + effective_h, effective_w = p_h + 2 * clearance, p_w + 2 * clearance + candidates = self._find_candidates(grid, (effective_h, effective_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance: r+clearance+p_h, c+clearance: c+clearance+p_w] = pattern + return True + return False + + # -------------------------------------------------------------------------- + # Placement Routines + # -------------------------------------------------------------------------- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + """ + Place mini labyrinths onto the grid. + """ + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + """ + Places large obstacles, small obstacles, and cross obstacles on the grid. + For obstacles, a one-cell clearance is enforced. + """ + clearance = 1 + + # Place large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target_blocks = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target_blocks) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target_blocks} blocks.") + + # Place small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target_blocks = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target_blocks) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target_blocks} blocks.") + + # Place cross obstacles (no extra clearance assumed). + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + """ + Scatter single wall cells randomly on empty grid locations. + """ + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + # Randomly select indices (without replacement) for wall placement. + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "block" + return grid + + # -------------------------------------------------------------------------- + # Pattern Generation Functions (unchanged in spirit) + # -------------------------------------------------------------------------- + def _generate_random_shape(self, num_blocks: int) -> np.ndarray: + """ + Generates a random connected shape with num_blocks cells set to "wall" using a random walk. + The shape is normalized (top-left at (0,0)). + """ + shape_cells = {(0, 0)} + # Use candidate growth until desired number of blocks is reached. + while len(shape_cells) < num_blocks: + candidates = [] + for (r, c) in shape_cells: + for dr, dc in [(1, 0), (-1, 0), (0, 1), (0, -1)]: + candidate = (r + dr, c + dc) + if candidate not in shape_cells: + candidates.append(candidate) + if not candidates: + break + new_cell = candidates[self._rng.integers(0, len(candidates))] + shape_cells.add(new_cell) + # Normalize shape coordinates. + min_r = min(r for r, _ in shape_cells) + min_c = min(c for _, c in shape_cells) + max_r = max(r for r, _ in shape_cells) + max_c = max(c for _, c in shape_cells) + pattern = np.full((max_r - min_r + 1, max_c - min_c + 1), "empty", dtype=object) + for r, c in shape_cells: + pattern[r - min_r, c - min_c] = "wall" + return pattern + + def _generate_cross_pattern(self) -> np.ndarray: + """ + Generates a cross-shaped obstacle. + Both width and height are sampled between 1 and 8 (inclusive). + The center row and column are set to "wall". + """ + cross_w = self._rng.integers(1, 9) + cross_h = self._rng.integers(1, 9) + pattern = np.full((cross_h, cross_w), "empty", dtype=object) + center_row = cross_h // 2 + center_col = cross_w // 2 + pattern[center_row, :] = "wall" + pattern[:, center_col] = "wall" + return pattern + + def _generate_labyrinth_pattern(self) -> np.ndarray: + """ + Generates a mini labyrinth (maze) pattern. + Dimensions are chosen randomly near 11×11 and forced to be odd. + A recursive backtracking algorithm carves passages. After that, passages + are thickened probabilistically, border gaps are ensured, and empty cells + are randomly replaced with "heart". + """ + # Choose dimensions between 11 and 13 then clamp and force oddness. + h = int(self._rng.integers(11, 14)) + w = int(self._rng.integers(11, 14)) + h = 11 if h > 11 else h + w = 11 if w > 11 else w + if h % 2 == 0: h -= 1 + if w % 2 == 0: w -= 1 + + maze = np.full((h, w), "wall", dtype=object) + start = (1, 1) + maze[start] = "empty" + stack = [start] + directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] + while stack: + r, c = stack[-1] + neighbors = [] + for dr, dc in directions: + nr, nc = r + dr, c + dc + if 0 <= nr < h and 0 <= nc < w and maze[nr, nc] == "wall": + neighbors.append((nr, nc)) + if neighbors: + next_cell = neighbors[self._rng.integers(0, len(neighbors))] + nr, nc = next_cell + wall_r, wall_c = r + (nr - r) // 2, c + (nc - c) // 2 + maze[wall_r, wall_c] = "empty" + maze[nr, nc] = "empty" + stack.append(next_cell) + else: + stack.pop() + + # Apply thickening to passages. + thick_prob = 0.3 + 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j+1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i+1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick + + # Ensure border gaps of at least 2 cells. + if w > 3 and not self._has_gap(maze[0, 1:w-1]): + maze[0, 1:3] = "empty" + if w > 3 and not self._has_gap(maze[h - 1, 1:w-1]): + maze[h - 1, 1:3] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, 0]): + maze[1:3, 0] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, w - 1]): + maze[1:3, w - 1] = "empty" + + # Randomly scatter hearts over empty cells with 30% probability. + for i in range(h): + for j in range(w): + if maze[i, j] == "empty" and self._rng.random() < 0.3: + maze[i, j] = "heart" + + return maze + + def _has_gap(self, line: np.ndarray) -> bool: + """ + Returns True if a 1D array has at least 2 contiguous "empty" cells. + """ + contiguous = 0 + for cell in line: + contiguous = contiguous + 1 if cell == "empty" else 0 + if contiguous >= 2: + return True + return False + +# ============================================================================== +# End of VariedTerrainLabyrinths +# ============================================================================== diff --git a/mettagrid/mettagrid_env.py b/mettagrid/mettagrid_env.py index 56f03293..acd20e67 100644 --- a/mettagrid/mettagrid_env.py +++ b/mettagrid/mettagrid_env.py @@ -1,5 +1,5 @@ import copy -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional import gymnasium as gym import hydra @@ -180,21 +180,20 @@ class MettaGridEnvSet(MettaGridEnv): def __init__( self, env_cfg: DictConfig, - probabilities: List[float] | None, render_mode: str, buf=None, **kwargs, ): - self._env_cfgs = env_cfg.envs + self._envs = list(env_cfg.envs.keys()) + self._probabilities = list(env_cfg.envs.values()) self._num_agents_global = env_cfg.num_agents - self._probabilities = probabilities self._env_cfg = self._get_new_env_cfg() super().__init__(env_cfg, render_mode, buf, **kwargs) self._cfg_template = None # we don't use this with multiple envs, so we clear it to emphasize that fact def _get_new_env_cfg(self): - selected_env = np.random.choice(self._env_cfgs, p=self._probabilities) + selected_env = np.random.choice(self._envs, p=self._probabilities) env_cfg = config_from_path(selected_env) if self._num_agents_global != env_cfg.game.num_agents: raise ValueError(