diff --git a/mettagrid/config/room/box_world.py b/mettagrid/config/room/box_world.py new file mode 100644 index 00000000..bce407e0 --- /dev/null +++ b/mettagrid/config/room/box_world.py @@ -0,0 +1,147 @@ +""" +BoxWorld +======== + +Densely fills the map with rectangular *boxes*: + +* Interior rectangle; one side is guaranteed ≥ 8 cells. +* Walls 1‑cell thick. +* A single *entrance* (one gap) sits at a randomly chosen corner. +* A single heart **altar** sits in the *diagonally opposite* corner. +* Boxes are stamped until 10 consecutive placement failures, then agents + are spawned in the remaining open corridors. + +Parameters (override in YAML `room:` block if desired): + + short_range – inclusive (min, max) for interior short side + elong_range – inclusive (min_add, max_add) added to short → long side +""" + +from typing import List, Optional, Tuple + +import numpy as np +from mettagrid.config.room.room import Room + + +class BoxWorld(Room): + STYLE_PARAMETERS = {"box_world": {"hearts_count": 0}} + + # ------------------------------------------------------------------ # + # Init + # ------------------------------------------------------------------ # + def __init__( + self, + width: int = 120, + height: int = 120, + agents: int | dict = 0, + short_range: Tuple[int, int] = (4, 6), # interior short side + elong_range: Tuple[int, int] = (4, 8), # + add to short → long + seed: Optional[int] = 42, + border_width: int = 2, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width, self._height = np.random.randint(40,100), np.random.randint(40,100) + self._agents = agents + self._occ = np.zeros((height, width), dtype=bool) + self.short_min, self.short_max = short_range + self.elong_min, self.elong_max = elong_range + + # ------------------------------------------------------------------ # + # Public build + # ------------------------------------------------------------------ # + def _build(self): + grid = np.full((self._height, self._width), "empty", dtype=object) + fails = 0 + while fails < 10: # pack until space gone + if self._place_box(grid, clearance=1): + fails = 0 + else: + fails += 1 + return self._place_agents(grid) + + # ------------------------------------------------------------------ # + # Box helpers + # ------------------------------------------------------------------ # + def _place_box(self, grid, clearance: int) -> bool: + pattern = self._generate_box() + return self._place_region(grid, pattern, clearance) + + def _generate_box(self) -> np.ndarray: + # --- choose interior dimensions -------------------------------- # + short = int(self._rng.integers(self.short_min, self.short_max + 1)) + long_add = int(self._rng.integers(self.elong_min, self.elong_max + 1)) + long_side = max(short + long_add, 8) # ensure ≥ 8 somewhere + + horiz = self._rng.random() < 0.5 + h_int, w_int = (short, long_side) if horiz else (long_side, short) + + h, w = h_int + 2, w_int + 2 # +2 for walls + pat = np.full((h, w), "wall", dtype=object) + pat[1:-1, 1:-1] = "empty" + + # pick entrance corner, altar opposite + corners = [(1, 1), (1, w - 2), (h - 2, 1), (h - 2, w - 2)] + entrance = corners[self._rng.integers(4)] + altar = corners[(corners.index(entrance) + 2) % 4] + er, ec = entrance + + pat[altar] = "altar" + + # carve a single gap adjacent to the entrance corner + gaps = [] + if er == 1: gaps.append((0, ec)) # top wall + if er == h - 2: gaps.append((h - 1, ec)) # bottom wall + if ec == 1: gaps.append((er, 0)) # left wall + if ec == w - 2: gaps.append((er, w - 1)) # right wall + gap = gaps[self._rng.integers(len(gaps))] + pat[gap] = "empty" + return pat + + # ------------------------------------------------------------------ # + # Agent placement + # ------------------------------------------------------------------ # + def _place_agents(self, grid): + tags = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + n for n, k in self._agents.items() for _ in range(k)] + ) + for t in tags: + pos = self._rand_empty() + if pos: + grid[pos] = t + self._occ[pos] = True + return grid + + # ------------------------------------------------------------------ # + # Generic helpers (same as LabyrinthWorld) + # ------------------------------------------------------------------ # + def _place_region(self, grid, pat, clearance: int) -> bool: + ph, pw = pat.shape + for r, c in self._free_windows((ph + 2 * clearance, pw + 2 * clearance)): + grid[r + clearance : r + clearance + ph, + c + clearance : c + clearance + pw] = pat + self._occ[r : r + ph + 2 * clearance, + c : c + pw + 2 * clearance] = True + return True + return False + + def _free_windows(self, shape) -> List[Tuple[int, int]]: + h, w = shape + H, W = self._occ.shape + if h > H or w > W: + return [] + view = np.lib.stride_tricks.as_strided( + self._occ, (H - h + 1, W - w + 1, h, w), self._occ.strides * 2 + ) + coords = np.argwhere(view.sum(axis=(2, 3)) == 0) + self._rng.shuffle(coords) + return [tuple(t) for t in coords] + + def _rand_empty(self): + empties = np.flatnonzero(~self._occ) + return None if empties.size == 0 else tuple( + np.unravel_index(self._rng.integers(empties.size), self._occ.shape) + ) \ No newline at end of file diff --git a/mettagrid/config/room/cubicle_world.py b/mettagrid/config/room/cubicle_world.py new file mode 100644 index 00000000..511218e5 --- /dev/null +++ b/mettagrid/config/room/cubicle_world.py @@ -0,0 +1,122 @@ +""" +CubicleWorld +============ + +Creates a lattice of full‑length walls: vertical lines from top‑to‑bottom and +horizontal lines from left‑to‑right. Interior rectangles of empty space are the +“cubicles.” Door‑sized openings are carved in each interior wall. + +The outermost perimeter is always a solid wall, ensuring the arena is fully enclosed. + +Episode‑time random sampling +---------------------------- +* env_width, env_height ← uniform int samples from width_range / height_range +* gap_x, gap_y ← uniform int samples from gap_range (4‑14) + +Wall coordinates: + vertical walls at 0, gap_x+1, 2*(gap_x+1), …, width‑1 + horizontal walls at 0, gap_y+1, 2*(gap_y+1), …, height‑1 +""" + +from __future__ import annotations +from typing import Sequence +import numpy as np +from mettagrid.config.room.room import Room + + + +class CubicleWorld(Room): + STYLE_PARAMETERS = {"cubicle": {"hearts_count": 0}} + + def __init__( + self, + width_range: Sequence[int] = (60, 140), + height_range: Sequence[int] = (60, 140), + gap_range: Sequence[int] = (4, 14), + altars_count: int = 50, + agents: int | dict = 20, + seed: int | None = 42, + border_object: str = "wall", + ): + super().__init__(border_object=border_object) + rng = np.random.default_rng(seed) + + self.width = np.random.randint(width_range[0], width_range[1]) + self.height = np.random.randint(height_range[0], height_range[1]) + self.gap_x = np.random.randint(gap_range[0], gap_range[1]) + self.gap_y = np.random.randint(gap_range[0], gap_range[1]) + + self.altars_count = altars_count + self._agents = agents + self._rng = rng + + # --------------------------------------------------------------- # + def _build(self): + grid = self._generate_grid() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # --------------------------------------------------------------- # + def _generate_grid(self) -> np.ndarray: + H, W = self.height, self.width + grid = np.full((H, W), "empty", dtype=object) + + # Draw vertical full‑height walls + col = 0 + while col < W: + grid[:, col] = "wall" + col += self.gap_x + 1 + + # Draw horizontal full‑width walls + row = 0 + while row < H: + grid[row, :] = "wall" + row += self.gap_y + 1 + + # Carve door‑sized breaks in every interior wall + # Vertical walls: skip borders (col == 0 or col == W‑1) + for col in range(self.gap_x + 1, W - 1, self.gap_x + 1): + for row_start in range(0, H, self.gap_y + 1): + if row_start + self.gap_y < H: + door_row = row_start + 1 + self._rng.integers(self.gap_y) + grid[door_row, col] = "empty" + + # Horizontal walls: skip borders (row == 0 or row == H‑1) + for row in range(self.gap_y + 1, H - 1, self.gap_y + 1): + for col_start in range(0, W, self.gap_x + 1): + if col_start + self.gap_x < W: + door_col = col_start + 1 + self._rng.integers(self.gap_x) + grid[row, door_col] = "empty" + + # Ensure the outer perimeter is a solid wall + grid[0, :] = "wall" + grid[-1, :] = "wall" + grid[:, 0] = "wall" + grid[:, -1] = "wall" + + # Occupancy mask for placement + self._occ = np.zeros((H, W), dtype=bool) + return grid + + # --------------------------------------------------------------- # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(grid == "empty"))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + # build tag list + if isinstance(self._agents, int): + tags = ["agent.agent"] * self._agents + else: + tags = ["agent." + t for t, n in self._agents.items() for _ in range(n)] + + empties = np.flatnonzero((grid == "empty") & (~self._occ)) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/cylinder_world.py b/mettagrid/config/room/cylinder_world.py index e743c1ca..b855fe0c 100644 --- a/mettagrid/config/room/cylinder_world.py +++ b/mettagrid/config/room/cylinder_world.py @@ -18,7 +18,7 @@ def __init__( width: int, height: int, agents: int | dict = 0, - seed: Optional[int] = None, + seed: Optional[int] = 42, border_width: int = 0, border_object: str = "wall", ): @@ -60,7 +60,6 @@ def _build_cylinder_world(self) -> np.ndarray: fails = 0 # reset – we still found room else: fails += 1 # try a different size/orientation - # Finally, spawn any requested agents on leftover empty cells grid = self._place_agents(grid) diff --git a/mettagrid/config/room/giant_maze_world.py b/mettagrid/config/room/giant_maze_world.py new file mode 100644 index 00000000..aaa1836e --- /dev/null +++ b/mettagrid/config/room/giant_maze_world.py @@ -0,0 +1,158 @@ +""" +GiantMazeWorld (v3, continuous) +=============================== + +Builds one perfect maze, then up‑scales it so corridors and walls have +variable thickness **while preserving full connectivity**. + +User‑tunable ranges (sampled each episode): + +* corridor_range – inclusive [min,max] corridor width (≥ 1) +* wall_range – inclusive [min,max] wall thickness (≥ 1) +* altars_count – number of heart altars sprinkled on corridor tiles +""" + +from typing import Optional, Tuple, List +import numpy as np +from mettagrid.config.room.room import Room + + +class GiantMazeWorld(Room): + STYLE_PARAMETERS = {"giant_maze": {"hearts_count": 0}} + + # --------------------------------------------------------------- # + def __init__( + self, + width: int = 121, + height: int = 121, + corridor_range: Tuple[int, int] = (1, 10), + wall_range: Tuple[int, int] = (1, 5), + altars_count: int = 50, + agents: int | dict = 0, + seed: Optional[int] = 42, + border_width: int = 2, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + + width = np.random.randint(width-75,width+15) + height = np.random.randint(height-75,height+15) + + # force odd dims (good for maze generation) + self.base_w = width | 1 + self.base_h = height | 1 + + self.corridor_range = corridor_range + self.wall_range = wall_range + self.altars_count = altars_count + self._agents = agents + + # --------------------------------------------------------------- # + # Public builder + # --------------------------------------------------------------- # + def _build(self): + # sample widths + self.cw = int(self._rng.integers(self.corridor_range[0], + self.corridor_range[1] + 1)) + self.wt = int(self._rng.integers(self.wall_range[0], + self.wall_range[1] + 1)) + + grid = self._generate_maze() + self._place_altars(grid) + self._place_agents(grid) + return grid + + # --------------------------------------------------------------- # + # Maze generation with connectivity‑preserving upscale + # --------------------------------------------------------------- # + def _generate_maze(self) -> np.ndarray: + cw, wt = self.cw, self.wt + cell = cw + wt + # coarse grid size (odd) + small_h = ((self.base_h - 1) // cell) | 1 + small_w = ((self.base_w - 1) // cell) | 1 + + # perfect maze on coarse grid (True=corridor) + coarse = np.full((small_h, small_w), False, dtype=bool) + dirs = [(0, 1), (1, 0), (0, -1), (-1, 0)] + stack = [(1, 1)] + coarse[1, 1] = True + last_dir = None + rng = self._rng + while stack: + r, c = stack[-1] + opts = [ + (dr, dc) + for dr, dc in dirs + if 0 < r + dr * 2 < small_h - 1 + and 0 < c + dc * 2 < small_w - 1 + and not coarse[r + dr * 2, c + dc * 2] + ] + if opts: + if last_dir in opts and rng.random() > 0.3: + dr, dc = last_dir + else: + dr, dc = opts[rng.integers(len(opts))] + coarse[r + dr, c + dc] = True + coarse[r + dr * 2, c + dc * 2] = True + stack.append((r + dr * 2, c + dc * 2)) + last_dir = (dr, dc) + else: + stack.pop() + last_dir = None + + # upscale to full resolution with passages through walls + big_h, big_w = small_h * cell + 1, small_w * cell + 1 + grid = np.full((big_h, big_w), "wall", dtype=object) + + # carve blocks + for sr in range(small_h): + for sc in range(small_w): + if coarse[sr, sc]: + r0, c0 = sr * cell + wt, sc * cell + wt + grid[r0 : r0 + cw, c0 : c0 + cw] = "empty" + + # carve connectors through walls + for sr in range(small_h): + for sc in range(small_w): + if not coarse[sr, sc]: + continue + # right neighbour + if sc + 1 < small_w and coarse[sr, sc + 1]: + r0 = sr * cell + wt + c_bridge = (sc + 1) * cell + grid[r0 : r0 + cw, c_bridge : c_bridge + wt] = "empty" + # bottom neighbour + if sr + 1 < small_h and coarse[sr + 1, sc]: + c0 = sc * cell + wt + r_bridge = (sr + 1) * cell + grid[r_bridge : r_bridge + wt, c0 : c0 + cw] = "empty" + + # occupancy mask + self._height, self._width = big_h, big_w + self._occ = np.zeros((big_h, big_w), dtype=bool) + return grid + + # --------------------------------------------------------------- # + # Altars & agents + # --------------------------------------------------------------- # + def _place_altars(self, grid: np.ndarray): + empties = list(zip(*np.where(grid == "empty"))) + self._rng.shuffle(empties) + for pos in empties[: self.altars_count]: + grid[pos] = "altar" + self._occ[pos] = True + + def _place_agents(self, grid: np.ndarray): + tags: List[str] = ( + ["agent.agent"] * self._agents + if isinstance(self._agents, int) + else ["agent." + t for t, n in self._agents.items() for _ in range(n)] + ) + empties = np.flatnonzero((grid == "empty") & (~self._occ)) + self._rng.shuffle(empties) + for tag, idx in zip(tags, empties): + pos = tuple(np.unravel_index(idx, grid.shape)) + grid[pos] = tag + self._occ[pos] = True \ No newline at end of file diff --git a/mettagrid/config/room/giant_maze_world_varied.py b/mettagrid/config/room/giant_maze_world_varied.py new file mode 100644 index 00000000..c79aa42d --- /dev/null +++ b/mettagrid/config/room/giant_maze_world_varied.py @@ -0,0 +1,132 @@ +""" +GiantMazeWorldVaried +==================== + +Variant of GiantMazeWorld with: + +• Map dimensions sampled between 200 – 400 tiles. +• Main corridor width sampled from corridor_range. +• Fractal‑style side branches carved off main corridors. + +Extra parameters +---------------- +branch_prob – chance an empty tile spawns a branch (0–1) +branch_scale – branch width = int(main_width × branch_scale) +max_branch_length – upper bound on branch length (tiles) +""" + +from __future__ import annotations + +from typing import Sequence + +import numpy as np +from mettagrid.config.room.giant_maze_world import GiantMazeWorld + + +def _sample_int(v: int | Sequence[int], rng: np.random.Generator) -> int: + """Return v if int, else uniformly sample an int in [v[0], v[1]].""" + if isinstance(v, Sequence): + lo, hi = int(v[0]), int(v[1]) + return int(rng.integers(lo, hi + 1)) + return int(v) + + +def _sample_num(v, rng): + """Sample float or int ranges transparently.""" + if isinstance(v, Sequence): + lo, hi = v[0], v[1] + if isinstance(lo, int) and isinstance(hi, int): + return int(rng.integers(int(lo), int(hi) + 1)) + return float(rng.uniform(float(lo), float(hi))) + return v + + +class GiantMazeWorldVaried(GiantMazeWorld): + STYLE_PARAMETERS = {"giant_maze_varied": {"hearts_count": 0}} + + def __init__( + self, + width: int | Sequence[int] = (200, 400), + height: int | Sequence[int] = (200, 400), + corridor_range: Sequence[int] = (1, 10), + wall_range: Sequence[int] = (1, 8), + branch_prob: float = 0.3, + branch_scale: float = 0.5, + max_branch_length: int = 40, + altars_count: int = 50, + agents: int | dict = 0, + seed: int | None = 42, + border_width: int = 2, + border_object: str = "wall", + ): + rng_ext = np.random.default_rng(seed) + width = _sample_int(width, rng_ext) + height = _sample_int(height, rng_ext) + + super().__init__( + width=width, + height=height, + corridor_range=tuple(corridor_range), + wall_range=tuple(wall_range), + altars_count=altars_count, + agents=agents, + seed=int(rng_ext.integers(2**32)), + border_width=border_width, + border_object=border_object, + ) + + self.branch_prob = float(_sample_num(branch_prob, rng_ext)) + self.branch_scale = float(_sample_num(branch_scale, rng_ext)) + self.max_branch_length = int(_sample_num(max_branch_length, rng_ext)) + + # ------------------------------------------------------------------ + # Maze generation with fractal branches + # ------------------------------------------------------------------ + def _generate_maze(self) -> np.ndarray: + grid = super()._generate_maze() # base perfect maze + + branch_w = max(1, int(self.cw * self.branch_scale)) + directions = [(0, 1), (1, 0), (0, -1), (-1, 0)] + H, W = grid.shape + rng = self._rng # inherited RNG + + def wall(r, c): + return 0 <= r < H and 0 <= c < W and grid[r, c] == "wall" + + for r in range(1, H - 1): + for c in range(1, W - 1): + if grid[r, c] != "empty" or rng.random() > self.branch_prob: + continue + + dr, dc = directions[rng.integers(4)] + length = int(rng.integers(3, self.max_branch_length + 1)) + rr, cc = r + dr, c + dc + + for _ in range(length): + if not wall(rr, cc): + break + + grid[rr, cc] = "empty" + + # widen perpendicular + for off in range(1, branch_w): + if dr == 0: # horizontal branch -> widen vertically + for s in (-1, 1): + rw = rr + s * off + if wall(rw, cc): + grid[rw, cc] = "empty" + else: # vertical branch -> widen horizontally + for s in (-1, 1): + cw = cc + s * off + if wall(rr, cw): + grid[rr, cw] = "empty" + + rr += dr + cc += dc + + # 30 % chance to turn + if rng.random() < 0.3: + dr, dc = directions[rng.integers(4)] + + self._occ = np.zeros_like(grid, dtype=bool) + return grid \ No newline at end of file diff --git a/mettagrid/config/room/lattice_world.py b/mettagrid/config/room/lattice_world.py new file mode 100644 index 00000000..570d4d56 --- /dev/null +++ b/mettagrid/config/room/lattice_world.py @@ -0,0 +1,192 @@ +from typing import List, Optional, Tuple + +import numpy as np + +from mettagrid.config.room.room import Room + + +class _DSU: + """Simple disjoint‑set union (union–find) for connectivity guarantees.""" + + def __init__(self, n: int): + self.parent = list(range(n)) + self.rank = [0] * n + + def find(self, x: int) -> int: + while self.parent[x] != x: + self.parent[x] = self.parent[self.parent[x]] + x = self.parent[x] + return x + + def union(self, x: int, y: int) -> bool: + xr, yr = self.find(x), self.find(y) + if xr == yr: + return False + if self.rank[xr] < self.rank[yr]: + xr, yr = yr, xr + self.parent[yr] = xr + if self.rank[xr] == self.rank[yr]: + self.rank[xr] += 1 + return True + + +class LatticeWorld(Room): + """A lattice of square rooms with guaranteed connectivity and door‑density control.""" + + def __init__( + self, + rooms_per_dim: Optional[int] = None, + room_size: Optional[int] = None, + heart_altar_rate: float = 0.75, + minimum_door_rate: float = 0.75, + num_agents: int = 0, + border_width: int = 1, + border_object: str = "wall", + seed: Optional[int] = None, + ): + if not (0.0 <= minimum_door_rate <= 1.0): + raise ValueError("minimum_door_rate must be between 0 and 1") + super().__init__(border_width=border_width, border_object=border_object) + + self.seed = seed + self.rng = np.random.default_rng(seed) + + # Random sampling of dimensions if unspecified + self.rooms_per_dim = rooms_per_dim or int(self.rng.integers(6, 10)) + self.room_size = room_size or int(self.rng.integers(7, 13)) + + self.heart_altar_rate = heart_altar_rate + self.minimum_door_rate = minimum_door_rate + self.num_agents = num_agents + self.border_width = border_width + self.border_object = border_object + # Book‑keeping + self.agent_positions: dict[int, Tuple[int, int]] = {} + + # --------------------------------------------------------------------- + # Grid construction helpers + # --------------------------------------------------------------------- + def _build(self) -> np.ndarray: + self.grid_size = self.rooms_per_dim * self.room_size + (self.rooms_per_dim - 1) * self.border_width + self.grid = np.full((self.grid_size, self.grid_size), self.border_object, dtype=object) + + self._carve_rooms() + self._carve_doors_connected() + self._place_objects() + return self.grid + + # ------------------------------------------------------------------ + # Room carving + # ------------------------------------------------------------------ + def _room_bounds(self, i: int, j: int) -> Tuple[int, int, int, int]: + row_start = i * (self.room_size + self.border_width) + col_start = j * (self.room_size + self.border_width) + return ( + row_start, + row_start + self.room_size, + col_start, + col_start + self.room_size, + ) + + def _carve_rooms(self) -> None: + for i in range(self.rooms_per_dim): + for j in range(self.rooms_per_dim): + rs, re, cs, ce = self._room_bounds(i, j) + self.grid[rs:re, cs:ce] = "empty" + + # ------------------------------------------------------------------ + # Door carving with connectivity + density guarantee + # ------------------------------------------------------------------ + def _carve_doors_connected(self) -> None: + n = self.rooms_per_dim + total_edges = 2 * n * (n - 1) + dsu = _DSU(n * n) + + # All potential neighbour pairs + edges: List[Tuple[int, int, str]] = [] + for i in range(n): + for j in range(n): + idx = i * n + j + if j < n - 1: # east + edges.append((idx, idx + 1, "E")) + if i < n - 1: # south + edges.append((idx, idx + n, "S")) + self.rng.shuffle(edges) + + opened: set[Tuple[int, int]] = set() + + def _open(i1: int, j1: int, i2: int, j2: int) -> None: + """Cut a centred doorway in the wall between (i1,j1) and (i2,j2).""" + if i1 == i2: # horizontal wall (east–west neighbour) + rs, _, cs1, ce1 = self._room_bounds(i1, j1) + row = rs + self.room_size // 2 + for b in range(self.border_width): + self.grid[row, ce1 + b] = "empty" + else: # vertical wall (north–south neighbour) + _, re1, cs, _ = self._room_bounds(i1, j1) + col = cs + self.room_size // 2 + for b in range(self.border_width): + self.grid[re1 + b, col] = "empty" + + # 1. Spanning tree for connectivity + remaining_edges = [] + for a, b, _ in edges: + if dsu.union(a, b): + i1, j1 = divmod(a, n) + i2, j2 = divmod(b, n) + _open(i1, j1, i2, j2) + opened.add(tuple(sorted((a, b)))) + else: + remaining_edges.append((a, b)) + + # 2. Extra doors until density target reached + self.rng.shuffle(remaining_edges) + needed = int(np.ceil(self.minimum_door_rate * total_edges)) + while len(opened) < needed and remaining_edges: + a, b = remaining_edges.pop() + if tuple(sorted((a, b))) in opened: + continue + i1, j1 = divmod(a, n) + i2, j2 = divmod(b, n) + _open(i1, j1, i2, j2) + opened.add(tuple(sorted((a, b)))) + + # ------------------------------------------------------------------ + # Empty‑cell helpers + # ------------------------------------------------------------------ + def _random_empty(self) -> Tuple[int, int]: + empties = np.argwhere(self.grid == "empty") + if empties.size == 0: + raise ValueError("No empty positions available") + return tuple(int(x) for x in empties[self.rng.integers(len(empties))]) + + def _random_empty_in_room(self, i: int, j: int) -> Tuple[int, int]: + rs, _, cs, _ = self._room_bounds(i, j) + while True: + r = rs + int(self.rng.integers(0, self.room_size)) + c = cs + int(self.rng.integers(0, self.room_size)) + if self.grid[r, c] == "empty": + return (r, c) + + # ------------------------------------------------------------------ + # Object placement + # ------------------------------------------------------------------ + def _place_objects(self) -> None: + n = self.rooms_per_dim + all_rooms = [(i, j) for i in range(n) for j in range(n)] + + # Heart altars + heart_rooms = [room for room in all_rooms if self.rng.random() < self.heart_altar_rate] + for i, j in heart_rooms: + r, c = self._random_empty_in_room(i, j) + self.grid[r, c] = "altar" + + # Agents + if self.num_agents > 0: + if self.num_agents > len(all_rooms): + raise ValueError("More agents than rooms available") + agent_rooms = self.rng.choice(all_rooms, size=self.num_agents, replace=False) + for idx, (i, j) in enumerate(agent_rooms): + r, c = self._random_empty_in_room(i, j) + self.grid[r, c] = "agent.agent" + self.agent_positions[idx] = (r, c) diff --git a/mettagrid/config/room/memory_box_world.py b/mettagrid/config/room/memory_box_world.py new file mode 100644 index 00000000..ae9d728b --- /dev/null +++ b/mettagrid/config/room/memory_box_world.py @@ -0,0 +1,185 @@ +""" +MemoryBoxWorld (random sizes) +============================= + +• Interior width/height sampled from `size_range` (default 9‑14). +• Entrance gap (3 cells) on a random side. +• Two corridor walls whose length is randomly sampled from + `corridor_range` **but clipped so they always fit** and always ≥ 6. +• A single altar sits in the corner on the same side as the entrance, + opposite the gap. +• Boxes are stamped until 10 consecutive placement failures; agents spawn + outside the boxes. +• Exactly 20 agents are spawned in every episode. +""" + +from typing import List, Optional, Tuple + +import numpy as np +from mettagrid.config.room.room import Room + + +class MemoryBoxWorld(Room): + STYLE_PARAMETERS = {"memory_box": {"hearts_count": 0}} + + def __init__( + self, + width: int = 120, + height: int = 120, + agents: int | dict = 20, + size_range: Tuple[int, int] = (9, 14), # interior side length + corridor_range: Tuple[int, int] = (6, 10), # sampled per box + seed: Optional[int] = None, + border_width: int = 2, + border_object: str = "wall", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + width, height = np.random.randint(80,120), np.random.randint(80,120) + self._width, self._height = width, height + # Always spawn exactly 20 agents, ignoring caller‑supplied values + self._agents = agents + self._occ = np.zeros((height, width), dtype=bool) + + self.smin, self.smax = size_range + self.cmin, self.cmax = corridor_range + + # ------------------------------------------------------------------ # + @property + def agents(self) -> int: + """Always returns 20 — the fixed number of agents in MemoryBoxWorld.""" + return 20 + + # -------------------- build world -------------------- # + def _build(self): + grid = np.full((self._height, self._width), "empty", dtype=object) + fails = 0 + while fails < 10: + if self._place_box(grid, clearance=1): + fails = 0 + else: + fails += 1 + return self._place_agents(grid) + + # -------------------- box helpers ------------------- # + def _place_box(self, grid, clearance: int) -> bool: + pat = self._generate_box() + return self._place_region(grid, pat, clearance) + + def _generate_box(self) -> np.ndarray: + # interior size + h_int = int(self._rng.integers(self.smin, self.smax + 1)) + w_int = int(self._rng.integers(self.smin, self.smax + 1)) + h, w = h_int + 2, w_int + 2 + pat = np.full((h, w), "wall", dtype=object) + pat[1:-1, 1:-1] = "empty" + + # choose entrance side + side = self._rng.choice(["top", "bottom", "left", "right"]) + + # sample corridor length, clip so it fits and ≥6 + L_max_vert = h_int - 2 + L_max_horz = w_int - 2 + max_len = L_max_vert if side in ("top", "bottom") else L_max_horz + L = int(self._rng.integers(self.cmin, min(self.cmax, max_len) + 1)) + + if side in ("top", "bottom"): + gap_c = int(self._rng.integers(3, w - 3)) + wall_row = 0 if side == "top" else h - 1 + inner_r = 1 if side == "top" else h - 2 + step = 1 if side == "top" else -1 + + # entrance + for dc in (-1, 0, 1): + pat[wall_row, gap_c + dc] = "empty" + # corridor walls + for k in range(L): + r = inner_r + k * step + pat[r, gap_c - 2] = "wall" + pat[r, gap_c + 2] = "wall" + # altar + altar_c = 1 if gap_c > w // 2 else w - 2 + altar = (1, altar_c) if side == "top" else (h - 2, altar_c) + else: # left / right + gap_r = int(self._rng.integers(3, h - 3)) + wall_col = 0 if side == "left" else w - 1 + inner_c = 1 if side == "left" else w - 2 + step = 1 if side == "left" else -1 + + for dr in (-1, 0, 1): + pat[gap_r + dr, wall_col] = "empty" + for k in range(L): + c = inner_c + k * step + pat[gap_r - 2, c] = "wall" + pat[gap_r + 2, c] = "wall" + altar_r = 1 if gap_r > h // 2 else h - 2 + altar = (altar_r, 1) if side == "left" else (altar_r, w - 2) + + pat[altar] = "altar" + return pat + + # ---------------- agent placement -------------- # + def _place_agents(self, grid): + """Place agents so they are roughly uniformly spread out. + + A greedy Poisson‑disk sampler is used: each new agent spawn + must be at least *min_dist* Manhattan units away from every + already‑placed agent. If the constraint cannot be satisfied + after scanning all empties once, any remaining agents are + placed randomly. + """ + min_dist = 10 + empties = np.flatnonzero(~self._occ) + self._rng.shuffle(empties) + + chosen: List[Tuple[int, int]] = [] + H, W = self._occ.shape + for idx in empties.tolist(): + if len(chosen) == self._agents: + break + r, c = divmod(idx, W) + if all(abs(r - rr) + abs(c - cc) >= min_dist for rr, cc in chosen): + chosen.append((r, c)) + + # Fallback: fill any remaining agents without the distance constraint + if len(chosen) < self._agents: + remaining = [divmod(i, W) for i in empties.tolist() + if divmod(i, W) not in chosen] + for pos in remaining[: self._agents - len(chosen)]: + chosen.append(pos) + + # Stamp agents into the grid / occupancy mask + for pos in chosen: + grid[pos] = "agent.agent" + self._occ[pos] = True + + return grid + + # -------------- generic helpers --------------- # + def _place_region(self, grid, pat, clearance: int): + ph, pw = pat.shape + for r, c in self._free_windows((ph + 2 * clearance, pw + 2 * clearance)): + grid[r + clearance : r + clearance + ph, + c + clearance : c + clearance + pw] = pat + self._occ[r : r + ph + 2 * clearance, + c : c + pw + 2 * clearance] = True + return True + return False + + def _free_windows(self, shape): + h, w = shape + H, W = self._occ.shape + if h > H or w > W: + return [] + view = np.lib.stride_tricks.as_strided( + self._occ, (H - h + 1, W - w + 1, h, w), self._occ.strides * 2 + ) + coords = np.argwhere(view.sum(axis=(2, 3)) == 0) + self._rng.shuffle(coords) + return [tuple(t) for t in coords] + + def _rand_empty(self): + empties = np.flatnonzero(~self._occ) + return None if empties.size == 0 else tuple( + np.unravel_index(self._rng.integers(empties.size), self._occ.shape) + ) \ No newline at end of file diff --git a/mettagrid/config/room/terrain_from_numpy.py b/mettagrid/config/room/terrain_from_numpy.py index 6aa321d8..ba62c989 100644 --- a/mettagrid/config/room/terrain_from_numpy.py +++ b/mettagrid/config/room/terrain_from_numpy.py @@ -1,14 +1,27 @@ import os import random +import time import zipfile import boto3 import numpy as np from botocore.exceptions import NoCredentialsError +from filelock import FileLock from mettagrid.config.room.room import Room +def safe_load(path, retries=5, delay=1.0): + for attempt in range(retries): + try: + return np.load(path, allow_pickle=True) + except ValueError: + if attempt < retries - 1: + time.sleep(delay) + continue + raise + + def download_from_s3(s3_path: str, save_path: str, location: str = "us-east-1"): if not s3_path.startswith("s3://"): raise ValueError(f"Invalid S3 path: {s3_path}. Must start with s3://") @@ -41,33 +54,33 @@ class TerrainFromNumpy(Room): These maps each have 10 agents in them . """ - def __init__(self, dir, border_width: int = 0, border_object: str = "wall", num_agents: int = 10): + def __init__( + self, dir, border_width: int = 0, border_object: str = "wall", num_agents: int = 10, generators: bool = False + ): zipped_dir = dir + ".zip" - if not os.path.exists(dir) and not os.path.exists(zipped_dir): - s3_path = f"s3://softmax-public/maps/{zipped_dir}" - download_from_s3(s3_path, zipped_dir) - if not os.path.exists(dir) and os.path.exists(zipped_dir): - with zipfile.ZipFile(zipped_dir, "r") as zip_ref: - zip_ref.extractall(os.path.dirname(dir)) + lock_path = zipped_dir + ".lock" + # Only one process can hold this lock at a time: + with FileLock(lock_path): + if not os.path.exists(dir) and not os.path.exists(zipped_dir): + s3_path = f"s3://softmax-public/maps/{zipped_dir}" + download_from_s3(s3_path, zipped_dir) + if not os.path.exists(dir) and os.path.exists(zipped_dir): + with zipfile.ZipFile(zipped_dir, "r") as zip_ref: + zip_ref.extractall(os.path.dirname(dir)) + + # if not os.path.exists(dir) and not os.path.exists(zipped_dir): + # s3_path = f"s3://softmax-public/maps/{zipped_dir}" + # download_from_s3(s3_path, zipped_dir) + # if not os.path.exists(dir) and os.path.exists(zipped_dir): + # with zipfile.ZipFile(zipped_dir, "r") as zip_ref: + # zip_ref.extractall(os.path.dirname(dir)) self.files = os.listdir(dir) self.dir = dir self.num_agents = num_agents + self.generators = generators super().__init__(border_width=border_width, border_object=border_object) - def _build(self): - uri = np.random.choice(self.files) - level = np.load(f"{self.dir}/{uri}", allow_pickle=True) - # Count number of agents in the map - - # try again once if wrong number of agents - if not np.count_nonzero(level == "agent.agent") == self.num_agents: - uri = np.random.choice(self.files) - level = np.load(f"{self.dir}/{uri}", allow_pickle=True) - - area = level.shape[0] * level.shape[1] - num_hearts = area // random.randint(66, 180) - - # Find valid empty spaces surrounded by empty + def get_valid_positions(self, level): valid_positions = [] for i in range(1, level.shape[0] - 1): for j in range(1, level.shape[1] - 1): @@ -80,10 +93,39 @@ def _build(self): or level[i, j + 1] == "empty" ): valid_positions.append((i, j)) + return valid_positions + + def _build(self): + # TODO: add some way of sampling + uri = np.random.choice(self.files) + level = safe_load(f"{self.dir}/{uri}") + + # remove agents to then repopulate + agents = level == "agent.agent" + level[agents] = "empty" + + valid_positions = self.get_valid_positions(level) + positions = random.sample(valid_positions, self.num_agents) + for pos in positions: + level[pos] = "agent.agent" + + area = level.shape[0] * level.shape[1] + + num_hearts = area // 180 # random.randint(66, 180) + + # Find valid empty spaces surrounded by empty + valid_positions = self.get_valid_positions(level) # Randomly place hearts in valid positions positions = random.sample(valid_positions, min(num_hearts, len(valid_positions))) for pos in positions: level[pos] = "altar" + + if self.generators: + num_mines = area // random.randint(66, 180) + valid_positions = self.get_valid_positions(level) + positions = random.sample(valid_positions, min(num_mines, len(valid_positions))) + for pos in positions: + level[pos] = "generator" self._level = level return self._level diff --git a/mettagrid/config/room/varied_obstacle_shapes.py b/mettagrid/config/room/varied_obstacle_shapes.py new file mode 100644 index 00000000..db14891f --- /dev/null +++ b/mettagrid/config/room/varied_obstacle_shapes.py @@ -0,0 +1,130 @@ +from typing import Optional +import numpy as np +from omegaconf import DictConfig + +from mettagrid.config.room.room import Room + +class VariedObstacleShapes(Room): + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall" + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + + # Define a list of obstacle patterns with varying numbers of wall blocks (from 1 up to 7) + self.obstacle_patterns = [ + np.array([["wall"]]), # 1 block + np.array([["wall", "wall"]]), # 2 blocks (horizontal line) + np.array([["wall", "wall", "wall"]]), # 3 blocks (horizontal line) + np.array([["wall", "wall"], + ["wall", "wall"]]), # 4 blocks (2x2 square) + np.array([["empty", "wall", "empty"], + ["wall", "wall", "wall"], + ["empty", "wall", "empty"]]), # 5 blocks (cross) + np.array([["wall", "wall", "wall"], + ["wall", "empty", "empty"], + ["wall", "wall", "empty"]]), # 6 blocks (L-shaped variant) + np.array([["wall", "wall", "empty"], + ["empty", "wall", "wall"], + ["wall", "wall", "wall"]]) # 7 blocks (zigzag) + ] + + def _build(self): + # Prepare agent symbols + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + # Adjust object counts if total exceeds 2/3 of room area + total_objects = sum(self._objects.values()) + len(agents) + area = self._width * self._height + while total_objects > 2 * area / 3: + for obj_name in self._objects: + self._objects[obj_name] = max(1, self._objects[obj_name] // 2) + total_objects = sum(self._objects.values()) + len(agents) + + # Create an empty grid + grid = np.full((self._height, self._width), "empty", dtype=object) + + # Use the "wall" count to determine the number of obstacles to place + obstacle_count = self._objects.get("wall", 0) + grid = self._place_obstacles(grid, obstacle_count) + + # Place remaining objects (excluding walls) in random empty cells + for obj, count in self._objects.items(): + if obj == "wall": + continue + for _ in range(count): + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + raise ValueError("No empty space available for object placement.") + pos_idx = self._rng.integers(0, len(empty_positions)) + pos = empty_positions[pos_idx] + grid[pos[0], pos[1]] = obj + + # Place agents in remaining empty cells + for agent in agents: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + raise ValueError("No empty space available for agent placement.") + pos_idx = self._rng.integers(0, len(empty_positions)) + pos = empty_positions[pos_idx] + grid[pos[0], pos[1]] = agent + + return grid + + def _place_obstacles(self, grid, obstacle_count: int): + """ + Places composite obstacle objects in the grid. For each obstacle, a pattern is chosen randomly from a list + of shapes (each with a different number of wall blocks, from 1 to 7). The pattern is then placed with a one-cell + clearance around it to ensure traversability. + """ + placed = 0 + attempts = 0 + height, width = grid.shape + + while placed < obstacle_count and attempts < obstacle_count * 100: + attempts += 1 + # Randomly select one of the obstacle patterns + pattern = self.obstacle_patterns[self._rng.integers(0, len(self.obstacle_patterns))] + p_h, p_w = pattern.shape + + # Set clearance (one cell on each side) + clearance = 1 + effective_h = p_h + 2 * clearance + effective_w = p_w + 2 * clearance + + if height - effective_h < 0 or width - effective_w < 0: + break + + # Choose a random top-left coordinate for the effective region + r = self._rng.integers(0, height - effective_h + 1) + c = self._rng.integers(0, width - effective_w + 1) + + # Check if the effective region is completely empty + region = grid[r:r+effective_h, c:c+effective_w] + if np.all(region == "empty"): + # Place the pattern in the center of the effective region + start_r = r + clearance + start_c = c + clearance + grid[start_r:start_r+p_h, start_c:start_c+p_w] = pattern + placed += 1 + + if placed < obstacle_count: + print(f"Warning: Only placed {placed} out of {obstacle_count} obstacles after {attempts} attempts.") + + return grid \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_cylinder.py b/mettagrid/config/room/varied_terrain_cylinder.py new file mode 100644 index 00000000..abff5f63 --- /dev/null +++ b/mettagrid/config/room/varied_terrain_cylinder.py @@ -0,0 +1,138 @@ +""" +VariedTerrainCylinder +--------------------- +A room builder that fills the map with disjoint *cylinders*. + +A *cylinder* is two parallel walls (length 4‑14) separated by a 1‑ or 2‑cell gap. +An altar sits dead‑centre in that gap. Cylinders may be vertical or horizontal. +By default they’re vertical so you can visually verify things; set +orientation="horizontal" or "random" when you like. +""" + +from typing import Tuple, List, Optional +import numpy as np +from mettagrid.config.room.room import Room + + +class VariedTerrainCylinder(Room): + """Scatter as many cylinders as will fit, then drop agents.""" + + def __init__( + self, + width: int, + height: int, + agents: int | dict = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + orientation: str = "vertical", # "vertical", "horizontal", or "random" + style: str = "cylinder_world", + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._agents = agents + self._orientation = orientation + + if style != "cylinder_world": + raise ValueError( + f"Only style 'cylinder_world' is implemented here (got '{style}')." + ) + + # Tracks which cells are busy. + self._occupancy = np.zeros((height, width), dtype=bool) + + # ------------------------------------------------------------------ # + # Public build + # ------------------------------------------------------------------ # + def _build(self) -> np.ndarray: + grid = np.full((self._height, self._width), "empty", dtype=object) + + # 1. Scatter cylinders until no room remains. + while True: + pattern = self._generate_cylinder_pattern() + if not self._place_candidate_region(grid, pattern, clearance=1): + break + + # 2. Drop agents. + for agent in self._make_agent_list(): + pos = self._choose_random_empty() + if pos is None: + break + r, c = pos + grid[r, c] = agent + self._occupancy[r, c] = True + + return grid + + # ------------------------------------------------------------------ # + # Cylinder generation + # ------------------------------------------------------------------ # + def _generate_cylinder_pattern(self) -> np.ndarray: + length = int(self._rng.integers(4, 15)) # 4‑14 + gap = int(self._rng.integers(1, 3)) # 1‑2 + + ori = self._orientation + if ori == "random": + ori = "vertical" if self._rng.random() < 0.5 else "horizontal" + + if ori == "vertical": + h, w = length, gap + 2 + pattern = np.full((h, w), "empty", dtype=object) + pattern[:, 0] = pattern[:, -1] = "wall" + pattern[h // 2, 1 + gap // 2] = "altar" + else: # horizontal + h, w = gap + 2, length + pattern = np.full((h, w), "empty", dtype=object) + pattern[0, :] = pattern[-1, :] = "wall" + pattern[1 + gap // 2, w // 2] = "altar" + + return pattern + + # ------------------------------------------------------------------ # + # Helpers (fast occupancy ops) + # ------------------------------------------------------------------ # + def _make_agent_list(self) -> List[str]: + if isinstance(self._agents, int): + return ["agent.agent"] * self._agents + return [ + "agent." + name + for name, n in self._agents.items() + for _ in range(n) + ] + + def _update_occupancy(self, top_left: Tuple[int, int], pattern: np.ndarray) -> None: + r, c = top_left + ph, pw = pattern.shape + self._occupancy[r : r + ph, c : c + pw] |= (pattern != "empty") + + def _find_candidates(self, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + rh, rw = region_shape + H, W = self._occupancy.shape + if rh > H or rw > W: + return [] + shape = (H - rh + 1, W - rw + 1, rh, rw) + strides = self._occupancy.strides * 2 + sub = np.lib.stride_tricks.as_strided(self._occupancy, shape=shape, strides=strides) + sums = sub.sum(axis=(2, 3)) + return [tuple(idx) for idx in np.argwhere(sums == 0)] + + def _place_candidate_region( + self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0 + ) -> bool: + ph, pw = pattern.shape + cand = self._find_candidates((ph + 2 * clearance, pw + 2 * clearance)) + if not cand: + return False + r, c = cand[self._rng.integers(0, len(cand))] + grid[r + clearance : r + clearance + ph, c + clearance : c + clearance + pw] = pattern + self._update_occupancy((r + clearance, c + clearance), pattern) + return True + + def _choose_random_empty(self) -> Optional[Tuple[int, int]]: + empties = np.flatnonzero(~self._occupancy) + if not len(empties): + return None + idx = self._rng.integers(0, len(empties)) + return np.unravel_index(empties[idx], self._occupancy.shape) \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_diverse.py b/mettagrid/config/room/varied_terrain_diverse.py new file mode 100644 index 00000000..ff015fad --- /dev/null +++ b/mettagrid/config/room/varied_terrain_diverse.py @@ -0,0 +1,417 @@ +""" +This file defines the VariedTerrainDiverse environment. +It creates a grid world with configurable features including: + - Large obstacles and small obstacles: randomly generated, connected shapes. + - Cross obstacles: cross-shaped patterns. + - Mini labyrinths: maze-like structures (≈11×11) with passages thickened probabilistically, + and with at least two-cell gaps along the borders; empty cells may be replaced with "heart". + - Scattered single walls: individual wall cells placed at random empty cells. + - Blocks: new rectangular objects whose width and height are sampled uniformly between 2 and 14. + The number of blocks (block count) is sampled from 0 to 10. + - Altars: objects are placed with a configurable number of hearts. + - A clumpiness factor that biases object placement. +All objects are placed with at least a one-cell clearance and if no space is found for a new object, placement is skipped. +The build order is: + mini labyrinths → obstacles (large, small, crosses) → scattered walls → blocks → remaining objects → agents. +""" + +from typing import Optional, Tuple, List +import numpy as np +from omegaconf import DictConfig +from mettagrid.config.room.room import Room + +class VariedTerrainDiverse(Room): + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + occupancy_threshold: float = 0.66, # maximum fraction of grid cells to occupy + **kwargs # Extra parameters + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + self._occupancy_threshold = occupancy_threshold + + # Obstacle parameters. + self._large_obstacles = kwargs.pop("large_obstacles", {"size_range": [10, 25], "count": 0}) + self._small_obstacles = kwargs.pop("small_obstacles", {"size_range": [3, 6], "count": 0}) + self._crosses = kwargs.pop("crosses", {"count": 0}) + # Altars: hearts_count parameter overrides the count for altars. + self._hearts_count = kwargs.pop("hearts_count", 50) + self._clumpiness = kwargs.pop("clumpiness", 0) + # Labyrinths: number of mini labyrinths. + self._labyrinths = kwargs.pop("labyrinths", {"count": 0}) + # Scattered single walls. + self._scattered_walls = kwargs.pop("scattered_walls", {"count": 0}) + # NEW: Blocks – rectangular objects. + self._blocks = kwargs.pop("blocks", {"count": 0}) + + def _build(self) -> np.ndarray: + # Prepare agent symbols. + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, dict) or isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + # Pre-scale objects if overall occupancy might exceed threshold. + area = self._width * self._height + total_objects = sum(self._objects.values()) + len(agents) + if total_objects > self._occupancy_threshold * area: + scale = (self._occupancy_threshold * area) / total_objects + for obj in self._objects: + if self._objects[obj] > 0: + self._objects[obj] = max(1, int(self._objects[obj] * scale)) + + # Create an empty grid. + grid = np.full((self._height, self._width), "empty", dtype=object) + + # Place features in order. + grid = self._place_labyrinths(grid) + grid = self._place_all_obstacles(grid) + grid = self._place_scattered_walls(grid) + grid = self._place_blocks(grid) + for obj, count in self._objects.items(): + if obj == "altar": + count = self._hearts_count + for _ in range(count): + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for object placement.") + grid[pos[0], pos[1]] = obj + for agent in agents: + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for agent placement.") + grid[pos[0], pos[1]] = agent + + return grid + + # --------------------------- + # Helper Functions + # --------------------------- + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + p_h, p_w = pattern.shape + eff_h, eff_w = p_h + 2*clearance, p_w + 2*clearance + candidates = self._find_candidates(grid, (eff_h, eff_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance:r+clearance+p_h, c+clearance:c+clearance+p_w] = pattern + return True + return False + + # --------------------------- + # Placement Routines + # --------------------------- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Place large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Place small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Place cross obstacles (no extra clearance). + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + """ + Places rectangular blocks on the grid. + Each block's width and height are sampled uniformly between 2 and 14. + The number of blocks is determined by self._blocks["count"] (0 to 10). + A candidate region of the sampled dimensions must be completely empty. + If no such region exists, placement of that block is skipped. + """ + block_count = self._blocks.get("count", 0) + h, w = grid.shape + for _ in range(block_count): + block_w = self._rng.integers(2, 15) # [2,14] + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + # --------------------------- + # Pattern Generation Functions + # --------------------------- + def _generate_random_shape(self, num_blocks: int) -> np.ndarray: + shape_cells = {(0, 0)} + while len(shape_cells) < num_blocks: + candidates = [] + for (r, c) in shape_cells: + for dr, dc in [(1, 0), (-1, 0), (0, 1), (0, -1)]: + candidate = (r + dr, c + dc) + if candidate not in shape_cells: + candidates.append(candidate) + if not candidates: + break + new_cell = candidates[self._rng.integers(0, len(candidates))] + shape_cells.add(new_cell) + min_r = min(r for r, _ in shape_cells) + min_c = min(c for _, c in shape_cells) + max_r = max(r for r, _ in shape_cells) + max_c = max(c for _, c in shape_cells) + pattern = np.full((max_r - min_r + 1, max_c - min_c + 1), "empty", dtype=object) + for r, c in shape_cells: + pattern[r - min_r, c - min_c] = "wall" + return pattern + + def _generate_cross_pattern(self) -> np.ndarray: + cross_w = self._rng.integers(1, 9) + cross_h = self._rng.integers(1, 9) + pattern = np.full((cross_h, cross_w), "empty", dtype=object) + center_row = cross_h // 2 + center_col = cross_w // 2 + pattern[center_row, :] = "wall" + pattern[:, center_col] = "wall" + return pattern + + def _generate_labyrinth_pattern(self) -> np.ndarray: + # Choose dimensions between 11 and 13, then clamp to 11 and force odd. + h = int(self._rng.integers(11, 14)) + w = int(self._rng.integers(11, 14)) + h = 11 if h > 11 else h + w = 11 if w > 11 else w + if h % 2 == 0: h -= 1 + if w % 2 == 0: w -= 1 + + maze = np.full((h, w), "wall", dtype=object) + start = (1, 1) + maze[start] = "empty" + stack = [start] + directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] + while stack: + r, c = stack[-1] + neighbors = [] + for dr, dc in directions: + nr, nc = r + dr, c + dc + if 0 <= nr < h and 0 <= nc < w and maze[nr, nc] == "wall": + neighbors.append((nr, nc)) + if neighbors: + next_cell = neighbors[self._rng.integers(0, len(neighbors))] + nr, nc = next_cell + wall_r, wall_c = r + (nr - r) // 2, c + (nc - c) // 2 + maze[wall_r, wall_c] = "empty" + maze[nr, nc] = "empty" + stack.append(next_cell) + else: + stack.pop() + + # Apply thickening based on a random probability between 0.3 and 1.0. + thick_prob = 0.3 + 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j + 1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i + 1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick + + # Ensure each border has at least two contiguous empty cells. + if w > 3 and not self._has_gap(maze[0, 1:w-1]): + maze[0, 1:3] = "empty" + if w > 3 and not self._has_gap(maze[h - 1, 1:w-1]): + maze[h - 1, 1:3] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, 0]): + maze[1:3, 0] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, w - 1]): + maze[1:3, w - 1] = "empty" + + # Scatter hearts in empty cells with 30% probability. + for i in range(h): + for j in range(w): + if maze[i, j] == "empty" and self._rng.random() < 0.3: + maze[i, j] = "heart" + return maze + + def _has_gap(self, line: np.ndarray) -> bool: + contiguous = 0 + for cell in line: + contiguous = contiguous + 1 if cell == "empty" else 0 + if contiguous >= 2: + return True + return False + + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Place large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Place small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Place cross obstacles. + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + """ + Places rectangular block objects on the grid. + For each block, the width and height are sampled uniformly between 2 and 14. + The number of blocks is determined by self._blocks["count"]. + The block is placed in a candidate region that is completely empty. + """ + block_count = self._blocks.get("count", 0) + for _ in range(block_count): + block_w = self._rng.integers(2, 15) # 2 to 14 inclusive. + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + p_h, p_w = pattern.shape + eff_shape = (p_h + 2 * clearance, p_w + 2 * clearance) + candidates = self._find_candidates(grid, eff_shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance:r+clearance+p_h, c+clearance:c+clearance+p_w] = pattern + return True + return False + + def _choose_random_empty_region(self, grid: np.ndarray, region_shape: Tuple[int,int]) -> Optional[Tuple[int,int]]: + candidates = self._find_candidates(grid, region_shape) + if candidates: + return candidates[self._rng.integers(0, len(candidates))] + return None + +# End of VariedTerrainDiverse class implementation \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_diverse_style.py b/mettagrid/config/room/varied_terrain_diverse_style.py new file mode 100644 index 00000000..d577bbc0 --- /dev/null +++ b/mettagrid/config/room/varied_terrain_diverse_style.py @@ -0,0 +1,474 @@ +""" +This file defines the VariedTerrainDiverseStyle environment. +It creates a grid world with a variety of obstacles and objects: + - Large obstacles and small obstacles: random connected shapes. + - Cross obstacles: cross-shaped patterns. + - Mini labyrinths: maze-like structures (~11×11) generated via recursive backtracking, + with passages thickened probabilistically and with border gaps of at least two cells; + empty cells may be replaced with "heart" with ~30% chance. + - Scattered single walls: randomly scattered wall cells. + - Blocks: new rectangular objects with width and height sampled uniformly between 2 and 14. + - Altars: objects with a reward count, overridden by hearts_count. + - A clumpiness factor that biases placement. +Extra style parameters, in particular "domain", are accepted to override distributions. +The build order is: + mini labyrinths → obstacles (large, small, crosses) → scattered walls → blocks → remaining objects → agents. +If no space is found for an object, placement is skipped (for agents, a fallback places them in the center). +""" + +from typing import Optional, Tuple, List +import numpy as np +from omegaconf import DictConfig +from mettagrid.config.room.room import Room + +class VariedTerrainDiverseStyle(Room): + # Predefined style overrides keyed by domain. + _STYLE_OVERRIDES = { + "sparse_cityscape": { + "blocks": {"count": (25, 35)}, + "large_obstacles": {"count": (1, 3)}, + "small_obstacles": {"count": (1, 3)}, + "crosses": {"count": (1, 3)}, + "labyrinths": {"count": 0}, + "scattered_walls": {"count": (1, 6)}, + "hearts_count": (80, 100), + "clumpiness": (0, 1) + }, + "ancient_ruins": { + "blocks": {"count": (0, 3)}, + "large_obstacles": {"count": (3, 6)}, + "small_obstacles": {"count": (3, 6)}, + "crosses": {"count": (1, 3)}, + "labyrinths": {"count": (1, 2)}, + "scattered_walls": {"count": (3, 6)}, + "hearts_count": (70, 90), + "clumpiness": (2, 3) + }, + "cross_forest": { + "blocks": {"count": (3, 8)}, + "large_obstacles": {"count": (1, 3)}, + "small_obstacles": {"count": (1, 3)}, + "crosses": {"count": (5, 8)}, # many crosses + "labyrinths": {"count": (0, 1)}, + "scattered_walls": {"count": (1, 4)}, + "hearts_count": (60, 80), + "clumpiness": (1, 2) + }, + "jungle": { + "blocks": {"count": (0, 10)}, + "large_obstacles": {"count": (0, 10)}, + "small_obstacles": {"count": (0, 10)}, + "crosses": {"count": (0, 10)}, + "labyrinths": {"count": (0, 10)}, + "scattered_walls": {"count": (0, 10)}, + "hearts_count": (0, 150), + "clumpiness": (0, 5) + }, + "desert_adventure": { + "blocks": {"count": (0, 3)}, + "large_obstacles": {"count": (0, 1)}, + "small_obstacles": {"count": (0, 1)}, + "crosses": {"count": (0, 1)}, + "labyrinths": {"count": (8, 12)}, + "scattered_walls": {"count": (0, 3)}, + "hearts_count": (40, 60), + "clumpiness": (0, 2) + }, + "foreign_planet": { + "blocks": {"count": (5, 10)}, + "large_obstacles": {"count": (3, 7)}, + "small_obstacles": {"count": (3, 7)}, + "crosses": {"count": (3, 7)}, + "labyrinths": {"count": (2, 4)}, + "scattered_walls": {"count": (3, 7)}, + "hearts_count": (50, 70), + "clumpiness": (1, 3) + } + } + + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + occupancy_threshold: float = 0.66, + **kwargs # Extra parameters including domain + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + self._occupancy_threshold = occupancy_threshold + + # Optional style parameter: domain. + self._domain = kwargs.pop("domain", None) + + # Obstacle parameters. + self._large_obstacles = kwargs.pop("large_obstacles", {"size_range": [10, 25], "count": 0}) + self._small_obstacles = kwargs.pop("small_obstacles", {"size_range": [3, 6], "count": 0}) + self._crosses = kwargs.pop("crosses", {"count": 0}) + self._hearts_count = kwargs.pop("hearts_count", 50) + self._clumpiness = kwargs.pop("clumpiness", 0) + self._labyrinths = kwargs.pop("labyrinths", {"count": 0}) + self._scattered_walls = kwargs.pop("scattered_walls", {"count": 0}) + self._blocks = kwargs.pop("blocks", {"count": 0}) + + self._apply_style_overrides() + + def _apply_style_overrides(self): + if self._domain: + style = self._STYLE_OVERRIDES.get(self._domain) + if style: + # Override each parameter if provided as a range (tuple) or a constant. + if "blocks" in style: + val = style["blocks"]["count"] + self._blocks["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "large_obstacles" in style: + val = style["large_obstacles"]["count"] + self._large_obstacles["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "small_obstacles" in style: + val = style["small_obstacles"]["count"] + self._small_obstacles["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "crosses" in style: + val = style["crosses"]["count"] + self._crosses["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "labyrinths" in style: + self._labyrinths["count"] = style["labyrinths"]["count"] if isinstance(style["labyrinths"]["count"], int) else int(style["labyrinths"]["count"]) + if "scattered_walls" in style: + val = style["scattered_walls"]["count"] + self._scattered_walls["count"] = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "hearts_count" in style: + val = style["hearts_count"] + self._hearts_count = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + if "clumpiness" in style: + val = style["clumpiness"] + self._clumpiness = int(self._rng.integers(val[0], val[1] + 1)) if isinstance(val, tuple) else val + + def _build(self) -> np.ndarray: + # Ensure one agent per room (map builder should create one agent per room). + if isinstance(self._agents, int) and self._agents != 1: + self._agents = 1 + elif isinstance(self._agents, DictConfig): + # If agent counts are provided as a dict, override with 1 per room. + for key in self._agents: + self._agents[key] = 1 + + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + area = self._width * self._height + total_objects = sum(self._objects.values()) + len(agents) + if total_objects > self._occupancy_threshold * area: + scale = (self._occupancy_threshold * area) / total_objects + for obj in self._objects: + if self._objects[obj] > 0: + self._objects[obj] = max(1, int(self._objects[obj] * scale)) + grid = np.full((self._height, self._width), "empty", dtype=object) + + grid = self._place_labyrinths(grid) + grid = self._place_all_obstacles(grid) + grid = self._place_scattered_walls(grid) + grid = self._place_blocks(grid) + for obj, count in self._objects.items(): + if obj == "altar": + count = self._hearts_count + for _ in range(count): + pos = self._choose_random_empty(grid) + if pos is not None: + grid[pos[0], pos[1]] = obj + else: + print(f"Warning: No empty space available for object {obj}. Skipping placement.") + for agent in agents: + pos = self._choose_random_empty(grid) + if pos is not None: + grid[pos[0], pos[1]] = agent + else: + # Fallback placement: center of grid. + center = (self._height // 2, self._width // 2) + grid[center[0], center[1]] = agent + print("Warning: No empty space found for agent; forced placement at center.") + return grid + + # --- Helper Functions --- + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + p_h, p_w = pattern.shape + eff_shape = (p_h + 2 * clearance, p_w + 2 * clearance) + candidates = self._find_candidates(grid, eff_shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance:r+clearance+p_h, c+clearance:c+clearance+p_w] = pattern + return True + return False + + # --- Placement Routines --- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Cross obstacles. + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + block_count = self._blocks.get("count", 0) + for _ in range(block_count): + block_w = self._rng.integers(2, 15) + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + # --- Pattern Generation Functions --- + def _generate_random_shape(self, num_blocks: int) -> np.ndarray: + shape_cells = {(0, 0)} + while len(shape_cells) < num_blocks: + candidates = [] + for (r, c) in shape_cells: + for dr, dc in [(1, 0), (-1, 0), (0, 1), (0, -1)]: + candidate = (r + dr, c + dc) + if candidate not in shape_cells: + candidates.append(candidate) + if not candidates: + break + new_cell = candidates[self._rng.integers(0, len(candidates))] + shape_cells.add(new_cell) + min_r = min(r for r, _ in shape_cells) + min_c = min(c for _, c in shape_cells) + max_r = max(r for r, _ in shape_cells) + max_c = max(c for _, c in shape_cells) + pattern = np.full((max_r - min_r + 1, max_c - min_c + 1), "empty", dtype=object) + for r, c in shape_cells: + pattern[r - min_r, c - min_c] = "wall" + return pattern + + def _generate_cross_pattern(self) -> np.ndarray: + cross_w = self._rng.integers(1, 9) + cross_h = self._rng.integers(1, 9) + pattern = np.full((cross_h, cross_w), "empty", dtype=object) + center_row = cross_h // 2 + center_col = cross_w // 2 + pattern[center_row, :] = "wall" + pattern[:, center_col] = "wall" + return pattern + + def _generate_labyrinth_pattern(self) -> np.ndarray: + # Choose dimensions between 11 and 13, clamp to 11, force odd. + h = int(self._rng.integers(11, 14)) + w = int(self._rng.integers(11, 14)) + h = 11 if h > 11 else h + w = 11 if w > 11 else w + if h % 2 == 0: h -= 1 + if w % 2 == 0: w -= 1 + + maze = np.full((h, w), "wall", dtype=object) + start = (1, 1) + maze[start] = "empty" + stack = [start] + directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] + while stack: + r, c = stack[-1] + neighbors = [] + for dr, dc in directions: + nr, nc = r + dr, c + dc + if 0 <= nr < h and 0 <= nc < w and maze[nr, nc] == "wall": + neighbors.append((nr, nc)) + if neighbors: + next_cell = neighbors[self._rng.integers(0, len(neighbors))] + nr, nc = next_cell + wall_r, wall_c = r + (nr - r) // 2, c + (nc - c) // 2 + maze[wall_r, wall_c] = "empty" + maze[nr, nc] = "empty" + stack.append(next_cell) + else: + stack.pop() + + # Apply thickening with a random probability to vary density. + thick_prob = 0.3 + 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j + 1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i + 1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick + + # Enforce at least two contiguous empty cells along each border. + if w > 3 and not self._has_gap(maze[0, 1:w-1]): + maze[0, 1:3] = "empty" + if w > 3 and not self._has_gap(maze[h - 1, 1:w-1]): + maze[h - 1, 1:3] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, 0]): + maze[1:3, 0] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, w - 1]): + maze[1:3, w - 1] = "empty" + + # Scatter hearts in empty cells with a 30% probability. + for i in range(h): + for j in range(w): + if maze[i, j] == "empty" and self._rng.random() < 0.3: + maze[i, j] = "heart" + return maze + + def _has_gap(self, line: np.ndarray) -> bool: + contiguous = 0 + for cell in line: + contiguous = contiguous + 1 if cell == "empty" else 0 + if contiguous >= 2: + return True + return False + + # --- Placement Routines for Obstacles and Blocks --- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + clearance = 1 + # Large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target} blocks.") + # Small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target} blocks.") + # Cross obstacles. + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "wall" + return grid + + def _place_blocks(self, grid: np.ndarray) -> np.ndarray: + block_count = self._blocks.get("count", 0) + for _ in range(block_count): + block_w = self._rng.integers(2, 15) + block_h = self._rng.integers(2, 15) + candidates = self._find_candidates(grid, (block_h, block_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+block_h, c:c+block_w] = "block" + else: + print(f"Warning: Could not place block of size {block_h}x{block_w}.") + return grid + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + +# End of VariedTerrainDiverseStyle class implementation \ No newline at end of file diff --git a/mettagrid/config/room/varied_terrain_labyrinths.py b/mettagrid/config/room/varied_terrain_labyrinths.py new file mode 100644 index 00000000..fd85ce1e --- /dev/null +++ b/mettagrid/config/room/varied_terrain_labyrinths.py @@ -0,0 +1,323 @@ +from typing import Optional, Tuple, List +import numpy as np +from omegaconf import DictConfig +from mettagrid.config.room.room import Room + +class VariedTerrainLabyrinths(Room): + def __init__( + self, + width: int, + height: int, + objects: DictConfig, + agents: int | DictConfig = 0, + seed: Optional[int] = None, + border_width: int = 0, + border_object: str = "wall", + occupancy_threshold: float = 0.66, # maximum fraction of grid cells to occupy + **kwargs # Accept extra parameters + ): + super().__init__(border_width=border_width, border_object=border_object) + self._rng = np.random.default_rng(seed) + self._width = width + self._height = height + self._objects = objects + self._agents = agents + self._occupancy_threshold = occupancy_threshold + + # Obstacle parameters. + self._large_obstacles = kwargs.pop("large_obstacles", {"size_range": [10, 25], "count": 0}) + self._small_obstacles = kwargs.pop("small_obstacles", {"size_range": [3, 6], "count": 0}) + self._crosses = kwargs.pop("crosses", {"count": 0}) + # Altars: hearts_count overrides number of altars. + self._hearts_count = kwargs.pop("hearts_count", 50) + self._clumpiness = kwargs.pop("clumpiness", 0) + # Labyrinths: count of mini labyrinths (mazes). + self._labyrinths = kwargs.pop("labyrinths", {"count": 0}) + # Scattered single walls. + self._scattered_walls = kwargs.pop("scattered_walls", {"count": 0}) + + def _build(self) -> np.ndarray: + # Prepare agent symbols. + if isinstance(self._agents, int): + agents = ["agent.agent"] * self._agents + elif isinstance(self._agents, dict) or isinstance(self._agents, DictConfig): + agents = ["agent." + agent for agent, na in self._agents.items() for _ in range(na)] + else: + agents = [] + + # --- Pre-scale objects by estimated occupancy --- + area = self._width * self._height + total_objects = sum(self._objects.values()) + len(agents) + if total_objects > self._occupancy_threshold * area: + scale = (self._occupancy_threshold * area) / total_objects + for obj in self._objects: + if self._objects[obj] > 0: + # Ensure at least one instance if originally nonzero. + self._objects[obj] = max(1, int(self._objects[obj] * scale)) + + # Create an empty grid. + grid = np.full((self._height, self._width), "empty", dtype=object) + + # Place features in order: + grid = self._place_labyrinths(grid) + grid = self._place_all_obstacles(grid) + grid = self._place_scattered_walls(grid) + + # --- Place remaining objects (e.g., altars, other objects) --- + for obj, count in self._objects.items(): + if obj == "altar": + count = self._hearts_count + for _ in range(count): + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for object placement.") + grid[pos[0], pos[1]] = obj + + # --- Place agents --- + for agent in agents: + pos = self._choose_random_empty(grid) + if pos is None: + raise ValueError("No empty space available for agent placement.") + grid[pos[0], pos[1]] = agent + + return grid + + # -------------------------------------------------------------------------- + # Helper Functions for Candidate Search and Placement + # -------------------------------------------------------------------------- + def _find_candidates(self, grid: np.ndarray, region_shape: Tuple[int, int]) -> List[Tuple[int, int]]: + """ + Returns list of top-left indices where a sub-region of given shape is completely "empty". + """ + region_h, region_w = region_shape + h, w = grid.shape + candidates = [] + for r in range(h - region_h + 1): + for c in range(w - region_w + 1): + if np.all(grid[r:r+region_h, c:c+region_w] == "empty"): + candidates.append((r, c)) + return candidates + + def _choose_random_empty(self, grid: np.ndarray) -> Optional[Tuple[int, int]]: + """ + Returns a random empty cell (as a tuple) from the grid. + """ + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return None + idx = self._rng.integers(0, len(empty_positions)) + return tuple(empty_positions[idx]) + + def _place_candidate_region(self, grid: np.ndarray, pattern: np.ndarray, clearance: int = 0) -> bool: + """ + Attempts to place a given pattern on grid. If clearance > 0, an effective region is + computed by padding the pattern dimensions with the clearance. + """ + p_h, p_w = pattern.shape + effective_h, effective_w = p_h + 2 * clearance, p_w + 2 * clearance + candidates = self._find_candidates(grid, (effective_h, effective_w)) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r+clearance: r+clearance+p_h, c+clearance: c+clearance+p_w] = pattern + return True + return False + + # -------------------------------------------------------------------------- + # Placement Routines + # -------------------------------------------------------------------------- + def _place_labyrinths(self, grid: np.ndarray) -> np.ndarray: + """ + Place mini labyrinths onto the grid. + """ + labyrinth_count = self._labyrinths.get("count", 0) + for _ in range(labyrinth_count): + pattern = self._generate_labyrinth_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place a labyrinth; no valid region found.") + return grid + + def _place_all_obstacles(self, grid: np.ndarray) -> np.ndarray: + """ + Places large obstacles, small obstacles, and cross obstacles on the grid. + For obstacles, a one-cell clearance is enforced. + """ + clearance = 1 + + # Place large obstacles. + large_count = self._large_obstacles.get("count", 0) + low_large, high_large = self._large_obstacles.get("size_range", [10, 25]) + for _ in range(large_count): + target_blocks = self._rng.integers(low_large, high_large + 1) + pattern = self._generate_random_shape(target_blocks) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place large obstacle of {target_blocks} blocks.") + + # Place small obstacles. + small_count = self._small_obstacles.get("count", 0) + low_small, high_small = self._small_obstacles.get("size_range", [3, 6]) + for _ in range(small_count): + target_blocks = self._rng.integers(low_small, high_small + 1) + pattern = self._generate_random_shape(target_blocks) + if not self._place_candidate_region(grid, pattern, clearance): + print(f"Warning: Could not place small obstacle of {target_blocks} blocks.") + + # Place cross obstacles (no extra clearance assumed). + crosses_count = self._crosses.get("count", 0) + for _ in range(crosses_count): + pattern = self._generate_cross_pattern() + candidates = self._find_candidates(grid, pattern.shape) + if candidates: + r, c = candidates[self._rng.integers(0, len(candidates))] + grid[r:r+pattern.shape[0], c:c+pattern.shape[1]] = pattern + else: + print("Warning: Could not place cross obstacle.") + return grid + + def _place_scattered_walls(self, grid: np.ndarray) -> np.ndarray: + """ + Scatter single wall cells randomly on empty grid locations. + """ + count = self._scattered_walls.get("count", 0) + empty_positions = np.argwhere(grid == "empty") + if len(empty_positions) == 0: + return grid + # Randomly select indices (without replacement) for wall placement. + num_to_place = min(count, len(empty_positions)) + indices = self._rng.permutation(len(empty_positions))[:num_to_place] + for idx in indices: + r, c = empty_positions[idx] + grid[r, c] = "block" + return grid + + # -------------------------------------------------------------------------- + # Pattern Generation Functions (unchanged in spirit) + # -------------------------------------------------------------------------- + def _generate_random_shape(self, num_blocks: int) -> np.ndarray: + """ + Generates a random connected shape with num_blocks cells set to "wall" using a random walk. + The shape is normalized (top-left at (0,0)). + """ + shape_cells = {(0, 0)} + # Use candidate growth until desired number of blocks is reached. + while len(shape_cells) < num_blocks: + candidates = [] + for (r, c) in shape_cells: + for dr, dc in [(1, 0), (-1, 0), (0, 1), (0, -1)]: + candidate = (r + dr, c + dc) + if candidate not in shape_cells: + candidates.append(candidate) + if not candidates: + break + new_cell = candidates[self._rng.integers(0, len(candidates))] + shape_cells.add(new_cell) + # Normalize shape coordinates. + min_r = min(r for r, _ in shape_cells) + min_c = min(c for _, c in shape_cells) + max_r = max(r for r, _ in shape_cells) + max_c = max(c for _, c in shape_cells) + pattern = np.full((max_r - min_r + 1, max_c - min_c + 1), "empty", dtype=object) + for r, c in shape_cells: + pattern[r - min_r, c - min_c] = "wall" + return pattern + + def _generate_cross_pattern(self) -> np.ndarray: + """ + Generates a cross-shaped obstacle. + Both width and height are sampled between 1 and 8 (inclusive). + The center row and column are set to "wall". + """ + cross_w = self._rng.integers(1, 9) + cross_h = self._rng.integers(1, 9) + pattern = np.full((cross_h, cross_w), "empty", dtype=object) + center_row = cross_h // 2 + center_col = cross_w // 2 + pattern[center_row, :] = "wall" + pattern[:, center_col] = "wall" + return pattern + + def _generate_labyrinth_pattern(self) -> np.ndarray: + """ + Generates a mini labyrinth (maze) pattern. + Dimensions are chosen randomly near 11×11 and forced to be odd. + A recursive backtracking algorithm carves passages. After that, passages + are thickened probabilistically, border gaps are ensured, and empty cells + are randomly replaced with "heart". + """ + # Choose dimensions between 11 and 13 then clamp and force oddness. + h = int(self._rng.integers(11, 14)) + w = int(self._rng.integers(11, 14)) + h = 11 if h > 11 else h + w = 11 if w > 11 else w + if h % 2 == 0: h -= 1 + if w % 2 == 0: w -= 1 + + maze = np.full((h, w), "wall", dtype=object) + start = (1, 1) + maze[start] = "empty" + stack = [start] + directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] + while stack: + r, c = stack[-1] + neighbors = [] + for dr, dc in directions: + nr, nc = r + dr, c + dc + if 0 <= nr < h and 0 <= nc < w and maze[nr, nc] == "wall": + neighbors.append((nr, nc)) + if neighbors: + next_cell = neighbors[self._rng.integers(0, len(neighbors))] + nr, nc = next_cell + wall_r, wall_c = r + (nr - r) // 2, c + (nc - c) // 2 + maze[wall_r, wall_c] = "empty" + maze[nr, nc] = "empty" + stack.append(next_cell) + else: + stack.pop() + + # Apply thickening to passages. + thick_prob = 0.3 + 0.7 * self._rng.random() + maze_thick = maze.copy() + for i in range(1, h - 1): + for j in range(1, w - 1): + if maze[i, j] == "empty": + if self._rng.random() < thick_prob and j+1 < w: + maze_thick[i, j + 1] = "empty" + if self._rng.random() < thick_prob and i+1 < h: + maze_thick[i + 1, j] = "empty" + maze = maze_thick + + # Ensure border gaps of at least 2 cells. + if w > 3 and not self._has_gap(maze[0, 1:w-1]): + maze[0, 1:3] = "empty" + if w > 3 and not self._has_gap(maze[h - 1, 1:w-1]): + maze[h - 1, 1:3] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, 0]): + maze[1:3, 0] = "empty" + if h > 3 and not self._has_gap(maze[1:h-1, w - 1]): + maze[1:3, w - 1] = "empty" + + # Randomly scatter hearts over empty cells with 30% probability. + for i in range(h): + for j in range(w): + if maze[i, j] == "empty" and self._rng.random() < 0.3: + maze[i, j] = "heart" + + return maze + + def _has_gap(self, line: np.ndarray) -> bool: + """ + Returns True if a 1D array has at least 2 contiguous "empty" cells. + """ + contiguous = 0 + for cell in line: + contiguous = contiguous + 1 if cell == "empty" else 0 + if contiguous >= 2: + return True + return False + +# ============================================================================== +# End of VariedTerrainLabyrinths +# ==============================================================================