|
| 1 | +""" |
| 2 | +This file implements a grid with a 3d reservation matrix with dimensions for x, y, and time. There |
| 3 | +is also infrastructure to generate dynamic obstacles that move around the grid. The obstacles' paths |
| 4 | +are stored in the reservation matrix on creation. |
| 5 | +""" |
| 6 | +import numpy as np |
| 7 | +import matplotlib.pyplot as plt |
| 8 | +from enum import Enum |
| 9 | +from dataclasses import dataclass |
| 10 | + |
| 11 | +@dataclass(order=True) |
| 12 | +class Position: |
| 13 | + x: int |
| 14 | + y: int |
| 15 | + |
| 16 | + def as_ndarray(self) -> np.ndarray: |
| 17 | + return np.array([self.x, self.y]) |
| 18 | + |
| 19 | + def __add__(self, other): |
| 20 | + if isinstance(other, Position): |
| 21 | + return Position(self.x + other.x, self.y + other.y) |
| 22 | + raise NotImplementedError( |
| 23 | + f"Addition not supported for Position and {type(other)}" |
| 24 | + ) |
| 25 | + |
| 26 | + def __sub__(self, other): |
| 27 | + if isinstance(other, Position): |
| 28 | + return Position(self.x - other.x, self.y - other.y) |
| 29 | + raise NotImplementedError( |
| 30 | + f"Subtraction not supported for Position and {type(other)}" |
| 31 | + ) |
| 32 | + |
| 33 | + |
| 34 | +class ObstacleArrangement(Enum): |
| 35 | + # Random obstacle positions and movements |
| 36 | + RANDOM = 0 |
| 37 | + # Obstacles start in a line in y at center of grid and move side-to-side in x |
| 38 | + ARRANGEMENT1 = 1 |
| 39 | + |
| 40 | + |
| 41 | +class Grid: |
| 42 | + # Set in constructor |
| 43 | + grid_size: np.ndarray |
| 44 | + reservation_matrix: np.ndarray |
| 45 | + obstacle_paths: list[list[Position]] = [] |
| 46 | + # Obstacles will never occupy these points. Useful to avoid impossible scenarios |
| 47 | + obstacle_avoid_points: list[Position] = [] |
| 48 | + |
| 49 | + # Number of time steps in the simulation |
| 50 | + time_limit: int |
| 51 | + |
| 52 | + # Logging control |
| 53 | + verbose = False |
| 54 | + |
| 55 | + def __init__( |
| 56 | + self, |
| 57 | + grid_size: np.ndarray, |
| 58 | + num_obstacles: int = 40, |
| 59 | + obstacle_avoid_points: list[Position] = [], |
| 60 | + obstacle_arrangement: ObstacleArrangement = ObstacleArrangement.RANDOM, |
| 61 | + time_limit: int = 100, |
| 62 | + ): |
| 63 | + self.obstacle_avoid_points = obstacle_avoid_points |
| 64 | + self.time_limit = time_limit |
| 65 | + self.grid_size = grid_size |
| 66 | + self.reservation_matrix = np.zeros((grid_size[0], grid_size[1], self.time_limit)) |
| 67 | + |
| 68 | + if num_obstacles > self.grid_size[0] * self.grid_size[1]: |
| 69 | + raise Exception("Number of obstacles is greater than grid size!") |
| 70 | + |
| 71 | + if obstacle_arrangement == ObstacleArrangement.RANDOM: |
| 72 | + self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles) |
| 73 | + elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1: |
| 74 | + self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles) |
| 75 | + |
| 76 | + for i, path in enumerate(self.obstacle_paths): |
| 77 | + obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid |
| 78 | + for t, position in enumerate(path): |
| 79 | + # Reserve old & new position at this time step |
| 80 | + if t > 0: |
| 81 | + self.reservation_matrix[path[t - 1].x, path[t - 1].y, t] = obs_idx |
| 82 | + self.reservation_matrix[position.x, position.y, t] = obs_idx |
| 83 | + |
| 84 | + """ |
| 85 | + Generate dynamic obstacles that move around the grid. Initial positions and movements are random |
| 86 | + """ |
| 87 | + def generate_dynamic_obstacles(self, obs_count: int) -> list[list[Position]]: |
| 88 | + obstacle_paths = [] |
| 89 | + for _ in (0, obs_count): |
| 90 | + # Sample until a free starting space is found |
| 91 | + initial_position = self.sample_random_position() |
| 92 | + while not self.valid_obstacle_position(initial_position, 0): |
| 93 | + initial_position = self.sample_random_position() |
| 94 | + |
| 95 | + positions = [initial_position] |
| 96 | + if self.verbose: |
| 97 | + print("Obstacle initial position: ", initial_position) |
| 98 | + |
| 99 | + # Encourage obstacles to mostly stay in place - too much movement leads to chaotic planning scenarios |
| 100 | + # that are not fun to watch |
| 101 | + weights = [0.05, 0.05, 0.05, 0.05, 0.8] |
| 102 | + diffs = [ |
| 103 | + Position(0, 1), |
| 104 | + Position(0, -1), |
| 105 | + Position(1, 0), |
| 106 | + Position(-1, 0), |
| 107 | + Position(0, 0), |
| 108 | + ] |
| 109 | + |
| 110 | + for t in range(1, self.time_limit - 1): |
| 111 | + sampled_indices = np.random.choice( |
| 112 | + len(diffs), size=5, replace=False, p=weights |
| 113 | + ) |
| 114 | + rand_diffs = [diffs[i] for i in sampled_indices] |
| 115 | + |
| 116 | + valid_position = None |
| 117 | + for diff in rand_diffs: |
| 118 | + new_position = positions[-1] + diff |
| 119 | + |
| 120 | + if not self.valid_obstacle_position(new_position, t): |
| 121 | + continue |
| 122 | + |
| 123 | + valid_position = new_position |
| 124 | + break |
| 125 | + |
| 126 | + # Impossible situation for obstacle - stay in place |
| 127 | + # -> this can happen if the oaths of other obstacles this one |
| 128 | + if valid_position is None: |
| 129 | + valid_position = positions[-1] |
| 130 | + |
| 131 | + positions.append(valid_position) |
| 132 | + |
| 133 | + obstacle_paths.append(positions) |
| 134 | + |
| 135 | + return obstacle_paths |
| 136 | + |
| 137 | + """ |
| 138 | + Generate a line of obstacles in y at the center of the grid that move side-to-side in x |
| 139 | + Bottom half start moving right, top half start moving left. If `obs_count` is less than the length of |
| 140 | + the grid, only the first `obs_count` obstacles will be generated. |
| 141 | + """ |
| 142 | + def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: |
| 143 | + obstacle_paths = [] |
| 144 | + half_grid_x = self.grid_size[0] // 2 |
| 145 | + half_grid_y = self.grid_size[1] // 2 |
| 146 | + |
| 147 | + for y_idx in range(0, min(obs_count, self.grid_size[1])): |
| 148 | + moving_right = y_idx < half_grid_y |
| 149 | + position = Position(half_grid_x, y_idx) |
| 150 | + path = [position] |
| 151 | + |
| 152 | + for t in range(1, self.time_limit - 1): |
| 153 | + # sit in place every other time step |
| 154 | + if t % 2 == 0: |
| 155 | + path.append(position) |
| 156 | + continue |
| 157 | + |
| 158 | + # first check if we should switch direction (at edge of grid) |
| 159 | + if (moving_right and position.x == self.grid_size[0] - 1) or ( |
| 160 | + not moving_right and position.x == 0 |
| 161 | + ): |
| 162 | + moving_right = not moving_right |
| 163 | + # step in direction |
| 164 | + position = Position( |
| 165 | + position.x + (1 if moving_right else -1), position.y |
| 166 | + ) |
| 167 | + path.append(position) |
| 168 | + |
| 169 | + obstacle_paths.append(path) |
| 170 | + |
| 171 | + return obstacle_paths |
| 172 | + |
| 173 | + """ |
| 174 | + Check if the given position is valid at time t |
| 175 | +
|
| 176 | + input: |
| 177 | + position (Position): (x, y) position |
| 178 | + t (int): time step |
| 179 | +
|
| 180 | + output: |
| 181 | + bool: True if position/time combination is valid, False otherwise |
| 182 | + """ |
| 183 | + def valid_position(self, position: Position, t: int) -> bool: |
| 184 | + # Check if new position is in grid |
| 185 | + if not self.inside_grid_bounds(position): |
| 186 | + return False |
| 187 | + |
| 188 | + # Check if new position is not occupied at time t |
| 189 | + return self.reservation_matrix[position.x, position.y, t] == 0 |
| 190 | + |
| 191 | + """ |
| 192 | + Returns True if the given position is valid at time t and is not in the set of obstacle_avoid_points |
| 193 | + """ |
| 194 | + def valid_obstacle_position(self, position: Position, t: int) -> bool: |
| 195 | + return ( |
| 196 | + self.valid_position(position, t) |
| 197 | + and position not in self.obstacle_avoid_points |
| 198 | + ) |
| 199 | + |
| 200 | + """ |
| 201 | + Returns True if the given position is within the grid's boundaries |
| 202 | + """ |
| 203 | + def inside_grid_bounds(self, position: Position) -> bool: |
| 204 | + return ( |
| 205 | + position.x >= 0 |
| 206 | + and position.x < self.grid_size[0] |
| 207 | + and position.y >= 0 |
| 208 | + and position.y < self.grid_size[1] |
| 209 | + ) |
| 210 | + |
| 211 | + """ |
| 212 | + Sample a random position that is within the grid's boundaries |
| 213 | +
|
| 214 | + output: |
| 215 | + Position: (x, y) position |
| 216 | + """ |
| 217 | + def sample_random_position(self) -> Position: |
| 218 | + return Position( |
| 219 | + np.random.randint(0, self.grid_size[0]), |
| 220 | + np.random.randint(0, self.grid_size[1]), |
| 221 | + ) |
| 222 | + |
| 223 | + """ |
| 224 | + Returns a tuple of (x_positions, y_positions) of the obstacles at time t |
| 225 | + """ |
| 226 | + def get_obstacle_positions_at_time(self, t: int) -> tuple[list[int], list[int]]: |
| 227 | + x_positions = [] |
| 228 | + y_positions = [] |
| 229 | + for obs_path in self.obstacle_paths: |
| 230 | + x_positions.append(obs_path[t].x) |
| 231 | + y_positions.append(obs_path[t].y) |
| 232 | + return (x_positions, y_positions) |
| 233 | + |
| 234 | + |
| 235 | +show_animation = True |
| 236 | + |
| 237 | + |
| 238 | +def main(): |
| 239 | + grid = Grid( |
| 240 | + np.array([11, 11]), |
| 241 | + num_obstacles=10, |
| 242 | + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, |
| 243 | + ) |
| 244 | + |
| 245 | + if not show_animation: |
| 246 | + return |
| 247 | + |
| 248 | + fig = plt.figure(figsize=(8, 7)) |
| 249 | + ax = fig.add_subplot( |
| 250 | + autoscale_on=False, |
| 251 | + xlim=(0, grid.grid_size[0] - 1), |
| 252 | + ylim=(0, grid.grid_size[1] - 1), |
| 253 | + ) |
| 254 | + ax.set_aspect("equal") |
| 255 | + ax.grid() |
| 256 | + ax.set_xticks(np.arange(0, 11, 1)) |
| 257 | + ax.set_yticks(np.arange(0, 11, 1)) |
| 258 | + (obs_points,) = ax.plot([], [], "ro", ms=15) |
| 259 | + |
| 260 | + # for stopping simulation with the esc key. |
| 261 | + plt.gcf().canvas.mpl_connect( |
| 262 | + "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] |
| 263 | + ) |
| 264 | + |
| 265 | + for i in range(0, grid.time_limit - 1): |
| 266 | + obs_positions = grid.get_obstacle_positions_at_time(i) |
| 267 | + obs_points.set_data(obs_positions[0], obs_positions[1]) |
| 268 | + plt.pause(0.2) |
| 269 | + plt.show() |
| 270 | + |
| 271 | + |
| 272 | +if __name__ == "__main__": |
| 273 | + main() |
0 commit comments