diff --git a/modules/cluster_estimation/cluster_estimation.py b/modules/cluster_estimation/cluster_estimation.py
index 10c7bb91..e66fcf67 100644
--- a/modules/cluster_estimation/cluster_estimation.py
+++ b/modules/cluster_estimation/cluster_estimation.py
@@ -55,6 +55,32 @@ class ClusterEstimation:
     __WEIGHT_DROP_THRESHOLD = 0.1
     __MAX_COVARIANCE_THRESHOLD = 10
 
+    @staticmethod
+    def check_create_arguments(
+        min_activation_threshold: int,
+        min_new_points_to_run: int,
+        max_num_components: int,
+        random_state: int,
+    ) -> bool:
+        """
+        Checks if a valid cluster estimation object can be constructed.
+
+        See `ClusterEstimation` for parameter descriptions.
+        """
+        if min_activation_threshold < max_num_components:
+            return False
+
+        if min_new_points_to_run < 0:
+            return False
+
+        if max_num_components < 1:
+            return False
+
+        if random_state < 0:
+            return False
+
+        return True
+
     @classmethod
     def create(
         cls,
@@ -85,16 +111,11 @@ def create(
         RETURNS: The ClusterEstimation object if all conditions pass, otherwise False, None
         """
-        if min_activation_threshold < max_num_components:
-            return False, None
-
-        if min_new_points_to_run < 0:
-            return False, None
-
-        if max_num_components < 1:
-            return False, None
+        is_valid_arguments = ClusterEstimation.check_create_arguments(
+            min_activation_threshold, min_new_points_to_run, max_num_components, random_state
+        )
 
-        if random_state < 0:
+        if not is_valid_arguments:
             return False, None
 
         return True, ClusterEstimation(
@@ -211,21 +232,20 @@ def run(
             model_output = self.__filter_by_covariances(model_output)
 
         # Create output list of remaining valid clusters
-        detections_in_world = []
+        objects_in_world = []
         for cluster in model_output:
             result, landing_pad = object_in_world.ObjectInWorld.create(
-                cluster[0][0],
-                cluster[0][1],
-                cluster[2],
+                cluster[0][0], cluster[0][1], cluster[2]
             )
 
-            if result:
-                detections_in_world.append(landing_pad)
-            else:
-                self.__logger.warning("Failed to create ObjectInWorld object")
+            if not result:
+                self.__logger.error("Failed to create ObjectInWorld object")
+                return False, None
+
+            objects_in_world.append(landing_pad)
 
-        self.__logger.info(detections_in_world)
-        return True, detections_in_world
+        self.__logger.info(objects_in_world)
+        return True, objects_in_world
 
     def __decide_to_run(self, run_override: bool) -> bool:
         """
diff --git a/modules/cluster_estimation/cluster_estimation_by_label.py b/modules/cluster_estimation/cluster_estimation_by_label.py
new file mode 100644
index 00000000..bbd8150c
--- /dev/null
+++ b/modules/cluster_estimation/cluster_estimation_by_label.py
@@ -0,0 +1,162 @@
+"""
+Cluster estimation by label.
+"""
+
+from . import cluster_estimation
+from .. import detection_in_world
+from .. import object_in_world
+from ..common.modules.logger import logger
+
+
+class ClusterEstimationByLabel:
+    """
+    Cluster estimation filtered on label.
+
+    ATTRIBUTES
+    ----------
+    min_activation_threshold: int
+        Minimum total data points before model runs. Must be at least max_num_components.
+
+    min_new_points_to_run: int
+        Minimum number of new data points that must be collected before running model.
+
+    max_num_components: int
+        Max number of real landing pads. Must be at least 1.
+
+    random_state: int
+        Seed for randomizer, to get consistent results.
+
+    local_logger: Logger
+        For logging error and debug messages.
+
+    METHODS
+    -------
+    run()
+        Cluster estimation filtered by label.
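+
+    Hypothetical usage sketch (argument values are illustrative only; `local_logger`
+    and `input_detections` are assumed to exist in the caller's scope):
+
+        result, estimator = ClusterEstimationByLabel.create(100, 10, 10, 0, local_logger)
+        if result:
+            ran, labels_to_objects = estimator.run(input_detections, False)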
+ """ + + __create_key = object() + + @classmethod + def create( + cls, + min_activation_threshold: int, + min_new_points_to_run: int, + max_num_components: int, + random_state: int, + local_logger: logger.Logger, + ) -> "tuple[True, ClusterEstimationByLabel] | tuple[False, None]": + """ + See `ClusterEstimation` for parameter descriptions. + + Return: Success, cluster estimation by label object. + """ + + is_valid_arguments = cluster_estimation.ClusterEstimation.check_create_arguments( + min_activation_threshold, min_new_points_to_run, max_num_components, random_state + ) + + if not is_valid_arguments: + return False, None + + return True, ClusterEstimationByLabel( + cls.__create_key, + min_activation_threshold, + min_new_points_to_run, + max_num_components, + random_state, + local_logger, + ) + + def __init__( + self, + class_private_create_key: object, + min_activation_threshold: int, + min_new_points_to_run: int, + max_num_components: int, + random_state: int, + local_logger: logger.Logger, + ) -> None: + """ + Private constructor, use create() method. + """ + assert ( + class_private_create_key is ClusterEstimationByLabel.__create_key + ), "Use create() method" + + # Construction arguments for `ClusterEstimation` + self.__min_activation_threshold = min_activation_threshold + self.__min_new_points_to_run = min_new_points_to_run + self.__max_num_components = max_num_components + self.__random_state = random_state + self.__local_logger = local_logger + + # Cluster model corresponding to each label + # Each cluster estimation object stores the detections given to in its __all_points bucket across runs + self.__label_to_cluster_estimation_model: dict[ + int, cluster_estimation.ClusterEstimation + ] = {} + + def run( + self, + input_detections: list[detection_in_world.DetectionInWorld], + run_override: bool, + ) -> tuple[True, dict[int, list[object_in_world.ObjectInWorld]]] | tuple[False, None]: + """ + See `ClusterEstimation` for parameter descriptions. + + RETURNS + ------- + model_ran: bool + True if ClusterEstimation object successfully ran its estimation model, False otherwise. + + labels_to_objects: dict[int, list[object_in_world.ObjectInWorld] or None. + Dictionary where the key is a label and the value is a list of all cluster detections with that label. + ObjectInWorld objects don't have a label property, but they are sorted into label categories in the dictionary. 
+ """ + label_to_detections: dict[int, list[detection_in_world.DetectionInWorld]] = {} + + # Filtering detections by label + for detection in input_detections: + if not detection.label in label_to_detections: + label_to_detections[detection.label] = [] + + label_to_detections[detection.label].append(detection) + + labels_to_objects: dict[int, list[object_in_world.ObjectInWorld]] = {} + + for label, detections in label_to_detections.items(): + # Create cluster estimation for label if it doesn't exist + if not label in self.__label_to_cluster_estimation_model: + result, cluster_model = cluster_estimation.ClusterEstimation.create( + self.__min_activation_threshold, + self.__min_new_points_to_run, + self.__max_num_components, + self.__random_state, + self.__local_logger, + ) + if not result: + self.__local_logger.error( + f"Failed to create cluster estimation for label {label}" + ) + return False, None + + self.__label_to_cluster_estimation_model[label] = cluster_model + + # Runs cluster estimation for specific label + result, clusters = self.__label_to_cluster_estimation_model[label].run( + detections, + run_override, + ) + + if not result: + self.__local_logger.error( + f"Failed to run cluster estimation model for label {label}" + ) + return False, None + + if not label in labels_to_objects: + labels_to_objects[label] = [] + labels_to_objects[label] += clusters + + return True, labels_to_objects diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index 0f378625..3b2ac686 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -26,26 +26,9 @@ def cluster_estimation_worker( PARAMETERS ---------- - min_activation_threshold: int - Minimum total data points before model runs. - min_new_points_to_run: int - Minimum number of new data points that must be collected before running model. + See `ClusterEstimation` for parameter descriptions. - max_num_components: int - Max number of real landing pads. - - random_state: int - Seed for randomizer, to get consistent results. - - input_queue: queue_proxy_wrapper.QueuePRoxyWrapper - Data queue. - - output_queue: queue_proxy_wrapper.QueuePRoxyWrapper - Data queue. - - worker_controller: worker_controller.WorkerController - How the main process communicates to this worker process. """ worker_name = pathlib.Path(__file__).stem process_id = os.getpid() diff --git a/modules/common b/modules/common index 9acf88b4..9b10a334 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad +Subproject commit 9b10a334651b7cca5d014d4640e42d3a55d128f8 diff --git a/modules/object_in_world.py b/modules/object_in_world.py index 83922253..6d8ef38d 100644 --- a/modules/object_in_world.py +++ b/modules/object_in_world.py @@ -21,7 +21,12 @@ def create( if spherical_variance < 0.0: return False, None - return True, ObjectInWorld(cls.__create_key, location_x, location_y, spherical_variance) + return True, ObjectInWorld( + cls.__create_key, + location_x, + location_y, + spherical_variance, + ) def __init__( self, diff --git a/tests/unit/test_cluster_estimation_by_label.py b/tests/unit/test_cluster_estimation_by_label.py new file mode 100644 index 00000000..ff2c32ee --- /dev/null +++ b/tests/unit/test_cluster_estimation_by_label.py @@ -0,0 +1,347 @@ +""" +Testing ClusterEstimationByLabel. 
+""" + +import random +import numpy as np +import pytest +import sklearn.datasets + +from modules.cluster_estimation import cluster_estimation_by_label +from modules.common.modules.logger import logger +from modules import detection_in_world + +MIN_TOTAL_POINTS_THRESHOLD = 100 +MIN_NEW_POINTS_TO_RUN = 10 +MAX_NUM_COMPONENTS = 10 +RNG_SEED = 0 +CENTRE_BOX_SIZE = 500 + +# Test functions use test fixture signature names and access class privates +# No enable +# pylint: disable=protected-access,redefined-outer-name,too-many-instance-attributes,duplicate-code + + +@pytest.fixture() +def cluster_model_by_label() -> cluster_estimation_by_label.ClusterEstimationByLabel: # type: ignore + """ + Cluster estimation by label object. + """ + result, test_logger = logger.Logger.create("test_logger", False) + assert result + assert test_logger is not None + + result, model = cluster_estimation_by_label.ClusterEstimationByLabel.create( + MIN_TOTAL_POINTS_THRESHOLD, + MIN_NEW_POINTS_TO_RUN, + MAX_NUM_COMPONENTS, + RNG_SEED, + test_logger, + ) + assert result + assert model is not None + + yield model # type: ignore + + +def generate_cluster_data( + n_samples_per_cluster: "list[int]", + cluster_standard_deviation: int, + label: int, +) -> "tuple[list[detection_in_world.DetectionInWorld], list[np.ndarray]]": + """ + Returns a list of points (DetectionInWorld objects) with specified points per cluster + and standard deviation. + + PARAMETERS + ---------- + n_samples_per_cluster: list[int] + List corresponding to how many points to generate for each generated cluster + ex: [10 20 30] will generate 10 points for one cluster, 20 points for the next, + and 30 points for the final cluster. + + cluster_standard_deviation: int + The standard deviation of the generated points, bigger + standard deviation == more spread out points. + + label: int + The label that every generated detection gets assigned + + RETURNS + ------- + detections: list[detection_in_world.DetectionInWorld] + List of points (DetectionInWorld objects). + + cluster_positions: list[np.ndarray] + Coordinate positions of each cluster centre. + ------- + """ + # .make_blobs() is a sklearn library function that returns a tuple of two values + # First value is ndarray of shape (2, total # of samples) that gives the (x,y) + # coordinate of generated data points. + # Second value is the integer labels for cluster membership of each generated point (unused). + # Third value is the (x,y) coordinates for each of the cluster centres. 
+
+    generated_points, _, cluster_positions = sklearn.datasets.make_blobs(  # type: ignore
+        n_samples=n_samples_per_cluster,
+        n_features=2,
+        cluster_std=cluster_standard_deviation,
+        center_box=(0, CENTRE_BOX_SIZE),
+        random_state=RNG_SEED,
+        return_centers=True,
+    )
+
+    detections = []
+    for point in generated_points:
+        # Placeholder variables to create DetectionInWorld objects
+        placeholder_vertices = np.array([[0, 0], [0, 0], [0, 0], [0, 0]])
+        placeholder_confidence = 0.5
+
+        result, detection_to_add = detection_in_world.DetectionInWorld.create(
+            placeholder_vertices,
+            point,
+            label,
+            placeholder_confidence,
+        )
+
+        assert result
+        assert detection_to_add is not None
+        detections.append(detection_to_add)
+
+    return detections, cluster_positions.tolist()
+
+
+def generate_cluster_data_by_label(
+    labels_to_n_samples_per_cluster: "dict[int, list[int]]",
+    cluster_standard_deviation: int,
+) -> "tuple[list[detection_in_world.DetectionInWorld], dict[int, list[np.ndarray]]]":
+    """
+    Returns a list of labeled points (DetectionInWorld objects) with specified points per cluster
+    and standard deviation.
+
+    PARAMETERS
+    ----------
+    labels_to_n_samples_per_cluster: dict[int, list[int]]
+        Dictionary where the key is a label and the value is a
+        list of integers that represent the number of samples each cluster has.
+
+    cluster_standard_deviation: int
+        The standard deviation of the generated points; a bigger
+        standard deviation means more spread out points.
+
+    RETURNS
+    -------
+    detections: list[detection_in_world.DetectionInWorld]
+        List of points (DetectionInWorld objects).
+
+    labels_to_cluster_positions: dict[int, list[np.ndarray]]
+        Dictionary where the key is a label and the value is a
+        list of coordinate positions of each cluster centre with that label.
+    """
+
+    detections = []
+    labels_to_cluster_positions: dict[int, list[np.ndarray]] = {}
+
+    for label, n_samples_list in labels_to_n_samples_per_cluster.items():
+        temp_detections, cluster_positions = generate_cluster_data(
+            n_samples_list, cluster_standard_deviation, label
+        )
+        detections += temp_detections
+        labels_to_cluster_positions[label] = cluster_positions
+
+    return detections, labels_to_cluster_positions
+
+
+class TestModelExecutionCondition:
+    """
+    Tests execution condition for estimation worker at different amounts of total and new data
+    points.
+    """
+
+    __STD_DEV_REG = 1  # Regular standard deviation is 1m
+
+    def test_under_min_total_threshold(
+        self, cluster_model_by_label: cluster_estimation_by_label.ClusterEstimationByLabel
+    ) -> None:
+        """
+        Total data under threshold should not run.
+        """
+        # Setup
+        original_count = MIN_TOTAL_POINTS_THRESHOLD - 1  # Less than min threshold (100)
+
+        generated_detections, _ = generate_cluster_data_by_label(
+            {0: [original_count]}, self.__STD_DEV_REG
+        )
+
+        # Run
+        result, detections_in_world = cluster_model_by_label.run(generated_detections, False)
+
+        # Test
+        assert not result
+        assert detections_in_world is None
+
+    def test_at_min_total_threshold(
+        self, cluster_model_by_label: cluster_estimation_by_label.ClusterEstimationByLabel
+    ) -> None:
+        """
+        Should run once the total threshold is reached, regardless of the
+        current bucket size.
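+
+        Here, 99 initial points plus 9 new points pass the total threshold of 100,
+        so the second run executes even though fewer than 10 new points were added.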
+ """ + # Setup + original_count = MIN_TOTAL_POINTS_THRESHOLD - 1 # Should not run the first time + new_count = MIN_NEW_POINTS_TO_RUN - 1 # Under 10 new points + + generated_detections, _ = generate_cluster_data_by_label( + {0: [original_count]}, self.__STD_DEV_REG + ) + generated_detections_2, _ = generate_cluster_data_by_label( + {0: [new_count]}, self.__STD_DEV_REG + ) + + # Run + result, detections_in_world = cluster_model_by_label.run(generated_detections, False) + result_2, detections_in_world_2 = cluster_model_by_label.run(generated_detections_2, False) + + # Test + assert not result + assert detections_in_world is None + assert result_2 + assert detections_in_world_2 is not None + + def test_under_min_bucket_size( + self, cluster_model_by_label: cluster_estimation_by_label.ClusterEstimationByLabel + ) -> None: + """ + New data under threshold should not run. + """ + # Setup + original_count = MIN_TOTAL_POINTS_THRESHOLD + 10 # Should run the first time + new_count = MIN_NEW_POINTS_TO_RUN - 1 # Under 10 new points, shouldn't run + + generated_detections, _ = generate_cluster_data_by_label( + {0: [original_count]}, self.__STD_DEV_REG + ) + generated_detections_2, _ = generate_cluster_data_by_label( + {0: [new_count]}, self.__STD_DEV_REG + ) + + # Run + result, detections_in_world = cluster_model_by_label.run(generated_detections, False) + result_2, detections_in_world_2 = cluster_model_by_label.run(generated_detections_2, False) + + # Test + assert result + assert detections_in_world is not None + assert not result_2 + assert detections_in_world_2 is None + + def test_good_data( + self, cluster_model_by_label: cluster_estimation_by_label.ClusterEstimationByLabel + ) -> None: + """ + All conditions met should run. + """ + original_count = MIN_TOTAL_POINTS_THRESHOLD + 1 # More than min total threshold should run + generated_detections, _ = generate_cluster_data_by_label( + {0: [original_count]}, self.__STD_DEV_REG + ) + + # Run + result, detections_in_world = cluster_model_by_label.run(generated_detections, False) + + # Test + assert result + assert detections_in_world is not None + + +class TestCorrectClusterPositionOutput: + """ + Tests if cluster estimation by label properly sorts labels. 
+ """ + + __STD_DEV_REG = 1 # Regular standard deviation is 1m + __MAX_POSITION_TOLERANCE = 1 + + def test_one_label( + self, cluster_model_by_label: cluster_estimation_by_label.ClusterEstimationByLabel + ) -> None: + """ + Five clusters with small standard devition that all have the same label + """ + # Setup + labels_to_n_samples_per_cluster = {1: [50, 100, 150, 200, 250]} + generated_detections, labels_to_generated_cluster_positions = ( + generate_cluster_data_by_label(labels_to_n_samples_per_cluster, self.__STD_DEV_REG) + ) + random.shuffle( + generated_detections + ) # so all abojects with the same label are not arranged all in a row + + # Run + result, detections_in_world = cluster_model_by_label.run(generated_detections, False) + + # Test + assert result + assert detections_in_world is not None + assert len(detections_in_world[1]) == 5 + for cluster in detections_in_world[1]: + is_match = False + for generated_cluster in labels_to_generated_cluster_positions[1]: + # Check if coordinates are equal + distance = np.linalg.norm( + [ + cluster.location_x - generated_cluster[0], + cluster.location_y - generated_cluster[1], + ] + ) + if distance < self.__MAX_POSITION_TOLERANCE: + is_match = True + break + + assert is_match + + def test_multiple_labels( + self, cluster_model_by_label: cluster_estimation_by_label.ClusterEstimationByLabel + ) -> None: + """ + Five clusters with small standard devition each belonging to one of three labels, with large points per cluster + """ + # Setup + labels_to_n_samples_per_cluster = { + 1: [70, 100, 130], + 2: [60, 90, 120], + 3: [50, 80, 110], + } + generated_detections, labels_to_generated_cluster_positions = ( + generate_cluster_data_by_label(labels_to_n_samples_per_cluster, self.__STD_DEV_REG) + ) + random.shuffle( + generated_detections + ) # so all abojects with the same label are not arranged all in a row + + # Run + result, detections_in_world = cluster_model_by_label.run(generated_detections, False) + + # Test + assert result + assert detections_in_world is not None + assert len(detections_in_world[1]) == 3 + assert len(detections_in_world[2]) == 3 + assert len(detections_in_world[3]) == 3 + for label in range(1, 4): + for cluster in detections_in_world[label]: + is_match = False + for generated_cluster in labels_to_generated_cluster_positions[label]: + # Check if coordinates are equal + distance = np.linalg.norm( + [ + cluster.location_x - generated_cluster[0], + cluster.location_y - generated_cluster[1], + ] + ) + if distance < self.__MAX_POSITION_TOLERANCE: + is_match = True + break + + assert is_match