diff --git a/components/clp-package-utils/clp_package_utils/controller.py b/components/clp-package-utils/clp_package_utils/controller.py new file mode 100644 index 0000000000..0b3807a3dd --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/controller.py @@ -0,0 +1,637 @@ +import json +import logging +import multiprocessing +import os +import pathlib +import socket +import stat +import subprocess +import uuid +from abc import ABC, abstractmethod +from typing import Any, Dict + +from clp_py_utils.clp_config import ( + AwsAuthType, + CLPConfig, + COMPRESSION_SCHEDULER_COMPONENT_NAME, + COMPRESSION_WORKER_COMPONENT_NAME, + DB_COMPONENT_NAME, + DeploymentType, + GARBAGE_COLLECTOR_COMPONENT_NAME, + QUERY_SCHEDULER_COMPONENT_NAME, + QUERY_WORKER_COMPONENT_NAME, + QueryEngine, + QUEUE_COMPONENT_NAME, + REDIS_COMPONENT_NAME, + REDUCER_COMPONENT_NAME, + RESULTS_CACHE_COMPONENT_NAME, + StorageEngine, + StorageType, + WEBUI_COMPONENT_NAME, +) +from clp_py_utils.clp_metadata_db_utils import ( + get_archives_table_name, + get_datasets_table_name, + get_files_table_name, +) + +from clp_package_utils.general import ( + check_docker_dependencies, + CONTAINER_CLP_HOME, + dump_shared_container_config, + generate_docker_compose_container_config, + get_clp_home, + validate_db_config, + validate_queue_config, + validate_redis_config, + validate_results_cache_config, + validate_webui_config, +) + +# Type alias for environment variables dictionary. +EnvVarsDict = Dict[str, str] + +LOG_FILE_ACCESS_MODE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH + +DEFAULT_UID_GID = f"{os.getuid()}:{os.getgid()}" +SERVICE_CONTAINER_USER_ID = 999 +SERVICE_CONTAINER_GROUP_ID = 999 +SERVICE_CONTAINER_UID_GID = f"{SERVICE_CONTAINER_USER_ID}:{SERVICE_CONTAINER_GROUP_ID}" + +logger = logging.getLogger(__name__) + + +class BaseController(ABC): + """ + Abstract base controller for preparing and deploying CLP components. Provides common logic for + preparing environment variables, directories, and configuration files for each service. + """ + + def __init__(self, clp_config: CLPConfig): + self.clp_config = clp_config + self._clp_home = get_clp_home() + self._conf_dir = self._clp_home / "etc" + + @abstractmethod + def start(self): + """ + Starts the set-up components with orchestrator-specific logic. + """ + pass + + @abstractmethod + def stop(self): + """ + Stops the deployed components with orchestrator-specific logic. + """ + pass + + @abstractmethod + def _set_up_env(self): + """ + Sets up all components for the orchestrator by preparing environment variables, directories, + and configuration files. + """ + pass + + def _set_up_env_for_database(self) -> EnvVarsDict: + """ + Sets up environment variables and directories for the database component. + + :return: Dictionary of environment variables necessary to launch the component. 
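+
+        For illustration, assuming a local MariaDB deployment, the returned mapping
+        might look like the following (all values are hypothetical)::
+
+            {
+                "CLP_DB_HOST": "127.0.0.1",
+                "CLP_DB_PORT": "3306",
+                "CLP_DB_NAME": "clp-db",
+                "CLP_DB_IMAGE": "mariadb:10-jammy",
+                ...
+            }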
+ """ + component_name = DB_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + conf_logging_file = self._conf_dir / "mysql" / "conf.d" / "logging.cnf" + data_dir = self.clp_config.data_directory / component_name + logs_dir = self.clp_config.logs_directory / component_name + validate_db_config(self.clp_config, conf_logging_file, data_dir, logs_dir) + + data_dir.mkdir(exist_ok=True, parents=True) + logs_dir.mkdir(exist_ok=True, parents=True) + _chown_paths_if_root(data_dir, logs_dir) + + return { + "CLP_DB_CONF_LOGGING_FILE_HOST": str(conf_logging_file), + "CLP_DB_DATA_DIR_HOST": str(data_dir), + "CLP_DB_LOGS_DIR_HOST": str(logs_dir), + "CLP_DB_HOST": _get_ip_from_hostname(self.clp_config.database.host), + "CLP_DB_PORT": str(self.clp_config.database.port), + "CLP_DB_NAME": self.clp_config.database.name, + "CLP_DB_USER": self.clp_config.database.username, + "CLP_DB_PASS": self.clp_config.database.password, + "CLP_DB_IMAGE": ( + "mysql:8.0.23" if "mysql" == self.clp_config.database.type else "mariadb:10-jammy" + ), + } + + def _set_up_env_for_queue(self) -> EnvVarsDict: + """ + Sets up environment variables and directories for the message queue component. + + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = QUEUE_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + logs_dir = self.clp_config.logs_directory / component_name + validate_queue_config(self.clp_config, logs_dir) + + logs_dir.mkdir(exist_ok=True, parents=True) + _chown_paths_if_root(logs_dir) + + return { + "CLP_QUEUE_LOGS_DIR_HOST": str(logs_dir), + "CLP_QUEUE_HOST": _get_ip_from_hostname(self.clp_config.queue.host), + "CLP_QUEUE_PORT": str(self.clp_config.queue.port), + "CLP_QUEUE_USER": self.clp_config.queue.username, + "CLP_QUEUE_PASS": self.clp_config.queue.password, + } + + def _set_up_env_for_redis(self) -> EnvVarsDict: + """ + Sets up environment variables and directories for the Redis component. + + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = REDIS_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + conf_file = self._conf_dir / "redis" / "redis.conf" + data_dir = self.clp_config.data_directory / component_name + logs_dir = self.clp_config.logs_directory / component_name + validate_redis_config(self.clp_config, conf_file, data_dir, logs_dir) + + data_dir.mkdir(exist_ok=True, parents=True) + logs_dir.mkdir(exist_ok=True, parents=True) + _chown_paths_if_root(data_dir, logs_dir) + + return { + "CLP_REDIS_CONF_FILE_HOST": str(conf_file), + "CLP_REDIS_DATA_DIR_HOST": str(data_dir), + "CLP_REDIS_LOGS_DIR_HOST": str(logs_dir), + "CLP_REDIS_HOST": _get_ip_from_hostname(self.clp_config.redis.host), + "CLP_REDIS_PORT": str(self.clp_config.redis.port), + "CLP_REDIS_PASS": self.clp_config.redis.password, + "CLP_REDIS_QUERY_BACKEND_DB": str(self.clp_config.redis.query_backend_database), + "CLP_REDIS_COMPRESSION_BACKEND_DB": str( + self.clp_config.redis.compression_backend_database + ), + } + + def _set_up_env_for_results_cache(self) -> EnvVarsDict: + """ + Sets up environment variables and directories for the results cache (MongoDB) component. + + :return: Dictionary of environment variables necessary to launch the component. 
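+
+        For example, with a results cache on MongoDB's default port, the mapping would
+        include ``"CLP_RESULTS_CACHE_PORT": "27017"`` (value illustrative; the real
+        value is taken from the loaded CLP config at runtime).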
+ """ + component_name = RESULTS_CACHE_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + conf_file = self._conf_dir / "mongo" / "mongod.conf" + data_dir = self.clp_config.data_directory / component_name + logs_dir = self.clp_config.logs_directory / component_name + validate_results_cache_config(self.clp_config, conf_file, data_dir, logs_dir) + + data_dir.mkdir(exist_ok=True, parents=True) + logs_dir.mkdir(exist_ok=True, parents=True) + _chown_paths_if_root(data_dir, logs_dir) + + return { + "CLP_RESULTS_CACHE_CONF_FILE_HOST": str(conf_file), + "CLP_RESULTS_CACHE_DATA_DIR_HOST": str(data_dir), + "CLP_RESULTS_CACHE_LOGS_DIR_HOST": str(logs_dir), + "CLP_RESULTS_CACHE_HOST": _get_ip_from_hostname(self.clp_config.results_cache.host), + "CLP_RESULTS_CACHE_PORT": str(self.clp_config.results_cache.port), + "CLP_RESULTS_CACHE_DB_NAME": self.clp_config.results_cache.db_name, + "CLP_RESULTS_CACHE_STREAM_COLLECTION_NAME": self.clp_config.results_cache.stream_collection_name, + } + + def _set_up_env_for_compression_scheduler(self) -> EnvVarsDict: + """ + Sets up environment variables and files for the compression scheduler component. + + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = COMPRESSION_SCHEDULER_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + log_file = self.clp_config.logs_directory / f"{component_name}.log" + log_file.touch(mode=LOG_FILE_ACCESS_MODE, exist_ok=True) + + return { + "CLP_COMPRESSION_SCHEDULER_LOGGING_LEVEL": self.clp_config.compression_scheduler.logging_level, + "CLP_COMPRESSION_SCHEDULER_LOG_FILE_HOST": str(log_file), + } + + def _set_up_env_for_query_scheduler(self) -> EnvVarsDict: + """ + Sets up environment variables and files for the query scheduler component. + + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = QUERY_SCHEDULER_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + log_file = self.clp_config.logs_directory / f"{component_name}.log" + log_file.touch(mode=LOG_FILE_ACCESS_MODE, exist_ok=True) + + return { + "CLP_QUERY_SCHEDULER_LOGGING_LEVEL": self.clp_config.query_scheduler.logging_level, + "CLP_QUERY_SCHEDULER_LOG_FILE_HOST": str(log_file), + } + + def _set_up_env_for_compression_worker(self, num_workers: int) -> EnvVarsDict: + """ + Sets up environment variables for the compression worker component. + + :param num_workers: Number of worker processes to run. + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = COMPRESSION_WORKER_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + logs_dir = self.clp_config.logs_directory / component_name + + logs_dir.mkdir(parents=True, exist_ok=True) + + return { + "CLP_COMPRESSION_WORKER_CONCURRENCY": str(num_workers), + "CLP_COMPRESSION_WORKER_LOGGING_LEVEL": self.clp_config.compression_worker.logging_level, + "CLP_COMPRESSION_WORKER_LOGS_DIR_HOST": str(logs_dir), + } + + def _set_up_env_for_query_worker(self, num_workers: int) -> EnvVarsDict: + """ + Sets up environment variables for the query worker component. + + :param num_workers: Number of worker processes to run. + :return: Dictionary of environment variables necessary to launch the component. 
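+
+        For example, with ``num_workers=8``, the returned mapping might look like
+        (logging level and host path are hypothetical)::
+
+            {
+                "CLP_QUERY_WORKER_LOGGING_LEVEL": "INFO",
+                "CLP_QUERY_WORKER_LOGS_DIR_HOST": "/opt/clp/var/log/query_worker",
+                "CLP_QUERY_WORKER_CONCURRENCY": "8",
+            }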
+ """ + component_name = QUERY_WORKER_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + logs_dir = self.clp_config.logs_directory / component_name + + logs_dir.mkdir(parents=True, exist_ok=True) + + return { + "CLP_QUERY_WORKER_LOGGING_LEVEL": self.clp_config.query_worker.logging_level, + "CLP_QUERY_WORKER_LOGS_DIR_HOST": str(logs_dir), + "CLP_QUERY_WORKER_CONCURRENCY": str(num_workers), + } + + def _set_up_env_for_reducer(self, num_workers: int) -> EnvVarsDict: + """ + Sets up environment variables for the reducer component. + + :param num_workers: Number of worker processes to run. + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = REDUCER_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + logs_dir = self.clp_config.logs_directory / component_name + + logs_dir.mkdir(parents=True, exist_ok=True) + + return { + "CLP_REDUCER_LOGGING_LEVEL": self.clp_config.reducer.logging_level, + "CLP_REDUCER_LOGS_DIR_HOST": str(logs_dir), + "CLP_REDUCER_CONCURRENCY": str(num_workers), + "CLP_REDUCER_UPSERT_INTERVAL": str(self.clp_config.reducer.upsert_interval), + } + + def _set_up_env_for_webui(self, container_clp_config: CLPConfig) -> EnvVarsDict: + """ + Sets up environment variables and settings for the Web UI component. + + :param container_clp_config: CLP configuration inside the containers. + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = WEBUI_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + container_webui_dir = CONTAINER_CLP_HOME / "var" / "www" / "webui" + client_settings_json_path = ( + self._clp_home / "var" / "www" / "webui" / "client" / "settings.json" + ) + server_settings_json_path = ( + self._clp_home / "var" / "www" / "webui" / "server" / "dist" / "settings.json" + ) + + validate_webui_config(self.clp_config, client_settings_json_path, server_settings_json_path) + + # Read, update, and write back client's and server's settings.json + clp_db_connection_params = self.clp_config.database.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] + if StorageEngine.CLP_S == self.clp_config.package.storage_engine: + archives_table_name = "" + files_table_name = "" + else: + archives_table_name = get_archives_table_name(table_prefix, None) + files_table_name = get_files_table_name(table_prefix, None) + + client_settings_json_updates = { + "ClpStorageEngine": self.clp_config.package.storage_engine, + "ClpQueryEngine": self.clp_config.package.query_engine, + "MongoDbSearchResultsMetadataCollectionName": self.clp_config.webui.results_metadata_collection_name, + "SqlDbClpArchivesTableName": archives_table_name, + "SqlDbClpDatasetsTableName": get_datasets_table_name(table_prefix), + "SqlDbClpFilesTableName": files_table_name, + "SqlDbClpTablePrefix": table_prefix, + "SqlDbCompressionJobsTableName": "compression_jobs", + } + client_settings_json = self._read_and_update_settings_json( + client_settings_json_path, client_settings_json_updates + ) + with open(client_settings_json_path, "w") as client_settings_json_file: + client_settings_json_file.write(json.dumps(client_settings_json)) + + server_settings_json_updates = { + "SqlDbHost": container_clp_config.database.host, + "SqlDbPort": container_clp_config.database.port, + "SqlDbName": self.clp_config.database.name, + "SqlDbQueryJobsTableName": "query_jobs", + "MongoDbHost": 
container_clp_config.results_cache.host, + "MongoDbPort": container_clp_config.results_cache.port, + "MongoDbName": self.clp_config.results_cache.db_name, + "MongoDbSearchResultsMetadataCollectionName": self.clp_config.webui.results_metadata_collection_name, + "MongoDbStreamFilesCollectionName": self.clp_config.results_cache.stream_collection_name, + "ClientDir": str(container_webui_dir / "client"), + "LogViewerDir": str(container_webui_dir / "yscope-log-viewer"), + "StreamTargetUncompressedSize": self.clp_config.stream_output.target_uncompressed_size, + } + + stream_storage = self.clp_config.stream_output.storage + if StorageType.S3 == stream_storage.type: + s3_config = stream_storage.s3_config + server_settings_json_updates["StreamFilesDir"] = None + server_settings_json_updates["StreamFilesS3Region"] = s3_config.region_code + server_settings_json_updates["StreamFilesS3PathPrefix"] = ( + f"{s3_config.bucket}/{s3_config.key_prefix}" + ) + auth = s3_config.aws_authentication + if AwsAuthType.profile == auth.type: + server_settings_json_updates["StreamFilesS3Profile"] = auth.profile + else: + server_settings_json_updates["StreamFilesS3Profile"] = None + elif StorageType.FS == stream_storage.type: + server_settings_json_updates["StreamFilesDir"] = str( + container_clp_config.stream_output.get_directory() + ) + server_settings_json_updates["StreamFilesS3Region"] = None + server_settings_json_updates["StreamFilesS3PathPrefix"] = None + server_settings_json_updates["StreamFilesS3Profile"] = None + + query_engine = self.clp_config.package.query_engine + if QueryEngine.PRESTO == query_engine: + server_settings_json_updates["PrestoHost"] = self.clp_config.presto.host + server_settings_json_updates["PrestoPort"] = self.clp_config.presto.port + else: + server_settings_json_updates["PrestoHost"] = None + server_settings_json_updates["PrestoPort"] = None + + server_settings_json = self._read_and_update_settings_json( + server_settings_json_path, server_settings_json_updates + ) + with open(server_settings_json_path, "w") as settings_json_file: + settings_json_file.write(json.dumps(server_settings_json)) + + return { + "CLP_WEBUI_HOST": _get_ip_from_hostname(self.clp_config.webui.host), + "CLP_WEBUI_PORT": str(self.clp_config.webui.port), + "CLP_WEBUI_RATE_LIMIT": str(self.clp_config.webui.rate_limit), + } + + def _set_up_env_for_garbage_collector(self) -> EnvVarsDict: + """ + Sets up environment variables for the garbage collector component. + + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = GARBAGE_COLLECTOR_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + logs_dir = self.clp_config.logs_directory / component_name + + logs_dir.mkdir(parents=True, exist_ok=True) + + return {"CLP_GC_LOGGING_LEVEL": self.clp_config.garbage_collector.logging_level} + + def _read_and_update_settings_json( + self, settings_file_path: pathlib.Path, updates: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Reads and updates a settings JSON file. + + :param settings_file_path: + :param updates: + """ + with open(settings_file_path, "r") as settings_json_file: + settings_object = json.loads(settings_json_file.read()) + self._update_settings_object("", settings_object, updates) + + return settings_object + + def _update_settings_object( + self, + parent_key_prefix: str, + settings: Dict[str, Any], + updates: Dict[str, Any], + ): + """ + Recursively updates the given settings object with the values from `updates`. 
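+
+        Nested dictionaries are merged key-by-key rather than replaced wholesale: for
+        example, applying updates ``{"a": {"b": 2}}`` to settings
+        ``{"a": {"b": 1, "c": 3}}`` yields ``{"a": {"b": 2, "c": 3}}``.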
+
+        :param parent_key_prefix: The prefix for keys at this level in the settings dictionary.
+        :param settings: The settings to update.
+        :param updates: The updates.
+        :raises ValueError: If a key in `updates` doesn't exist in `settings`.
+        """
+        for key, value in updates.items():
+            if key not in settings:
+                error_msg = (
+                    f"{parent_key_prefix}{key} is not a valid configuration key for the webui."
+                )
+                raise ValueError(error_msg)
+            if isinstance(value, dict):
+                self._update_settings_object(f"{parent_key_prefix}{key}.", settings[key], value)
+            else:
+                settings[key] = updates[key]
+
+
+class DockerComposeController(BaseController):
+    """
+    Controller for deploying CLP components using Docker Compose.
+    """
+
+    def __init__(self, clp_config: CLPConfig, instance_id: str):
+        self._project_name = f"clp-package-{instance_id}"
+        super().__init__(clp_config)
+
+    def start(self):
+        """
+        Deploys CLP components using Docker Compose by:
+        1. Checking Docker dependencies.
+        2. Setting up environment variables and configuration.
+        3. Running `docker compose up -d`.
+        """
+        check_docker_dependencies(should_compose_run=False, project_name=self._project_name)
+        self._set_up_env()
+
+        deployment_type = self.clp_config.get_deployment_type()
+        logger.info(f"Starting CLP using Docker Compose ({deployment_type})...")
+
+        cmd = ["docker", "compose", "--project-name", self._project_name]
+        if deployment_type == DeploymentType.BASE:
+            cmd += ["--file", "docker-compose.base.yaml"]
+        cmd += ["up", "--detach"]
+        try:
+            subprocess.run(
+                cmd,
+                cwd=self._clp_home,
+                stderr=subprocess.STDOUT,
+                check=True,
+            )
+        except subprocess.CalledProcessError:
+            logger.exception("Failed to start CLP.")
+            raise
+
+    def stop(self):
+        """
+        Stops CLP components deployed via Docker Compose.
+        """
+        check_docker_dependencies(should_compose_run=True, project_name=self._project_name)
+
+        logger.info("Stopping all CLP containers using Docker Compose...")
+        try:
+            subprocess.run(
+                ["docker", "compose", "--project-name", self._project_name, "down"],
+                cwd=self._clp_home,
+                stderr=subprocess.STDOUT,
+                check=True,
+            )
+            logger.info("All CLP containers stopped.")
+        except subprocess.CalledProcessError:
+            logger.exception("Failed to stop CLP containers using Docker Compose.")
+            raise
+
+    @staticmethod
+    def _get_num_workers() -> int:
+        """
+        Gets the number of worker processes to run for each worker component.
+        TODO: Revisit after moving from single-container to multi-container workers.
+        :return: Number of worker processes.
+        """
+        return multiprocessing.cpu_count() // 2
+
+    def _set_up_env(self):
+        """
+        Sets up all CLP components for Docker Compose by:
+        - Generating container-specific config.
+        - Preparing environment variables for all components.
+        - Writing environment variables to `.env`.
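+
+        Each entry is written to ``.env`` as one ``KEY=value`` line, e.g. (hypothetical
+        values)::
+
+            CLP_UID_GID=1000:1000
+            CLP_DB_PORT=3306
+            CLP_WEBUI_PORT=4000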
+        """
+        container_clp_config = generate_docker_compose_container_config(self.clp_config)
+        num_workers = self._get_num_workers()
+        dump_shared_container_config(container_clp_config, self.clp_config)
+
+        env_dict = {
+            "CLP_PACKAGE_STORAGE_ENGINE": self.clp_config.package.storage_engine,
+            # User and group IDs
+            "CLP_UID_GID": DEFAULT_UID_GID,
+            "CLP_SERVICE_CONTAINER_UID_GID": (
+                SERVICE_CONTAINER_UID_GID if os.geteuid() == 0 else DEFAULT_UID_GID
+            ),
+            # Package container
+            "CLP_PACKAGE_CONTAINER": self.clp_config.container_image_ref,
+            # Runtime data directories
+            "CLP_DATA_DIR_HOST": str(self.clp_config.data_directory),
+            "CLP_LOGS_DIR_HOST": str(self.clp_config.logs_directory),
+            # Input directories
+            "CLP_LOGS_INPUT_DIR_HOST": str(self.clp_config.logs_input.directory),
+            "CLP_LOGS_INPUT_DIR_CONTAINER": str(container_clp_config.logs_input.directory),
+            # Output directories
+            "CLP_ARCHIVE_OUTPUT_DIR_HOST": str(self.clp_config.archive_output.get_directory()),
+            "CLP_STREAM_OUTPUT_DIR_HOST": str(self.clp_config.stream_output.get_directory()),
+            # AWS credentials
+            "CLP_AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID", ""),
+            "CLP_AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
+            **self._set_up_env_for_database(),
+            **self._set_up_env_for_queue(),
+            **self._set_up_env_for_redis(),
+            **self._set_up_env_for_results_cache(),
+            **self._set_up_env_for_compression_scheduler(),
+            **self._set_up_env_for_query_scheduler(),
+            **self._set_up_env_for_compression_worker(num_workers),
+            **self._set_up_env_for_query_worker(num_workers),
+            **self._set_up_env_for_reducer(num_workers),
+            **self._set_up_env_for_webui(container_clp_config),
+            **self._set_up_env_for_garbage_collector(),
+        }
+
+        if self.clp_config.aws_config_directory is not None:
+            env_dict["CLP_AWS_CONFIG_DIR_HOST"] = str(self.clp_config.aws_config_directory)
+
+        with open(f"{self._clp_home}/.env", "w") as env_file:
+            for key, value in env_dict.items():
+                env_file.write(f"{key}={value}\n")
+
+
+def get_or_create_instance_id(clp_config: CLPConfig):
+    """
+    Gets or creates a unique instance ID for this CLP instance.
+
+    :param clp_config:
+    :return: The instance ID.
+    """
+    instance_id_file_path = clp_config.logs_directory / "instance-id"
+
+    if instance_id_file_path.exists():
+        with open(instance_id_file_path, "r") as f:
+            instance_id = f.readline()
+    else:
+        instance_id = str(uuid.uuid4())[-4:]
+        with open(instance_id_file_path, "w") as f:
+            f.write(instance_id)
+
+    return instance_id
+
+
+def _chown_paths_if_root(*paths: pathlib.Path):
+    """
+    Changes ownership of the given paths to the default service container user/group IDs if the
+    current process is running as root.
+
+    :param paths:
+    """
+    if os.getuid() != 0:
+        return
+    for path in paths:
+        _chown_recursively(path, SERVICE_CONTAINER_USER_ID, SERVICE_CONTAINER_GROUP_ID)
+
+
+def _chown_recursively(
+    path: pathlib.Path,
+    user_id: int,
+    group_id: int,
+):
+    """
+    Recursively changes the owner of the given path to the given user ID and group ID.
+
+    :param path:
+    :param user_id:
+    :param group_id:
+    """
+    chown_cmd = ["chown", "--recursive", f"{user_id}:{group_id}", str(path)]
+    subprocess.run(chown_cmd, stdout=subprocess.DEVNULL, check=True)
+
+
+def _get_ip_from_hostname(hostname: str) -> str:
+    """
+    Resolves a hostname to an IPv4 IP address.
+
+    :param hostname:
+    :return: The resolved IP address.
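+
+    For example, ``_get_ip_from_hostname("localhost")`` would typically return
+    ``"127.0.0.1"``.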
+ """ + return socket.gethostbyname(hostname) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index ad577a50bf..c428e0b351 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -1,5 +1,6 @@ import enum import errno +import json import os import pathlib import re @@ -15,6 +16,9 @@ CLP_DEFAULT_CREDENTIALS_FILE_PATH, CLP_SHARED_CONFIG_FILENAME, CLPConfig, + CONTAINER_AWS_CONFIG_DIRECTORY, + CONTAINER_CLP_HOME, + CONTAINER_INPUT_LOGS_ROOT_DIR, DB_COMPONENT_NAME, QueryEngine, QUEUE_COMPONENT_NAME, @@ -42,12 +46,6 @@ EXTRACT_IR_CMD = "i" EXTRACT_JSON_CMD = "j" -# Paths -CONTAINER_AWS_CONFIG_DIRECTORY = pathlib.Path("/") / ".aws" -CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp" -CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path("/") / "mnt" / "logs" -CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH = pathlib.Path("etc") / "clp-config.yml" - DOCKER_MOUNT_TYPE_STRINGS = ["bind"] @@ -132,59 +130,58 @@ def generate_container_name(job_type: str) -> str: return f"clp-{job_type}-{str(uuid.uuid4())[-4:]}" -def check_dependencies(): +def is_docker_compose_running(project_name: str) -> bool: + """ + Checks if a Docker Compose project is running. + + :param project_name: + :return: True if at least one instance is running, else False. + :raises EnvironmentError: If Docker Compose is not installed or fails. + """ + cmd = ["docker", "compose", "ls", "--format", "json", "--filter", f"name={project_name}"] + try: + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) + running_instances = json.loads(output) + return len(running_instances) >= 1 + except subprocess.CalledProcessError: + raise EnvironmentError("docker-compose is not installed or not functioning properly.") + + +def check_docker_dependencies(should_compose_run: bool, project_name: str): + """ + Checks if Docker and Docker Compose are installed, and whether Docker Compose is running or not. + + :param should_compose_run: + :param project_name: The Docker Compose project name to check. + :raises EnvironmentError: If any Docker dependency is not installed or Docker Compose state + does not match expectation. 
+    """
     try:
         subprocess.run(
             "command -v docker",
             shell=True,
-            stdout=subprocess.PIPE,
+            stdout=subprocess.DEVNULL,
             stderr=subprocess.STDOUT,
             check=True,
         )
     except subprocess.CalledProcessError:
         raise EnvironmentError("docker is not installed or available on the path")

-    try:
-        subprocess.run(
-            ["docker", "ps"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True
-        )
-    except subprocess.CalledProcessError:
-        raise EnvironmentError("docker cannot run without superuser privileges (sudo).")
-
-
-def is_container_running(container_name):
-    # fmt: off
-    cmd = [
-        "docker", "ps",
-        # Only return container IDs
-        "--quiet",
-        "--filter", f"name={container_name}"
-    ]
-    # fmt: on
-    proc = subprocess.run(cmd, stdout=subprocess.PIPE)
-    if proc.stdout.decode("utf-8"):
-        return True
-
-    return False
+    is_running = is_docker_compose_running(project_name)
+    if should_compose_run and not is_running:
+        raise EnvironmentError("docker-compose is not running.")
+    if not should_compose_run and is_running:
+        raise EnvironmentError("docker-compose is already running.")

-def is_container_exited(container_name):
-    # fmt: off
-    cmd = [
-        "docker", "ps",
-        # Only return container IDs
-        "--quiet",
-        "--filter", f"name={container_name}",
-        "--filter", "status=exited"
-    ]
-    # fmt: on
-    proc = subprocess.run(cmd, stdout=subprocess.PIPE)
-    if proc.stdout.decode("utf-8"):
-        return True
-
-    return False
+def _validate_log_directory(logs_dir: pathlib.Path, component_name: str):
+    """
+    Validates that a component's log directory path can be used as a directory.

-def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None:
+    :param logs_dir:
+    :param component_name:
+    :raises ValueError: If the path is invalid or not a directory.
+    """
     try:
         validate_path_could_be_dir(logs_dir)
     except ValueError as ex:
@@ -309,6 +306,19 @@ def generate_container_config(
     return container_clp_config, docker_mounts


+def generate_docker_compose_container_config(clp_config: CLPConfig) -> CLPConfig:
+    """
+    Copies the given config and transforms mount paths and hosts for Docker Compose.
+
+    :param clp_config:
+    :return: The transformed container config.
+    """
+    container_clp_config = clp_config.model_copy(deep=True)
+    container_clp_config.transform_for_container()
+
+    return container_clp_config
+
+
 def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
     worker_config = WorkerConfig()
     worker_config.package = clp_config.package.model_copy(deep=True)
@@ -431,11 +441,6 @@ def load_config_file(
     validate_path_for_container_mount(clp_config.data_directory)
     validate_path_for_container_mount(clp_config.logs_directory)

-    # Make data and logs directories node-specific
-    hostname = socket.gethostname()
-    clp_config.data_directory /= hostname
-    clp_config.logs_directory /= hostname
-
     return clp_config


@@ -488,35 +493,40 @@ def validate_and_load_redis_credentials_file(
     clp_config.redis.load_credentials_from_file(clp_config.credentials_file_path)


-def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
+def validate_db_config(
+    clp_config: CLPConfig, base_config: pathlib.Path, data_dir: pathlib.Path, logs_dir: pathlib.Path
+):
+    if not base_config.exists():
+        raise ValueError(
+            f"{DB_COMPONENT_NAME} base configuration at {str(base_config)} is missing."
+ ) _validate_data_directory(data_dir, DB_COMPONENT_NAME) - validate_log_directory(logs_dir, DB_COMPONENT_NAME) + _validate_log_directory(logs_dir, DB_COMPONENT_NAME) validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port) def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path): - validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME) + _validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME) validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port) def validate_redis_config( - clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, base_config: pathlib.Path + clp_config: CLPConfig, base_config: pathlib.Path, data_dir: pathlib.Path, logs_dir: pathlib.Path ): - _validate_data_directory(data_dir, REDIS_COMPONENT_NAME) - validate_log_directory(logs_dir, REDIS_COMPONENT_NAME) - if not base_config.exists(): raise ValueError( f"{REDIS_COMPONENT_NAME} base configuration at {str(base_config)} is missing." ) + _validate_data_directory(data_dir, REDIS_COMPONENT_NAME) + _validate_log_directory(logs_dir, REDIS_COMPONENT_NAME) validate_port(f"{REDIS_COMPONENT_NAME}.port", clp_config.redis.host, clp_config.redis.port) def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_workers: int): - validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME) + _validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME) for i in range(0, num_workers): validate_port( @@ -527,10 +537,14 @@ def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_w def validate_results_cache_config( - clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path + clp_config: CLPConfig, base_config: pathlib.Path, data_dir: pathlib.Path, logs_dir: pathlib.Path ): + if not base_config.exists(): + raise ValueError( + f"{RESULTS_CACHE_COMPONENT_NAME} base configuration at {str(base_config)} is missing." 
+ ) _validate_data_directory(data_dir, RESULTS_CACHE_COMPONENT_NAME) - validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME) + _validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME) validate_port( f"{RESULTS_CACHE_COMPONENT_NAME}.port", diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index b965805d11..5ccc351a61 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -9,13 +9,13 @@ from clp_py_utils.clp_config import ( CLP_DB_PASS_ENV_VAR_NAME, CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLP_DEFAULT_DATASET_NAME, StorageEngine, StorageType, ) from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLPConfig, DockerMount, dump_container_config, diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress.py b/components/clp-package-utils/clp_package_utils/scripts/compress.py index ae9de02092..b8c1275c59 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress.py @@ -10,13 +10,13 @@ from clp_py_utils.clp_config import ( CLP_DB_PASS_ENV_VAR_NAME, CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLP_DEFAULT_DATASET_NAME, StorageEngine, ) from job_orchestration.scheduler.job_config import InputType from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CONTAINER_INPUT_LOGS_ROOT_DIR, dump_container_config, generate_container_config, diff --git a/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py b/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py index 08c0cebe16..7b895340e6 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py @@ -10,13 +10,13 @@ ARCHIVE_MANAGER_ACTION_NAME, CLP_DB_PASS_ENV_VAR_NAME, CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, StorageEngine, StorageType, ) from clp_py_utils.s3_utils import generate_container_auth_options from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, dump_container_config, generate_container_config, generate_container_name, diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index 7c4a1d5f62..d5c4c059dd 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -9,6 +9,7 @@ from clp_py_utils.clp_config import ( CLP_DB_PASS_ENV_VAR_NAME, CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLP_DEFAULT_DATASET_NAME, CLPConfig, StorageEngine, @@ -16,7 +17,6 @@ ) from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, DockerMount, DockerMountType, dump_container_config, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 79825dcac7..e0349e5f2c 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -7,7 +7,7 @@ from 
pathlib import Path from typing import Any, List, Optional -from clp_py_utils.clp_config import Database +from clp_py_utils.clp_config import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, Database from clp_py_utils.clp_metadata_db_utils import ( delete_archives_from_metadata_db, get_archives_table_name, @@ -15,7 +15,6 @@ from clp_py_utils.sql_adapter import SQL_Adapter from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLPConfig, get_clp_home, load_config_file, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index e56a6722b9..ac4df2aa18 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -10,6 +10,7 @@ import brotli import msgpack from clp_py_utils.clp_config import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLPConfig, COMPRESSION_JOBS_TABLE_NAME, ) @@ -29,7 +30,6 @@ ) from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CONTAINER_INPUT_LOGS_ROOT_DIR, get_clp_home, load_config_file, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/dataset_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/dataset_manager.py index 44ab9d2a10..4ff30cd9fd 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/dataset_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/dataset_manager.py @@ -6,7 +6,13 @@ from pathlib import Path from typing import Dict, List -from clp_py_utils.clp_config import ArchiveOutput, Database, S3Config, StorageType +from clp_py_utils.clp_config import ( + ArchiveOutput, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + Database, + S3Config, + StorageType, +) from clp_py_utils.clp_metadata_db_utils import ( delete_dataset_from_metadata_db, get_datasets_table_name, @@ -15,7 +21,6 @@ from clp_py_utils.sql_adapter import SQL_Adapter from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLPConfig, get_clp_home, load_config_file, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index 1418271c37..300609439e 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -12,6 +12,7 @@ from clp_py_utils.clp_config import ( CLP_DB_PASS_ENV_VAR_NAME, CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLPConfig, Database, ) @@ -25,7 +26,6 @@ ) from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, EXTRACT_FILE_CMD, EXTRACT_IR_CMD, EXTRACT_JSON_CMD, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index 4919836e4d..c5e33130e4 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -12,6 +12,7 @@ import psutil import pymongo from clp_py_utils.clp_config import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, Database, ResultsCache, ) @@ -20,7 +21,6 @@ from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, get_clp_home, 
load_config_file, ) diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index a69d97db6f..03aa3817b9 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -8,13 +8,13 @@ from clp_py_utils.clp_config import ( CLP_DB_PASS_ENV_VAR_NAME, CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLP_DEFAULT_DATASET_NAME, StorageEngine, StorageType, ) from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, dump_container_config, generate_container_config, generate_container_name, diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index f41ff8ab7b..a9ff5b596e 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -1,1130 +1,25 @@ import argparse -import json import logging -import multiprocessing -import os import pathlib -import shlex -import socket -import subprocess import sys -import time -import uuid -from typing import Any, Dict, List, Optional -from clp_py_utils.clp_config import ( - ALL_TARGET_NAME, - AwsAuthType, - CLPConfig, - COMPRESSION_JOBS_TABLE_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - CONTROLLER_TARGET_NAME, - DB_COMPONENT_NAME, - GARBAGE_COLLECTOR_COMPONENT_NAME, - get_components_for_target, - QUERY_JOBS_TABLE_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - QueryEngine, - QUEUE_COMPONENT_NAME, - REDIS_COMPONENT_NAME, - REDUCER_COMPONENT_NAME, - RESULTS_CACHE_COMPONENT_NAME, - StorageEngine, - StorageType, - WEBUI_COMPONENT_NAME, -) -from clp_py_utils.clp_metadata_db_utils import ( - get_archives_table_name, - get_datasets_table_name, - get_files_table_name, -) -from clp_py_utils.s3_utils import generate_container_auth_options -from job_orchestration.scheduler.constants import SchedulerType -from pydantic import BaseModel +from clp_py_utils.clp_config import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH +from clp_package_utils.controller import DockerComposeController, get_or_create_instance_id from clp_package_utils.general import ( - check_dependencies, - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - CLPDockerMounts, - CONTAINER_CLP_HOME, - DockerMount, - DockerMountType, - dump_shared_container_config, - generate_container_config, - get_celery_connection_env_vars_list, get_clp_home, - get_common_env_vars_list, - get_credential_env_vars_list, - is_container_exited, - is_container_running, - is_retention_period_configured, load_config_file, validate_and_load_db_credentials_file, validate_and_load_queue_credentials_file, validate_and_load_redis_credentials_file, - validate_db_config, - validate_log_directory, validate_logs_input_config, validate_output_storage_config, - validate_queue_config, - validate_redis_config, - validate_reducer_config, - validate_results_cache_config, validate_retention_config, - validate_webui_config, ) logger = logging.getLogger(__file__) -def container_exists(container_name): - if is_container_running(container_name): - logger.info(f"{container_name} already running.") - return True - elif is_container_exited(container_name): - logger.info(f"{container_name} exited but not removed.") - return True - return False - - -def append_docker_options( - cmd: List[str], - mounts: 
Optional[List[Optional[DockerMount]]] = None, - env_vars: Optional[List[str]] = None, -): - """ - Appends Docker mount and environment variable options to a command list. - - :param cmd: The command list to append options to. - :param mounts: Optional list of DockerMount objects to add as --mount options. - :param env_vars: Optional list of environment variables to add as -e options. - """ - if mounts: - for mount in mounts: - if mount: - cmd.append("--mount") - cmd.append(str(mount)) - - if env_vars: - for env_var in env_vars: - if "" != env_var: - cmd.append("-e") - cmd.append(env_var) - - -def append_docker_port_settings_for_host_ips( - hostname: str, host_port: int, container_port: int, cmd: List[str] -): - # Note: We use a set because gethostbyname_ex can return the same IP twice for one hostname - for ip in set(socket.gethostbyname_ex(hostname)[2]): - cmd.append("-p") - cmd.append(f"{ip}:{host_port}:{container_port}") - - -def chown_recursively( - path: pathlib.Path, - user_id: int, - group_id: int, -): - """ - Recursively changes the owner of the given path to the given user ID and group ID. - :param path: - :param user_id: - :param group_id: - """ - chown_cmd = ["chown", "--recursive", f"{user_id}:{group_id}", str(path)] - subprocess.run(chown_cmd, stdout=subprocess.DEVNULL, check=True) - - -def wait_for_container_cmd(container_name: str, cmd_to_run: List[str], timeout: int): - container_exec_cmd = ["docker", "exec", container_name] - cmd = container_exec_cmd + cmd_to_run - - begin_time = time.time() - - while True: - try: - subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) - return True - except subprocess.CalledProcessError: - if time.time() - begin_time > timeout: - break - time.sleep(1) - - cmd_str = shlex.join(cmd_to_run) - logger.error(f"Timeout while waiting for command {cmd_str} to run after {timeout} seconds") - return False - - -def start_db(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): - component_name = DB_COMPONENT_NAME - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - db_data_dir = clp_config.data_directory / component_name - db_logs_dir = clp_config.logs_directory / component_name - - validate_db_config(clp_config, db_data_dir, db_logs_dir) - - # Create directories - db_data_dir.mkdir(exist_ok=True, parents=True) - db_logs_dir.mkdir(exist_ok=True, parents=True) - - # Start container - mounts = [ - DockerMount( - DockerMountType.BIND, - conf_dir / "mysql" / "conf.d", - pathlib.Path("/") / "etc" / "mysql" / "conf.d", - True, - ), - DockerMount(DockerMountType.BIND, db_data_dir, pathlib.Path("/") / "var" / "lib" / "mysql"), - DockerMount(DockerMountType.BIND, db_logs_dir, pathlib.Path("/") / "var" / "log" / "mysql"), - ] - env_vars = [ - f"MYSQL_ROOT_PASSWORD={clp_config.database.password}", - f"MYSQL_USER={clp_config.database.username}", - f"MYSQL_PASSWORD={clp_config.database.password}", - f"MYSQL_DATABASE={clp_config.database.name}", - ] - # fmt: off - cmd = [ - "docker", "run", - "-d", - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - append_docker_options(cmd, mounts, env_vars) - append_docker_port_settings_for_host_ips( - clp_config.database.host, clp_config.database.port, 3306, cmd - ) - if "mysql" == clp_config.database.type: - cmd.append("mysql:8.0.23") - elif "mariadb" == clp_config.database.type: - cmd.append("mariadb:10-jammy") - 
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - # fmt: off - mysqladmin_cmd = [ - "mysqladmin", "ping", - "--silent", - "-h", "127.0.0.1", - "-u", str(clp_config.database.username), - f"--password={clp_config.database.password}", - ] - # fmt: on - - if not wait_for_container_cmd(container_name, mysqladmin_cmd, 180): - raise EnvironmentError(f"{component_name} did not initialize in time") - - logger.info(f"Started {component_name}.") - - -def create_db_tables( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - component_name = DB_COMPONENT_NAME - logger.info(f"Creating {component_name} tables...") - - container_name = f"clp-{component_name}-table-creator-{instance_id}" - - clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" - # fmt: off - container_start_cmd = [ - "docker", "run", - "-i", - "--network", "host", - "--rm", - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - env_vars = [ - *get_common_env_vars_list(), - *get_credential_env_vars_list(container_clp_config, include_db_credentials=True), - ] - necessary_mounts = [ - mounts.clp_home, - mounts.data_dir, - mounts.logs_dir, - mounts.generated_config_file, - ] - append_docker_options(container_start_cmd, necessary_mounts, env_vars) - container_start_cmd.append(clp_config.container_image_ref) - - clp_py_utils_dir = clp_site_packages_dir / "clp_py_utils" - # fmt: off - create_tables_cmd = [ - "python3", - str(clp_py_utils_dir / "create-db-tables.py"), - "--config", str(container_clp_config.get_shared_config_file_path()), - "--storage-engine", str(container_clp_config.package.storage_engine), - ] - # fmt: on - - cmd = container_start_cmd + create_tables_cmd - logger.debug(shlex.join(cmd)) - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Created {component_name} tables.") - - -def create_results_cache_indices( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - component_name = RESULTS_CACHE_COMPONENT_NAME - logger.info(f"Creating {component_name} indices...") - - container_name = f"clp-{component_name}-indices-creator-{instance_id}" - - clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" - # fmt: off - container_start_cmd = [ - "docker", "run", - "-i", - "--network", "host", - "--rm", - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - env_vars = [f"PYTHONPATH={clp_site_packages_dir}"] - necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir] - append_docker_options(container_start_cmd, necessary_mounts, env_vars) - container_start_cmd.append(clp_config.container_image_ref) - - clp_py_utils_dir = clp_site_packages_dir / "clp_py_utils" - # fmt: off - init_cmd = [ - "python3", - str(clp_py_utils_dir / "initialize-results-cache.py"), - "--uri", container_clp_config.results_cache.get_uri(), - "--stream-collection", container_clp_config.results_cache.stream_collection_name, - ] - # fmt: on - - cmd = container_start_cmd + init_cmd - logger.debug(shlex.join(cmd)) - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Created {component_name} indices.") - - -def start_queue(instance_id: str, clp_config: CLPConfig): - component_name = QUEUE_COMPONENT_NAME - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if 
container_exists(container_name): - return - - queue_logs_dir = clp_config.logs_directory / component_name - validate_queue_config(clp_config, queue_logs_dir) - - log_filename = "rabbitmq.log" - - # Generate config file - config_filename = f"{container_name}.conf" - host_config_file_path = clp_config.logs_directory / config_filename - with open(host_config_file_path, "w") as f: - f.write(f"default_user = {clp_config.queue.username}\n") - f.write(f"default_pass = {clp_config.queue.password}\n") - f.write(f"log.file = {log_filename}\n") - - # Create directories - queue_logs_dir.mkdir(exist_ok=True, parents=True) - - # Start container - rabbitmq_logs_dir = pathlib.Path("/") / "var" / "log" / "rabbitmq" - mounts = [ - DockerMount( - DockerMountType.BIND, - host_config_file_path, - pathlib.Path("/") / "etc" / "rabbitmq" / "rabbitmq.conf", - True, - ), - DockerMount(DockerMountType.BIND, queue_logs_dir, rabbitmq_logs_dir), - ] - rabbitmq_pid_file_path = pathlib.Path("/") / "tmp" / "rabbitmq.pid" - - host_user_id = os.getuid() - if 0 != host_user_id: - container_user = f"{host_user_id}:{os.getgid()}" - else: - # The host user is `root` so use the container's default user and make this component's - # directories writable by that user. - # NOTE: This doesn't affect the host user's access to the directories since they're `root`. - container_user = "rabbitmq" - default_container_user_id = 999 - default_container_group_id = 999 - chown_recursively(queue_logs_dir, default_container_user_id, default_container_group_id) - - # fmt: off - cmd = [ - "docker", "run", - "-d", - "--name", container_name, - "--log-driver", "local", - # Override RABBITMQ_LOGS since the image sets it to *only* log to stdout - "-u", container_user - ] - env_vars = [ - f"RABBITMQ_LOGS={rabbitmq_logs_dir / log_filename}", - f"RABBITMQ_PID_FILE={rabbitmq_pid_file_path}", - ] - # fmt: on - append_docker_options(cmd, mounts, env_vars) - append_docker_port_settings_for_host_ips( - clp_config.queue.host, clp_config.queue.port, 5672, cmd - ) - cmd.append("rabbitmq:3.9.8") - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - # Wait for queue to start up - rabbitmq_cmd = ["rabbitmq-diagnostics", "check_running"] - if not wait_for_container_cmd(container_name, rabbitmq_cmd, 60): - raise EnvironmentError(f"{component_name} did not initialize in time") - - logger.info(f"Started {component_name}.") - - -def start_redis(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): - component_name = REDIS_COMPONENT_NAME - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - redis_logs_dir = clp_config.logs_directory / component_name - redis_data_dir = clp_config.data_directory / component_name - - base_config_file_path = conf_dir / "redis" / "redis.conf" - validate_redis_config(clp_config, redis_data_dir, redis_logs_dir, base_config_file_path) - - config_filename = f"{container_name}.conf" - host_config_file_path = clp_config.logs_directory / config_filename - with open(base_config_file_path, "r") as base, open(host_config_file_path, "w") as full: - for line in base.readlines(): - full.write(line) - full.write(f"requirepass {clp_config.redis.password}\n") - - redis_data_dir.mkdir(exist_ok=True, parents=True) - redis_logs_dir.mkdir(exist_ok=True, parents=True) - - # Start container - config_file_path = pathlib.Path("/") / "usr" / "local" / "etc" / "redis" / "redis.conf" - mounts = [ - DockerMount(DockerMountType.BIND, 
host_config_file_path, config_file_path, True), - DockerMount( - DockerMountType.BIND, redis_logs_dir, pathlib.Path("/") / "var" / "log" / "redis" - ), - DockerMount(DockerMountType.BIND, redis_data_dir, pathlib.Path("/") / "data"), - ] - - host_user_id = os.getuid() - if 0 != host_user_id: - container_user = f"{host_user_id}:{os.getgid()}" - else: - # The host user is `root` so use the container's default user and make this component's - # directories writable by that user. - # NOTE: This doesn't affect the host user's access to the directories since they're `root`. - container_user = "redis" - default_container_user_id = 999 - default_container_group_id = 999 - chown_recursively(redis_data_dir, default_container_user_id, default_container_group_id) - chown_recursively(redis_logs_dir, default_container_user_id, default_container_group_id) - - # fmt: off - cmd = [ - "docker", "run", - "-d", - "--name", container_name, - "--log-driver", "local", - "-u", container_user, - ] - # fmt: on - append_docker_options(cmd, mounts) - append_docker_port_settings_for_host_ips( - clp_config.redis.host, clp_config.redis.port, 6379, cmd - ) - cmd.append("redis:7.2.4") - cmd.append("redis-server") - cmd.append(str(config_file_path)) - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - # fmt: off - redis_ping_cmd = [ - "redis-cli", - "-h", "127.0.0.1", - "-p", "6379", - "-a", clp_config.redis.password, - "PING" - ] - # fmt: on - - if not wait_for_container_cmd(container_name, redis_ping_cmd, 30): - raise EnvironmentError(f"{component_name} did not initialize in time") - - logger.info(f"Started {component_name}.") - - -def start_results_cache(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): - component_name = RESULTS_CACHE_COMPONENT_NAME - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - data_dir = clp_config.data_directory / component_name - logs_dir = clp_config.logs_directory / component_name - - validate_results_cache_config(clp_config, data_dir, logs_dir) - - data_dir.mkdir(exist_ok=True, parents=True) - logs_dir.mkdir(exist_ok=True, parents=True) - - mounts = [ - DockerMount( - DockerMountType.BIND, conf_dir / "mongo", pathlib.Path("/") / "etc" / "mongo", True - ), - DockerMount(DockerMountType.BIND, data_dir, pathlib.Path("/") / "data" / "db"), - DockerMount(DockerMountType.BIND, logs_dir, pathlib.Path("/") / "var" / "log" / "mongodb"), - ] - - host_user_id = os.getuid() - if 0 != host_user_id: - container_user = f"{host_user_id}:{os.getgid()}" - else: - # The host user is `root` so use the container's default user and make this component's - # directories writable by that user. - # NOTE: This doesn't affect the host user's access to the directories since they're `root`. 
- container_user = "mongodb" - default_container_user_id = 999 - default_container_group_id = 999 - chown_recursively(data_dir, default_container_user_id, default_container_group_id) - chown_recursively(logs_dir, default_container_user_id, default_container_group_id) - - # fmt: off - cmd = [ - "docker", "run", - "-d", - "--network", "host", - "--name", container_name, - "--log-driver", "local", - "-u", container_user, - ] - # fmt: on - append_docker_options(cmd, mounts) - cmd.append("mongo:7.0.1") - cmd.append("--config") - cmd.append(str(pathlib.Path("/") / "etc" / "mongo" / "mongod.conf")) - cmd.append("--bind_ip") - cmd.append(clp_config.results_cache.host) - cmd.append("--port") - cmd.append(str(clp_config.results_cache.port)) - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Started {component_name}.") - - -def start_compression_scheduler( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - module_name = "job_orchestration.scheduler.compress.compression_scheduler" - generic_start_scheduler( - COMPRESSION_SCHEDULER_COMPONENT_NAME, - module_name, - instance_id, - clp_config, - container_clp_config, - mounts, - ) - - -def start_query_scheduler( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - module_name = "job_orchestration.scheduler.query.query_scheduler" - generic_start_scheduler( - QUERY_SCHEDULER_COMPONENT_NAME, - module_name, - instance_id, - clp_config, - container_clp_config, - mounts, - ) - - -def generic_start_scheduler( - component_name: str, - module_name: str, - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - logs_dir = clp_config.logs_directory / component_name - logs_dir.mkdir(parents=True, exist_ok=True) - container_logs_dir = container_clp_config.logs_directory / component_name - - # fmt: off - container_start_cmd = [ - "docker", "run", - "-di", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - - env_vars = [ - *get_common_env_vars_list(), - *get_credential_env_vars_list(container_clp_config, include_db_credentials=True), - *get_celery_connection_env_vars_list(container_clp_config), - f"CLP_LOGS_DIR={container_logs_dir}", - f"CLP_LOGGING_LEVEL={clp_config.query_scheduler.logging_level}", - ] - necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.generated_config_file] - aws_mount, aws_env_vars = generate_container_auth_options(clp_config, component_name) - if aws_mount: - necessary_mounts.append(mounts.aws_config_dir) - if aws_env_vars: - env_vars.extend(aws_env_vars) - if ( - COMPRESSION_SCHEDULER_COMPONENT_NAME == component_name - and StorageType.FS == clp_config.logs_input.type - ): - necessary_mounts.append(mounts.input_logs_dir) - append_docker_options(container_start_cmd, necessary_mounts, env_vars) - container_start_cmd.append(clp_config.container_image_ref) - - # fmt: off - scheduler_cmd = [ - "python3", "-u", - "-m", module_name, - "--config", str(container_clp_config.get_shared_config_file_path()), - ] - # fmt: on - cmd = container_start_cmd + scheduler_cmd - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Started {component_name}.") - - -def 
start_compression_worker( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - num_cpus: int, - mounts: CLPDockerMounts, -): - celery_method = "job_orchestration.executor.compress" - celery_route = SchedulerType.COMPRESSION - compression_worker_mounts = [mounts.archives_output_dir] - generic_start_worker( - COMPRESSION_WORKER_COMPONENT_NAME, - instance_id, - clp_config, - clp_config.compression_worker, - container_clp_config, - celery_method, - celery_route, - clp_config.redis.compression_backend_database, - num_cpus, - mounts, - compression_worker_mounts, - ) - - -def start_query_worker( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - num_cpus: int, - mounts: CLPDockerMounts, -): - celery_method = "job_orchestration.executor.query" - celery_route = SchedulerType.QUERY - - query_worker_mounts = [mounts.stream_output_dir] - if StorageType.FS == clp_config.archive_output.storage.type: - query_worker_mounts.append(mounts.archives_output_dir) - - generic_start_worker( - QUERY_WORKER_COMPONENT_NAME, - instance_id, - clp_config, - clp_config.query_worker, - container_clp_config, - celery_method, - celery_route, - clp_config.redis.query_backend_database, - num_cpus, - mounts, - query_worker_mounts, - ) - - -def generic_start_worker( - component_name: str, - instance_id: str, - clp_config: CLPConfig, - worker_config: BaseModel, - container_clp_config: CLPConfig, - celery_method: str, - celery_route: str, - redis_database: int, - num_cpus: int, - mounts: CLPDockerMounts, - worker_specific_mounts: Optional[List[Optional[DockerMount]]], -): - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - logs_dir = clp_config.logs_directory / component_name - logs_dir.mkdir(parents=True, exist_ok=True) - container_logs_dir = container_clp_config.logs_directory / component_name - - # Create necessary directories - clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True) - clp_config.stream_output.get_directory().mkdir(parents=True, exist_ok=True) - - clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" - container_worker_log_path = container_logs_dir / "worker.log" - # fmt: off - container_start_cmd = [ - "docker", "run", - "-di", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - - env_vars = [ - *get_common_env_vars_list(include_clp_home_env_var=True), - *get_celery_connection_env_vars_list(container_clp_config), - f"CLP_CONFIG_PATH={container_clp_config.get_shared_config_file_path()}", - f"CLP_LOGS_DIR={container_logs_dir}", - f"CLP_LOGGING_LEVEL={worker_config.logging_level}", - f"CLP_WORKER_LOG_PATH={container_worker_log_path}", - ] - necessary_mounts = [ - mounts.clp_home, - mounts.data_dir, - mounts.logs_dir, - ] - if StorageType.FS == clp_config.logs_input.type: - necessary_mounts.append(mounts.input_logs_dir) - if worker_specific_mounts: - necessary_mounts.extend(worker_specific_mounts) - - aws_mount, aws_env_vars = generate_container_auth_options(clp_config, component_name) - if aws_mount: - necessary_mounts.append(mounts.aws_config_dir) - if aws_env_vars: - env_vars.extend(aws_env_vars) - - append_docker_options(container_start_cmd, necessary_mounts, env_vars) - container_start_cmd.append(clp_config.container_image_ref) - - worker_cmd = [ - "python3", - 
str(clp_site_packages_dir / "bin" / "celery"), - "-A", - celery_method, - "worker", - "--concurrency", - str(num_cpus), - "--loglevel", - "WARNING", - "-f", - str(container_worker_log_path), - "-Q", - celery_route, - "-n", - component_name, - ] - cmd = container_start_cmd + worker_cmd - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Started {component_name}.") - - -def update_settings_object( - parent_key_prefix: str, - settings: Dict[str, Any], - updates: Dict[str, Any], -): - """ - Recursively updates the given settings object with the values from `updates`. - - :param parent_key_prefix: The prefix for keys at this level in the settings dictionary. - :param settings: The settings to update. - :param updates: The updates. - :raises ValueError: If a key in `updates` doesn't exist in `settings`. - """ - for key, value in updates.items(): - if key not in settings: - error_msg = f"{parent_key_prefix}{key} is not a valid configuration key for the webui." - raise ValueError(error_msg) - if isinstance(value, dict): - update_settings_object(f"{parent_key_prefix}{key}.", settings[key], value) - else: - settings[key] = updates[key] - - -def read_and_update_settings_json(settings_file_path: pathlib.Path, updates: Dict[str, Any]): - """ - Reads and updates a settings JSON file. - - :param settings_file_path: - :param updates: - """ - with open(settings_file_path, "r") as settings_json_file: - settings_object = json.loads(settings_json_file.read()) - update_settings_object("", settings_object, updates) - - return settings_object - - -def start_webui( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - component_name = WEBUI_COMPONENT_NAME - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - container_webui_dir = CONTAINER_CLP_HOME / "var" / "www" / "webui" - node_path = str(container_webui_dir / "server" / "node_modules") - client_settings_json_path = ( - get_clp_home() / "var" / "www" / "webui" / "client" / "settings.json" - ) - server_settings_json_path = ( - get_clp_home() / "var" / "www" / "webui" / "server" / "dist" / "settings.json" - ) - - validate_webui_config(clp_config, client_settings_json_path, server_settings_json_path) - - # Read, update, and write back client's and server's settings.json - clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True) - table_prefix = clp_db_connection_params["table_prefix"] - if StorageEngine.CLP_S == clp_config.package.storage_engine: - archives_table_name = "" - files_table_name = "" - else: - archives_table_name = get_archives_table_name(table_prefix, None) - files_table_name = get_files_table_name(table_prefix, None) - - client_settings_json_updates = { - "ClpStorageEngine": clp_config.package.storage_engine, - "ClpQueryEngine": clp_config.package.query_engine, - "MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name, - "SqlDbClpArchivesTableName": archives_table_name, - "SqlDbClpDatasetsTableName": get_datasets_table_name(table_prefix), - "SqlDbClpFilesTableName": files_table_name, - "SqlDbClpTablePrefix": table_prefix, - "SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME, - } - client_settings_json = read_and_update_settings_json( - client_settings_json_path, client_settings_json_updates - ) - with open(client_settings_json_path, "w") as client_settings_json_file: - 
client_settings_json_file.write(json.dumps(client_settings_json)) - - server_settings_json_updates = { - "SqlDbHost": clp_config.database.host, - "SqlDbPort": clp_config.database.port, - "SqlDbName": clp_config.database.name, - "SqlDbQueryJobsTableName": QUERY_JOBS_TABLE_NAME, - "MongoDbHost": clp_config.results_cache.host, - "MongoDbPort": clp_config.results_cache.port, - "MongoDbName": clp_config.results_cache.db_name, - "MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name, - "MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name, - "ClientDir": str(container_webui_dir / "client"), - "LogViewerDir": str(container_webui_dir / "yscope-log-viewer"), - "StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size, - "ClpQueryEngine": clp_config.package.query_engine, - } - - container_cmd_extra_opts = [] - - stream_storage = clp_config.stream_output.storage - if StorageType.S3 == stream_storage.type: - s3_config = stream_storage.s3_config - server_settings_json_updates["StreamFilesDir"] = None - server_settings_json_updates["StreamFilesS3Region"] = s3_config.region_code - server_settings_json_updates["StreamFilesS3PathPrefix"] = ( - f"{s3_config.bucket}/{s3_config.key_prefix}" - ) - auth = s3_config.aws_authentication - if AwsAuthType.profile == auth.type: - server_settings_json_updates["StreamFilesS3Profile"] = auth.profile - else: - server_settings_json_updates["StreamFilesS3Profile"] = None - elif StorageType.FS == stream_storage.type: - server_settings_json_updates["StreamFilesDir"] = str( - container_clp_config.stream_output.get_directory() - ) - server_settings_json_updates["StreamFilesS3Region"] = None - server_settings_json_updates["StreamFilesS3PathPrefix"] = None - server_settings_json_updates["StreamFilesS3Profile"] = None - - query_engine = clp_config.package.query_engine - if QueryEngine.PRESTO == query_engine: - server_settings_json_updates["PrestoHost"] = clp_config.presto.host - server_settings_json_updates["PrestoPort"] = clp_config.presto.port - else: - server_settings_json_updates["PrestoHost"] = None - server_settings_json_updates["PrestoPort"] = None - - server_settings_json = read_and_update_settings_json( - server_settings_json_path, server_settings_json_updates - ) - with open(server_settings_json_path, "w") as settings_json_file: - settings_json_file.write(json.dumps(server_settings_json)) - - # fmt: off - container_cmd = [ - "docker", "run", - "-d", - "--network", "host", - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - container_cmd.extend(container_cmd_extra_opts) - - env_vars = [ - *get_common_env_vars_list(), - *get_credential_env_vars_list(container_clp_config, include_db_credentials=True), - f"NODE_PATH={node_path}", - f"HOST={clp_config.webui.host}", - f"PORT={clp_config.webui.port}", - f"NODE_ENV=production", - f"RATE_LIMIT={clp_config.webui.rate_limit}", - ] - necessary_mounts = [ - mounts.clp_home, - ] - if StorageType.S3 == stream_storage.type: - auth = stream_storage.s3_config.aws_authentication - if AwsAuthType.credentials == auth.type: - credentials = auth.credentials - env_vars.append(f"AWS_ACCESS_KEY_ID={credentials.access_key_id}") - env_vars.append(f"AWS_SECRET_ACCESS_KEY={credentials.secret_access_key}") - else: - aws_mount, aws_env_vars = generate_container_auth_options( - clp_config, WEBUI_COMPONENT_NAME - ) - if aws_mount: - necessary_mounts.append(mounts.aws_config_dir) - if 
aws_env_vars: - env_vars.extend(aws_env_vars) - elif StorageType.FS == stream_storage.type: - necessary_mounts.append(mounts.stream_output_dir) - - append_docker_options(container_cmd, necessary_mounts, env_vars) - container_cmd.append(clp_config.container_image_ref) - - node_cmd = [ - str(CONTAINER_CLP_HOME / "bin" / "node-22"), - str(container_webui_dir / "server" / "dist" / "src" / "main.js"), - ] - cmd = container_cmd + node_cmd - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Started {component_name}.") - - -def start_reducer( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - num_workers: int, - mounts: CLPDockerMounts, -): - component_name = REDUCER_COMPONENT_NAME - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - logs_dir = clp_config.logs_directory / component_name - validate_reducer_config(clp_config, logs_dir, num_workers) - - logs_dir.mkdir(parents=True, exist_ok=True) - container_logs_dir = container_clp_config.logs_directory / component_name - - # fmt: off - container_start_cmd = [ - "docker", "run", - "-di", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - env_vars = [ - *get_common_env_vars_list(include_clp_home_env_var=True), - f"CLP_LOGS_DIR={container_logs_dir}", - f"CLP_LOGGING_LEVEL={clp_config.reducer.logging_level}", - ] - necessary_mounts = [ - mounts.clp_home, - mounts.logs_dir, - mounts.generated_config_file, - ] - append_docker_options(container_start_cmd, necessary_mounts, env_vars) - container_start_cmd.append(clp_config.container_image_ref) - - # fmt: off - reducer_cmd = [ - "python3", "-u", - "-m", "job_orchestration.reducer.reducer", - "--config", str(container_clp_config.get_shared_config_file_path()), - "--concurrency", str(num_workers), - "--upsert-interval", str(clp_config.reducer.upsert_interval), - ] - # fmt: on - cmd = container_start_cmd + reducer_cmd - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Started {component_name}.") - - -def start_garbage_collector( - instance_id: str, - clp_config: CLPConfig, - container_clp_config: CLPConfig, - mounts: CLPDockerMounts, -): - component_name = GARBAGE_COLLECTOR_COMPONENT_NAME - - if not is_retention_period_configured(clp_config): - logger.info(f"Retention period is not configured, skipping {component_name} creation...") - return - - logger.info(f"Starting {component_name}...") - - container_name = f"clp-{component_name}-{instance_id}" - if container_exists(container_name): - return - - logs_dir = clp_config.logs_directory / component_name - validate_log_directory(logs_dir, component_name) - # Create logs directory if necessary - logs_dir.mkdir(parents=True, exist_ok=True) - container_logs_dir = container_clp_config.logs_directory / component_name - - # fmt: off - container_start_cmd = [ - "docker", "run", - "-di", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "--name", container_name, - "--log-driver", "local", - "-u", f"{os.getuid()}:{os.getgid()}", - ] - # fmt: on - - necessary_mounts = [ - mounts.clp_home, - mounts.logs_dir, - mounts.generated_config_file, - ] - env_vars = [ - *get_common_env_vars_list(include_clp_home_env_var=True), - *get_credential_env_vars_list(container_clp_config, include_db_credentials=True), - f"CLP_LOGS_DIR={container_logs_dir}", - 
f"CLP_LOGGING_LEVEL={clp_config.garbage_collector.logging_level}", - ] - - # Add necessary mounts for archives and streams. - if StorageType.FS == clp_config.archive_output.storage.type: - necessary_mounts.append(mounts.archives_output_dir) - if StorageType.FS == clp_config.stream_output.storage.type: - necessary_mounts.append(mounts.stream_output_dir) - - aws_mount, aws_env_vars = generate_container_auth_options(clp_config, component_name) - if aws_mount: - necessary_mounts.append(mounts.aws_config_dir) - if aws_env_vars: - env_vars.extend(aws_env_vars) - - append_docker_options(container_start_cmd, necessary_mounts, env_vars) - container_start_cmd.append(clp_config.container_image_ref) - - # fmt: off - garbage_collector_cmd = [ - "python3", "-u", - "-m", "job_orchestration.garbage_collector.garbage_collector", - "--config", str(container_clp_config.get_shared_config_file_path()), - ] - # fmt: on - cmd = container_start_cmd + garbage_collector_cmd - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Started {component_name}.") - - -def add_num_workers_argument(parser): - parser.add_argument( - "--num-workers", - type=int, - default=multiprocessing.cpu_count(), - help="Number of workers to start", - ) - - def main(argv): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -1136,110 +31,20 @@ def main(argv): default=str(default_config_file_path), help="CLP package configuration file.", ) - args_parser.add_argument( - "--verbose", - "-v", - action="store_true", - help="Enable debug logging.", - ) - - component_args_parser = args_parser.add_subparsers(dest="target") - component_args_parser.add_parser(CONTROLLER_TARGET_NAME) - component_args_parser.add_parser(DB_COMPONENT_NAME) - component_args_parser.add_parser(QUEUE_COMPONENT_NAME) - component_args_parser.add_parser(REDIS_COMPONENT_NAME) - component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) - component_args_parser.add_parser(COMPRESSION_SCHEDULER_COMPONENT_NAME) - component_args_parser.add_parser(QUERY_SCHEDULER_COMPONENT_NAME) - compression_worker_parser = component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME) - add_num_workers_argument(compression_worker_parser) - query_worker_parser = component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME) - add_num_workers_argument(query_worker_parser) - reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME) - add_num_workers_argument(reducer_server_parser) - component_args_parser.add_parser(WEBUI_COMPONENT_NAME) - component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME) parsed_args = args_parser.parse_args(argv[1:]) - if parsed_args.verbose: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.INFO) - - if parsed_args.target: - target = parsed_args.target - else: - target = ALL_TARGET_NAME - - try: - check_dependencies() - except: - logger.exception("Dependency checking failed.") - return -1 - # Validate and load config file try: + # Validate and load config file. 
config_file_path = pathlib.Path(parsed_args.config) clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - runnable_components = clp_config.get_runnable_components() - components_to_start = get_components_for_target(target) - components_to_start = components_to_start.intersection(runnable_components) - - # Exit early if no components to start - if len(components_to_start) == 0: - logger.error(f"{target} not available with current configuration") - return -1 - - # Validate and load necessary credentials - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - DB_COMPONENT_NAME, - GARBAGE_COLLECTOR_COMPONENT_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - WEBUI_COMPONENT_NAME, - ): - validate_and_load_db_credentials_file(clp_config, clp_home, True) - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - QUEUE_COMPONENT_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - ): - validate_and_load_queue_credentials_file(clp_config, clp_home, True) - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - REDIS_COMPONENT_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - ): - validate_and_load_redis_credentials_file(clp_config, clp_home, True) - if target in ( - ALL_TARGET_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - ): - validate_logs_input_config(clp_config) - if target in ( - ALL_TARGET_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - GARBAGE_COLLECTOR_COMPONENT_NAME, - ): - validate_output_storage_config(clp_config) - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - GARBAGE_COLLECTOR_COMPONENT_NAME, - ): - validate_retention_config(clp_config) + validate_and_load_db_credentials_file(clp_config, clp_home, True) + validate_and_load_queue_credentials_file(clp_config, clp_home, True) + validate_and_load_redis_credentials_file(clp_config, clp_home, True) + validate_logs_input_config(clp_config) + validate_output_storage_config(clp_config) + validate_retention_config(clp_config) clp_config.validate_data_dir() clp_config.validate_logs_dir() @@ -1248,83 +53,20 @@ def main(argv): logger.exception("Failed to load config.") return -1 - if target in ( - COMPRESSION_WORKER_COMPONENT_NAME, - REDUCER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - ): - num_workers = parsed_args.num_workers - else: - num_workers = multiprocessing.cpu_count() // 2 - - container_clp_config, mounts = generate_container_config(clp_config, clp_home) - - # Create necessary directories - clp_config.data_directory.mkdir(parents=True, exist_ok=True) - clp_config.logs_directory.mkdir(parents=True, exist_ok=True) - - dump_shared_container_config(container_clp_config, clp_config) - try: - # Create instance-id file - instance_id_file_path = clp_config.logs_directory / "instance-id" - if instance_id_file_path.exists(): - with open(instance_id_file_path, "r") as f: - instance_id = f.readline() - else: - instance_id = str(uuid.uuid4())[-4:] - with open(instance_id_file_path, "w") as f: - f.write(instance_id) - f.flush() - - conf_dir = clp_home / "etc" - - # Start components - if DB_COMPONENT_NAME in components_to_start: - start_db(instance_id, clp_config, conf_dir) - - if ( - target == CONTROLLER_TARGET_NAME and DB_COMPONENT_NAME in runnable_components - ) or DB_COMPONENT_NAME in components_to_start: - 
create_db_tables(instance_id, clp_config, container_clp_config, mounts) - - if QUEUE_COMPONENT_NAME in components_to_start: - start_queue(instance_id, clp_config) - - if REDIS_COMPONENT_NAME in components_to_start: - start_redis(instance_id, clp_config, conf_dir) - - if RESULTS_CACHE_COMPONENT_NAME in components_to_start: - start_results_cache(instance_id, clp_config, conf_dir) - - if ( - target == CONTROLLER_TARGET_NAME and RESULTS_CACHE_COMPONENT_NAME in runnable_components - ) or RESULTS_CACHE_COMPONENT_NAME in components_to_start: - create_results_cache_indices(instance_id, clp_config, container_clp_config, mounts) - - if COMPRESSION_SCHEDULER_COMPONENT_NAME in components_to_start: - start_compression_scheduler(instance_id, clp_config, container_clp_config, mounts) - - if QUERY_SCHEDULER_COMPONENT_NAME in components_to_start: - start_query_scheduler(instance_id, clp_config, container_clp_config, mounts) - - if COMPRESSION_WORKER_COMPONENT_NAME in components_to_start: - start_compression_worker( - instance_id, clp_config, container_clp_config, num_workers, mounts - ) - - if QUERY_WORKER_COMPONENT_NAME in components_to_start: - start_query_worker(instance_id, clp_config, container_clp_config, num_workers, mounts) - - if REDUCER_COMPONENT_NAME in components_to_start: - start_reducer(instance_id, clp_config, container_clp_config, num_workers, mounts) - - if WEBUI_COMPONENT_NAME in components_to_start: - start_webui(instance_id, clp_config, container_clp_config, mounts) - - if GARBAGE_COLLECTOR_COMPONENT_NAME in components_to_start: - start_garbage_collector(instance_id, clp_config, container_clp_config, mounts) + # Create necessary directories. + clp_config.data_directory.mkdir(parents=True, exist_ok=True) + clp_config.logs_directory.mkdir(parents=True, exist_ok=True) + clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True) + clp_config.stream_output.get_directory().mkdir(parents=True, exist_ok=True) + except: + logger.exception("Failed to create necessary directories.") + return -1 + instance_id = get_or_create_instance_id(clp_config) + try: + controller = DockerComposeController(clp_config, instance_id) + controller.start() except Exception as ex: if type(ex) == ValueError: logger.error(f"Failed to start CLP: {ex}") diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index ebcded6e2b..5506e81f08 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -1,211 +1,31 @@ -import argparse import logging -import pathlib -import subprocess import sys -from typing import List -from clp_py_utils.clp_config import ( - ALL_TARGET_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - CONTROLLER_TARGET_NAME, - DB_COMPONENT_NAME, - GARBAGE_COLLECTOR_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - QUEUE_COMPONENT_NAME, - REDIS_COMPONENT_NAME, - REDUCER_COMPONENT_NAME, - RESULTS_CACHE_COMPONENT_NAME, - WEBUI_COMPONENT_NAME, -) +from clp_py_utils.clp_config import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH +from clp_package_utils.controller import DockerComposeController, get_or_create_instance_id from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, get_clp_home, - is_container_exited, - is_container_running, load_config_file, - validate_and_load_db_credentials_file, - 
validate_and_load_queue_credentials_file, ) logger = logging.getLogger(__file__) -def stop_running_container( - container_name: str, already_exited_containers: List[str], force: bool, timeout: int = 10 -): - if is_container_running(container_name): - logger.info(f"Stopping {container_name}...") - cmd = ["docker", "stop", "-t", str(timeout), container_name] - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Removing {container_name}...") - cmd = ["docker", "rm", container_name] - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - - logger.info(f"Stopped and removed {container_name}.") - elif is_container_exited(container_name): - if force: - logger.info(f"Forcibly removing exited {container_name}...") - cmd = ["docker", "rm", container_name] - subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) - logger.info(f"Removed {container_name}...") - else: - already_exited_containers.append(container_name) - - -def main(argv): +def main(): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH - args_parser = argparse.ArgumentParser(description="Stops CLP") - args_parser.add_argument( - "--config", - "-c", - default=str(default_config_file_path), - help="CLP package configuration file.", - ) - args_parser.add_argument( - "--force", - "-f", - action="store_true", - help="Forcibly remove exited containers", - ) - - component_args_parser = args_parser.add_subparsers(dest="target") - component_args_parser.add_parser(CONTROLLER_TARGET_NAME) - component_args_parser.add_parser(DB_COMPONENT_NAME) - component_args_parser.add_parser(QUEUE_COMPONENT_NAME) - component_args_parser.add_parser(REDIS_COMPONENT_NAME) - component_args_parser.add_parser(REDUCER_COMPONENT_NAME) - component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) - component_args_parser.add_parser(COMPRESSION_SCHEDULER_COMPONENT_NAME) - component_args_parser.add_parser(QUERY_SCHEDULER_COMPONENT_NAME) - component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME) - component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME) - component_args_parser.add_parser(WEBUI_COMPONENT_NAME) - component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME) - - parsed_args = args_parser.parse_args(argv[1:]) - - if parsed_args.target: - target = parsed_args.target - else: - target = ALL_TARGET_NAME - - # Validate and load config file try: - config_file_path = pathlib.Path(parsed_args.config) - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - - # Validate and load necessary credentials - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - DB_COMPONENT_NAME, - ): - validate_and_load_db_credentials_file(clp_config, clp_home, False) - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, - QUEUE_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - ): - validate_and_load_queue_credentials_file(clp_config, clp_home, False) + clp_config = load_config_file(default_config_file_path, default_config_file_path, clp_home) except: logger.exception("Failed to load config.") return -1 + instance_id = get_or_create_instance_id(clp_config) try: - # Read instance ID from file - logs_dir = clp_config.logs_directory - instance_id_file_path = logs_dir / "instance-id" - if not (logs_dir.exists() and logs_dir.is_dir() and instance_id_file_path.exists()): - # No instance ID file, so nothing to do - return 0 - with 
open(instance_id_file_path, "r") as f: - instance_id = f.readline() - - already_exited_containers = [] - force = parsed_args.force - if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME): - container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME): - container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - if target in (ALL_TARGET_NAME, REDUCER_COMPONENT_NAME): - container_name = f"clp-{REDUCER_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - - container_config_file_path = logs_dir / f"{container_name}.yml" - if container_config_file_path.exists(): - container_config_file_path.unlink() - if target in (ALL_TARGET_NAME, QUERY_WORKER_COMPONENT_NAME): - container_name = f"clp-{QUERY_WORKER_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME): - container_name = f"clp-{QUERY_SCHEDULER_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - - container_config_file_path = logs_dir / f"{container_name}.yml" - if container_config_file_path.exists(): - container_config_file_path.unlink() - if target in ( - ALL_TARGET_NAME, - CONTROLLER_TARGET_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - ): - container_name = f"clp-{COMPRESSION_SCHEDULER_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force, timeout=300) - - container_config_file_path = logs_dir / f"{container_name}.yml" - if container_config_file_path.exists(): - container_config_file_path.unlink() - if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME): - container_name = f"clp-{COMPRESSION_WORKER_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force, timeout=60) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, REDIS_COMPONENT_NAME): - container_name = f"clp-{REDIS_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - - redis_config_file_path = logs_dir / f"{container_name}.conf" - if redis_config_file_path.exists(): - redis_config_file_path.unlink() - if target in (ALL_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME): - container_name = f"clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUEUE_COMPONENT_NAME): - container_name = f"clp-{QUEUE_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - - queue_config_file_path = logs_dir / f"{container_name}.conf" - if queue_config_file_path.exists(): - queue_config_file_path.unlink() - if target in (ALL_TARGET_NAME, DB_COMPONENT_NAME): - container_name = f"clp-{DB_COMPONENT_NAME}-{instance_id}" - stop_running_container(container_name, already_exited_containers, force) - - if already_exited_containers: - container_list = " ".join(already_exited_containers) - logger.warning( - f"The following containers have already exited and were not removed:" - f" {container_list}" - ) - logger.warning(f"Run with --force to remove them") - elif target in ALL_TARGET_NAME: - # NOTE: We can 
only remove the instance ID file if all containers have been stopped. - # Currently, we only remove the instance file when all containers are stopped at once. - # If a single container is stopped, it's expensive to check if the others are running, - # so instead we don't remove the instance file. In the worst case, a user will have to - # remove it manually. - instance_id_file_path.unlink() + controller = DockerComposeController(clp_config, instance_id) + controller.stop() except: logger.exception("Failed to stop CLP.") return -1 @@ -214,4 +34,4 @@ def main(argv): if "__main__" == __name__: - sys.exit(main(sys.argv)) + sys.exit(main()) diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index ba8a8728cf..43d1695e60 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -1,9 +1,8 @@ import os import pathlib from enum import auto -from typing import Any, Literal, Optional, Set, Union +from typing import Any, ClassVar, Literal, Optional, Union -from dotenv import dotenv_values from pydantic import ( BaseModel, ConfigDict, @@ -36,45 +35,6 @@ WEBUI_COMPONENT_NAME = "webui" GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector" -# Component groups -GENERAL_SCHEDULING_COMPONENTS = { - QUEUE_COMPONENT_NAME, - REDIS_COMPONENT_NAME, -} -COMPRESSION_COMPONENTS = GENERAL_SCHEDULING_COMPONENTS | { - DB_COMPONENT_NAME, - COMPRESSION_SCHEDULER_COMPONENT_NAME, - COMPRESSION_WORKER_COMPONENT_NAME, -} -QUERY_COMPONENTS = GENERAL_SCHEDULING_COMPONENTS | { - DB_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - QUERY_WORKER_COMPONENT_NAME, - REDUCER_COMPONENT_NAME, -} -UI_COMPONENTS = { - RESULTS_CACHE_COMPONENT_NAME, - WEBUI_COMPONENT_NAME, -} -STORAGE_MANAGEMENT_COMPONENTS = {GARBAGE_COLLECTOR_COMPONENT_NAME} -ALL_COMPONENTS = ( - COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS | STORAGE_MANAGEMENT_COMPONENTS -) - -# Target names -ALL_TARGET_NAME = "" -CONTROLLER_TARGET_NAME = "controller" - -TARGET_TO_COMPONENTS = { - ALL_TARGET_NAME: ALL_COMPONENTS, - CONTROLLER_TARGET_NAME: GENERAL_SCHEDULING_COMPONENTS - | { - COMPRESSION_SCHEDULER_COMPONENT_NAME, - QUERY_SCHEDULER_COMPONENT_NAME, - } - | STORAGE_MANAGEMENT_COMPONENTS, -} - # Action names ARCHIVE_MANAGER_ACTION_NAME = "archive_manager" @@ -83,8 +43,18 @@ COMPRESSION_JOBS_TABLE_NAME = "compression_jobs" COMPRESSION_TASKS_TABLE_NAME = "compression_tasks" +# Paths +CONTAINER_AWS_CONFIG_DIRECTORY = pathlib.Path("/") / ".aws" +CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp" +CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path("/") / "mnt" / "logs" +CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH = pathlib.Path("etc") / "clp-config.yml" CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path("etc") / "credentials.yml" CLP_DEFAULT_DATA_DIRECTORY_PATH = pathlib.Path("var") / "data" +CLP_DEFAULT_ARCHIVE_DIRECTORY_PATH = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives" +CLP_DEFAULT_ARCHIVE_STAGING_DIRECTORY_PATH = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives" +CLP_DEFAULT_STREAM_DIRECTORY_PATH = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams" +CLP_DEFAULT_STREAM_STAGING_DIRECTORY_PATH = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams" +CLP_DEFAULT_LOG_DIRECTORY_PATH = pathlib.Path("var") / "log" CLP_DEFAULT_DATASET_NAME = "default" CLP_METADATA_TABLE_PREFIX = "clp_" CLP_PACKAGE_CONTAINER_IMAGE_ID_PATH = pathlib.Path("clp-package-image.id") @@ -99,6 +69,11 @@ CLP_REDIS_PASS_ENV_VAR_NAME = "CLP_REDIS_PASS" +class 
DeploymentType(KebabCaseStrEnum): + BASE = auto() + FULL = auto() + + class StorageEngine(KebabCaseStrEnum): CLP = auto() CLP_S = auto() @@ -174,9 +149,11 @@ def validate_query_engine_package_compatibility(self): class Database(BaseModel): + DEFAULT_PORT: ClassVar[int] = 3306 + type: str = "mariadb" host: str = "localhost" - port: int = 3306 + port: int = DEFAULT_PORT name: str = "clp-db" ssl_cert: Optional[str] = None auto_commit: bool = False @@ -285,6 +262,10 @@ def load_credentials_from_env(self): self.username = _get_env_var(CLP_DB_USER_ENV_VAR_NAME) self.password = _get_env_var(CLP_DB_PASS_ENV_VAR_NAME) + def transform_for_container(self): + self.host = DB_COMPONENT_NAME + self.port = self.DEFAULT_PORT + def _validate_logging_level(cls, value): if not is_valid_logging_level(value): @@ -320,6 +301,8 @@ def validate_logging_level(cls, value): class QueryScheduler(BaseModel): + DEFAULT_PORT: ClassVar[int] = 7000 + host: str = "localhost" port: int = 7000 jobs_poll_delay: float = 0.1 # seconds @@ -345,6 +328,10 @@ def validate_port(cls, value): _validate_port(cls, value) return value + def transform_for_container(self): + self.host = QUERY_SCHEDULER_COMPONENT_NAME + self.port = self.DEFAULT_PORT + class CompressionWorker(BaseModel): logging_level: str = "INFO" @@ -367,8 +354,10 @@ def validate_logging_level(cls, value): class Redis(BaseModel): + DEFAULT_PORT: ClassVar[int] = 6379 + host: str = "localhost" - port: int = 6379 + port: int = DEFAULT_PORT query_backend_database: int = 0 compression_backend_database: int = 1 # redis can perform authentication without a username @@ -407,10 +396,16 @@ def load_credentials_from_env(self): """ self.password = _get_env_var(CLP_REDIS_PASS_ENV_VAR_NAME) + def transform_for_container(self): + self.host = REDIS_COMPONENT_NAME + self.port = self.DEFAULT_PORT + class Reducer(BaseModel): + DEFAULT_PORT: ClassVar[int] = 14009 + host: str = "localhost" - base_port: int = 14009 + base_port: int = DEFAULT_PORT logging_level: str = "INFO" upsert_interval: int = 100 # milliseconds @@ -440,10 +435,16 @@ def validate_upsert_interval(cls, value): raise ValueError(f"{value} is not greater than zero") return value + def transform_for_container(self): + self.host = REDUCER_COMPONENT_NAME + self.base_port = self.DEFAULT_PORT + class ResultsCache(BaseModel): + DEFAULT_PORT: ClassVar[int] = 27017 + host: str = "localhost" - port: int = 27017 + port: int = DEFAULT_PORT db_name: str = "clp-query-results" stream_collection_name: str = "stream-files" retention_period: Optional[int] = 60 @@ -487,10 +488,16 @@ def validate_retention_period(cls, value): def get_uri(self): return f"mongodb://{self.host}:{self.port}/{self.db_name}" + def transform_for_container(self): + self.host = RESULTS_CACHE_COMPONENT_NAME + self.port = self.DEFAULT_PORT + class Queue(BaseModel): + DEFAULT_PORT: ClassVar[int] = 5672 + host: str = "localhost" - port: int = 5672 + port: int = DEFAULT_PORT username: Optional[str] = None password: Optional[str] = None @@ -530,6 +537,10 @@ def load_credentials_from_env(self): self.username = _get_env_var(CLP_QUEUE_USER_ENV_VAR_NAME) self.password = _get_env_var(CLP_QUEUE_PASS_ENV_VAR_NAME) + def transform_for_container(self): + self.host = QUEUE_COMPONENT_NAME + self.port = self.DEFAULT_PORT + class S3Credentials(BaseModel): access_key_id: str @@ -617,6 +628,9 @@ class S3IngestionConfig(BaseModel): def dump_to_primitive_dict(self): return self.model_dump() + def transform_for_container(self): + pass + class FsStorage(BaseModel): type: 
Literal[StorageType.FS.value] = StorageType.FS.value @@ -670,21 +684,39 @@ def dump_to_primitive_dict(self): class FsIngestionConfig(FsStorage): directory: pathlib.Path = pathlib.Path("/") + def transform_for_container(self): + input_logs_dir = self.directory.resolve() + self.directory = CONTAINER_INPUT_LOGS_ROOT_DIR / input_logs_dir.relative_to( + input_logs_dir.anchor + ) + class ArchiveFsStorage(FsStorage): - directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives" + directory: pathlib.Path = CLP_DEFAULT_ARCHIVE_DIRECTORY_PATH + + def transform_for_container(self): + self.directory = pathlib.Path("/") / CLP_DEFAULT_ARCHIVE_DIRECTORY_PATH class StreamFsStorage(FsStorage): - directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams" + directory: pathlib.Path = CLP_DEFAULT_STREAM_DIRECTORY_PATH + + def transform_for_container(self): + self.directory = pathlib.Path("/") / CLP_DEFAULT_STREAM_DIRECTORY_PATH class ArchiveS3Storage(S3Storage): - staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives" + staging_directory: pathlib.Path = CLP_DEFAULT_ARCHIVE_STAGING_DIRECTORY_PATH + + def transform_for_container(self): + self.staging_directory = pathlib.Path("/") / CLP_DEFAULT_ARCHIVE_STAGING_DIRECTORY_PATH class StreamS3Storage(S3Storage): - staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams" + staging_directory: pathlib.Path = CLP_DEFAULT_STREAM_STAGING_DIRECTORY_PATH + + def transform_for_container(self): + self.staging_directory = pathlib.Path("/") / CLP_DEFAULT_STREAM_STAGING_DIRECTORY_PATH def _get_directory_from_storage_config( @@ -897,8 +929,8 @@ class CLPConfig(BaseModel): archive_output: ArchiveOutput = ArchiveOutput() stream_output: StreamOutput = StreamOutput() - data_directory: pathlib.Path = pathlib.Path("var") / "data" - logs_directory: pathlib.Path = pathlib.Path("var") / "log" + data_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH + logs_directory: pathlib.Path = CLP_DEFAULT_LOG_DIRECTORY_PATH aws_config_directory: Optional[pathlib.Path] = None _container_image_id_path: pathlib.Path = PrivateAttr( @@ -1028,22 +1060,22 @@ def load_container_image_ref(self): def get_shared_config_file_path(self) -> pathlib.Path: return self.logs_directory / CLP_SHARED_CONFIG_FILENAME - def get_runnable_components(self) -> Set[str]: + def get_deployment_type(self) -> DeploymentType: if QueryEngine.PRESTO == self.package.query_engine: - return COMPRESSION_COMPONENTS | UI_COMPONENTS + return DeploymentType.BASE else: - return ALL_COMPONENTS + return DeploymentType.FULL def dump_to_primitive_dict(self): - custom_serialized_fields = ( + custom_serialized_fields = { "database", "queue", "redis", "logs_input", "archive_output", "stream_output", - ) - d = self.model_dump(exclude=set(custom_serialized_fields)) + } + d = self.model_dump(exclude=custom_serialized_fields) for key in custom_serialized_fields: d[key] = getattr(self, key).dump_to_primitive_dict() @@ -1068,6 +1100,28 @@ def validate_presto_config(self): ) return self + def transform_for_container(self): + """ + Adjusts paths and service hosts for containerized execution. + + Converts all relevant directories to absolute paths inside the container + and updates service hostnames/ports to their container service names. 
+ """ + self.data_directory = pathlib.Path("/") / CLP_DEFAULT_DATA_DIRECTORY_PATH + self.logs_directory = pathlib.Path("/") / CLP_DEFAULT_LOG_DIRECTORY_PATH + if self.aws_config_directory is not None: + self.aws_config_directory = CONTAINER_AWS_CONFIG_DIRECTORY + self.logs_input.transform_for_container() + self.archive_output.storage.transform_for_container() + self.stream_output.storage.transform_for_container() + + self.database.transform_for_container() + self.queue.transform_for_container() + self.redis.transform_for_container() + self.results_cache.transform_for_container() + self.query_scheduler.transform_for_container() + self.reducer.transform_for_container() + class WorkerConfig(BaseModel): package: Package = Package() @@ -1089,15 +1143,6 @@ def dump_to_primitive_dict(self): return d -def get_components_for_target(target: str) -> Set[str]: - if target in TARGET_TO_COMPONENTS: - return TARGET_TO_COMPONENTS[target] - elif target in ALL_COMPONENTS: - return {target} - else: - return set() - - def _validate_directory(value: Any): """ Validates that the given value represents a directory path. diff --git a/docs/src/dev-docs/building-package.md b/docs/src/dev-docs/building-package.md index 2964063a4e..993254cff9 100644 --- a/docs/src/dev-docs/building-package.md +++ b/docs/src/dev-docs/building-package.md @@ -85,7 +85,12 @@ task docker-images:package This will create a Docker image named `clp-package:dev`. +The package includes a `docker-compose.yaml` file that can be used to deploy CLP using Docker Compose. +If you want to manually deploy with Docker Compose instead of using the package scripts, see the +[Docker Compose design][docker-compose-design] for more information. + [Docker]: https://docs.docker.com/engine/install/ +[docker-compose-design]: ../dev-docs/design-docker-compose.md [Task]: https://taskfile.dev/ [uv]: https://docs.astral.sh/uv/ [y-scope/clp#1352]: https://github.com/y-scope/clp/issues/1352 diff --git a/docs/src/dev-docs/design-docker-compose.md b/docs/src/dev-docs/design-docker-compose.md new file mode 100644 index 0000000000..9bec8bb49b --- /dev/null +++ b/docs/src/dev-docs/design-docker-compose.md @@ -0,0 +1,199 @@ +# Docker Compose design + +This document explains the technical details of CLP's Docker Compose implementation. + +## Overview + +The Docker Compose implementation follows a controller architecture with a `BaseController` abstract +class and a `DockerComposeController` implementation. + +## Architecture + +### Controller Pattern + +The orchestration uses a controller pattern: + +* `BaseController` (abstract): Defines the interface for provisioning and managing CLP components. +* `DockerComposeController`: Implements Docker Compose-specific logic. + +## Initialization + +The controller performs these initialization steps: + +1. **Provisioning**: Provisions all components and generates component specific configuration + variables. +2. **Configuration Transformation**: The `transform_for_container()` method in `CLPConfig` adapts + configurations for containerized environments +3. **Environment Generation**: Creates a `.env` file with necessary Docker Compose variables + +### Configuration Transformation + +The `transform_for_container()` method in the `CLPConfig` class and related component classes +adapts the configuration for containerized environments by: + +1. Converting host paths to container paths +2. Updating service hostnames to match Docker Compose service names +3. 
+
+### Environment variables
+
+The controller generates a comprehensive set of environment variables that are written to a `.env`
+file, including:
+
+* Component-specific settings (ports, logging levels, concurrency)
+* Credentials for the database, queue, and Redis services
+* Paths for data, logs, archives, and streams
+* AWS credentials, when needed
+
+## Deployment process
+
+The `start-clp.sh` script executes the `start_clp.py` Python script, which orchestrates the
+deployment.
+
+### Deployment types
+
+CLP supports two deployment types, determined by the `package.query_engine` configuration setting:
+
+1. **BASE**: For deployments using [Presto][presto-integration] as the query engine. Uses only
+   `docker-compose.base.yaml`.
+2. **FULL**: For deployments using CLP's native query engine. Uses both compose files, as sketched
+   below.
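+
+Selecting the compose files therefore reduces to checking the deployment type. A minimal sketch of
+that selection (the helper name is illustrative, not the controller's actual API; `DeploymentType`
+and `CLPConfig.get_deployment_type()` come from `clp_py_utils.clp_config`):
+
+```python
+from typing import List
+
+from clp_py_utils.clp_config import CLPConfig, DeploymentType
+
+
+def get_compose_file_args(clp_config: CLPConfig) -> List[str]:
+    """Builds the `-f` arguments that select the compose files for the deployment type."""
+    compose_files = ["docker-compose.base.yaml"]
+    if DeploymentType.FULL == clp_config.get_deployment_type():
+        # The full deployment layers docker-compose.yaml on top of the base file.
+        compose_files.append("docker-compose.yaml")
+
+    file_args: List[str] = []
+    for compose_file in compose_files:
+        file_args.extend(("-f", compose_file))
+    return file_args
+```
+
+These arguments would then be passed to `docker compose up`, together with the generated `.env`
+file.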
+
+## Docker Compose files
+
+The Docker Compose setup uses two files:
+
+* `docker-compose.base.yaml`: Defines base services for all deployment types, excluding the Celery
+  scheduler and worker components to allow separate Presto [integration][presto-integration].
+* `docker-compose.yaml`: Extends the base file with additional services for complete deployments.
+
+Each file defines services with:
+
+* Service dependencies via `depends_on`
+* Health checks for critical services
+* Bind mounts for persistent data
+* Network configuration
+* User permissions
+
+## Service architecture
+
+The Docker Compose setup includes the following services:
+
+:::{mermaid}
+graph LR
+    %% Services
+    database["database (MySQL)"]
+    queue["queue (RabbitMQ)"]
+    redis["redis (Redis)"]
+    results_cache["results-cache (MongoDB)"]
+    compression_scheduler["compression-scheduler"]
+    query_scheduler["query-scheduler"]
+    compression_worker["compression-worker"]
+    query_worker["query-worker"]
+    reducer["reducer"]
+    webui["webui"]
+    garbage_collector["garbage-collector"]
+
+    %% One-time jobs
+    db_table_creator["db-table-creator"]
+    results_cache_indices_creator["results-cache-indices-creator"]
+
+    %% Dependencies
+    database -->|healthy| db_table_creator
+    results_cache -->|healthy| results_cache_indices_creator
+    db_table_creator -->|completed_successfully| compression_scheduler
+    queue -->|healthy| compression_scheduler
+    redis -->|healthy| compression_scheduler
+    db_table_creator -->|completed_successfully| query_scheduler
+    queue -->|healthy| query_scheduler
+    redis -->|healthy| query_scheduler
+    query_scheduler -->|healthy| reducer
+    results_cache_indices_creator -->|completed_successfully| reducer
+    db_table_creator -->|completed_successfully| webui
+    results_cache_indices_creator -->|completed_successfully| webui
+    db_table_creator -->|completed_successfully| garbage_collector
+    results_cache_indices_creator -->|completed_successfully| garbage_collector
+
+    subgraph Databases
+        database
+        queue
+        redis
+        results_cache
+    end
+
+    subgraph DB Migration Jobs
+        db_table_creator
+        results_cache_indices_creator
+    end
+
+    subgraph Schedulers
+        compression_scheduler
+        query_scheduler
+    end
+
+    subgraph Workers
+        compression_worker
+        query_worker
+        reducer
+    end
+
+    subgraph UI & Management
+        webui
+        garbage_collector
+    end
+:::
+
+### Services overview
+
+The CLP package is composed of several service components. The tables below list the services and
+their functions.
+
+:::{table} Services
+:align: left
+
+| Service               | Description                                                      |
+|-----------------------|------------------------------------------------------------------|
+| database              | Database for archive metadata, compression jobs, and query jobs  |
+| queue                 | Task queue for schedulers                                        |
+| redis                 | Task result storage for workers                                  |
+| compression-scheduler | Scheduler for compression jobs                                   |
+| query-scheduler       | Scheduler for search/aggregation jobs                            |
+| results-cache         | Storage for the workers to return search results to the UI      |
+| compression-worker    | Worker processes for compression jobs                            |
+| query-worker          | Worker processes for search/aggregation jobs                     |
+| reducer               | Reducers for performing the final stages of aggregation jobs    |
+| webui                 | Web server for the UI                                            |
+| garbage-collector     | Background process for retention control                         |
+:::
+
+### One-time initialization jobs
+
+We also set up short-lived, run-once "services" that initialize some of the services listed above.
+
+:::{table} Initialization jobs
+:align: left
+
+| Job                           | Description                                              |
+|-------------------------------|----------------------------------------------------------|
+| db-table-creator              | Initializes database tables                              |
+| results-cache-indices-creator | Initializes single-node replica set and sets up indices  |
+:::
+
+## Troubleshooting
+
+If you encounter issues with the Docker Compose deployment:
+
+1. Check service status:
+   ```bash
+   docker compose ps
+   ```
+
+2. View service logs:
+   ```bash
+   docker compose logs
+   ```
+
+3. Validate the configuration:
+   ```bash
+   docker compose config
+   ```
+
+[presto-integration]: ../user-docs/guides-using-presto.md
diff --git a/docs/src/dev-docs/index.md b/docs/src/dev-docs/index.md
index a4ef9e60fe..c3bf5e0241 100644
--- a/docs/src/dev-docs/index.md
+++ b/docs/src/dev-docs/index.md
@@ -95,6 +95,7 @@ tooling-gh-workflows
 :hidden:

 design-project-structure
+design-docker-compose
 design-kv-ir-streams/index
 design-metadata-db
diff --git a/docs/src/user-docs/guides-multi-node.md b/docs/src/user-docs/guides-multi-node.md
index 44078605ad..52f206f7af 100644
--- a/docs/src/user-docs/guides-multi-node.md
+++ b/docs/src/user-docs/guides-multi-node.md
@@ -2,145 +2,12 @@

 A multi-node deployment allows you to run CLP across a distributed set of hosts.

-## Requirements
-
-* [Docker]
-  * If you're not running as root, ensure docker can be run
-    [without superuser privileges][docker-non-root].
-* Python 3.9 or higher
-* One or more hosts networked together
-* A distributed filesystem (e.g. [SeaweedFS]) accessible by all worker hosts through a filesystem
-  mount
-  * See [below](#setting-up-seaweedfs) for how to set up a simple SeaweedFS cluster.
-
-## Cluster overview
-
-The CLP package is composed of several components--controller components and worker components. In
-a cluster, there should be a single instance of each controller component and one or more instances
-of worker components. The tables below list the components and their functions.
-
-:::{table} Controller components
-:align: left
-
-| Component             | Description                                                      |
-|-----------------------|------------------------------------------------------------------|
-| database              | Database for archive metadata, compression jobs, and query jobs  |
-| queue                 | Task queue for schedulers                                        |
-| redis                 | Task result storage for workers                                  |
-| compression_scheduler | Scheduler for compression jobs                                   |
-| query_scheduler       | Scheduler for search/aggregation jobs                            |
-| results_cache         | Storage for the workers to return search results to the UI      |
-| webui                 | Web server for the UI                                            |
-| garbage_collector     | Background process for retention control                         |
-:::
-
-:::{table} Worker components
-:align: left
-
-| Component          | Description                                                   |
-|--------------------|---------------------------------------------------------------|
-| compression_worker | Worker processes for compression jobs                         |
-| query_worker       | Worker processes for search/aggregation jobs                  |
-| reducer            | Reducers for performing the final stages of aggregation jobs  |
-:::
-
-:::{note}
-Running additional workers increases the parallelism of compression and search/aggregation jobs.
+:::{warning}
+CLP now uses Docker Compose for orchestration, and support for multi-node deployments has been
+temporarily removed. Please contact us if you need immediate support for multi-node deployments, or
+stay tuned for future updates on Kubernetes Helm support.
 :::

-## Configuring CLP
-
-1. Copy `etc/credentials.template.yml` to `etc/credentials.yml`.
-2. Edit `etc/credentials.yml`:
-
-   {style=lower-alpha}
-   1. Uncomment the file.
-   2. Choose an appropriate username and password.
-      * Note that these are *new* credentials that will be used by the components.
-
-3. Choose which hosts you would like to use for the controller components.
-   * You can use a single host for all controller components.
-4. Edit `etc/clp-config.yml`:
-
-   {style=lower-alpha}
-   1. Uncomment the file.
-   2. Set the `host` config of each controller component to the host that you'd like to run them
-      on.
-      * If desired, you can run different controller components on different hosts.
-   3. Change any of the controller components' ports that will conflict with services you already
-      have running.
-   4. Set `archive_output.directory` to a directory on the distributed filesystem.
-      * Ideally, the directory should be empty or should not yet exist (CLP will create it) since
-        CLP will write several files and directories directly to the given directory.
-
-5. Download and extract the package on all nodes.
-6. Copy the `credentials.yml` and `clp-config.yml` files that you created above and paste them
-   into `etc` on all the hosts where you extracted the package.
-
-## Starting CLP
-
-Before starting each CLP component, note that some components must be started before others. We
-organize the components into groups below, where components in a group can be started in any order,
-but all components in a group must be started before starting a component in the next group.
-
-**Group 1 components:**
-
-* `database`
-* `queue`
-* `redis`
-* `results_cache`
-
-**Group 2 components:**
-
-* `compression_scheduler`
-* `query_scheduler`
-* `garbage_collector`
-
-**Group 3 components:**
-
-* `compression_worker`
-* `query_worker`
-* `reducer`
-
-For each component, on the host where you want to run the component, run:
-
-```bash
-sbin/start-clp.sh <component>
-```
-
-Where `<component>` is the name of the component in the groups above.
-
-## Using CLP
-
-To learn how to compress and search your logs, check out the quick-start guide that corresponds to
-the flavor of CLP you're running:
-
-::::{grid} 1 1 2 2
-:gutter: 2
-
-:::{grid-item-card}
-:link: quick-start/clp-json
-Using clp-json
-^^^
-How to compress and search JSON logs.
-:::
-
-:::{grid-item-card}
-:link: quick-start/clp-text
-Using clp-text
-^^^
-How to compress and search unstructured text logs.
-:::
-::::
-
-## Stopping CLP
-
-If you need to stop the cluster, run:
-
-```bash
-sbin/stop-clp.sh
-```
-
 ## Setting up SeaweedFS

 The instructions below are for running a simple SeaweedFS cluster on a set of hosts. For other use
diff --git a/taskfile.yaml b/taskfile.yaml
index 18b647feb5..998b9fba11 100644
--- a/taskfile.yaml
+++ b/taskfile.yaml
@@ -121,6 +121,7 @@ tasks:
       - "components/clp-py-utils/dist/*.whl"
       - "components/job-orchestration/dist/*.whl"
       - "components/package-template/src/**/*"
+      - "tools/deployment/package/**/*"
    generates: ["{{.CHECKSUM_FILE}}"]
    deps:
      - "core"
@@ -172,6 +173,10 @@ tasks:
      - |-
        cd "{{.OUTPUT_DIR}}/var/www/webui"
        PATH="{{.G_NODEJS_22_BIN_DIR}}":$PATH npm ci --omit=dev
+     - >-
+       rsync -a
+       "tools/deployment/package/"
+       "{{.OUTPUT_DIR}}"
      - "echo '{{.G_PACKAGE_VERSION}}' > '{{.OUTPUT_DIR}}/VERSION'"
      # This command must be last
      - task: "utils:checksum:compute"
diff --git a/tools/deployment/package/docker-compose.base.yaml b/tools/deployment/package/docker-compose.base.yaml
new file mode 100644
index 0000000000..2c00c58f15
--- /dev/null
+++ b/tools/deployment/package/docker-compose.base.yaml
@@ -0,0 +1,355 @@
+name: "clp-package-base"
+
+# Common service defaults.
+x-service-defaults: &service_defaults
+  image: "${CLP_PACKAGE_CONTAINER:-clp-package}"
+  logging:
+    driver: "local"
+  stop_grace_period: "3s"
+  user: "${CLP_UID_GID:-1000:1000}"
+
+# Common healthcheck defaults.
+x-healthcheck-defaults: &healthcheck_defaults
+  # Avoid lowering to prevent excessive resource usage.
+  interval: "30s"
+  # Mark unhealthy after 3 failed probes.
+  # - In steady state, (<interval> + <timeout>) × 3 = ~90s before the service is marked unhealthy.
+  # - From startup, <start_period> (60s) + ~90s = ~150s before the service is marked unhealthy.
+  retries: 3
+  # Frequent checks during startup allow a fast transition to healthy.
+  start_interval: "2s"
+  # Ignore failures for ~15 frequent checks before counting retries.
+  start_period: "60s"
+  # Short timeout since no remote communication is expected.
+  timeout: "2s"
+
+# Common volume definitions.
+x-volume-definitions:
+  aws-config-readonly: &volume_aws_config_readonly
+    type: "bind"
+    source: "${CLP_AWS_CONFIG_DIR_HOST:-~/.aws}"
+    target: "/.aws"
+    read_only: true
+  clp-config-readonly: &volume_clp_config_readonly
+    type: "bind"
+    source: "${CLP_LOGS_DIR_HOST:-./var/log}/.clp-config.yml"
+    target: "/etc/clp-config.yml"
+    read_only: true
+  logs-input-readonly: &volume_root_logs_readonly
+    type: "bind"
+    source: "${CLP_LOGS_INPUT_DIR_HOST:-/}"
+    target: "${CLP_LOGS_INPUT_DIR_CONTAINER:-/mnt/logs}"
+    read_only: true
+
+services:
+  database:
+    <<: *service_defaults
+    image: "${CLP_DB_IMAGE:-mysql:8.0.23}"
+    hostname: "database"
+    user: "${CLP_SERVICE_CONTAINER_UID_GID:-1000:1000}"
+    environment:
+      MYSQL_DATABASE: "${CLP_DB_NAME}"
+      MYSQL_PASSWORD: "${CLP_DB_PASS}"
+      MYSQL_ROOT_PASSWORD: "${CLP_DB_PASS}"
+      MYSQL_USER: "${CLP_DB_USER}"
+    ports:
+      - host_ip: "${CLP_DB_HOST:-127.0.0.1}"
+        published: "${CLP_DB_PORT:-3306}"
+        target: 3306
+    volumes:
+      - type: "bind"
+        source: "${CLP_DB_CONF_LOGGING_FILE_HOST:-./etc/mysql/conf.d/logging.cnf}"
+        target: "/etc/mysql/conf.d/logging.cnf"
+        read_only: true
+      - type: "bind"
+        source: "${CLP_DB_DATA_DIR_HOST:-./var/data/database}"
+        target: "/var/lib/mysql"
+      - type: "bind"
+        source: "${CLP_DB_LOGS_DIR_HOST:-./var/log/database}"
+        target: "/var/log/mysql"
+    healthcheck:
+      <<: *healthcheck_defaults
+      test: [
+        "CMD",
+        "mysqladmin", "ping",
+        "--silent",
+        "-h", "127.0.0.1",
+        "-u", "${CLP_DB_USER}",
+        "--password=${CLP_DB_PASS}"
+      ]
+
+  db-table-creator:
+    <<: *service_defaults
+    hostname: "db_table_creator"
+    environment:
+      CLP_DB_PASS: "${CLP_DB_PASS}"
+      CLP_DB_USER: "${CLP_DB_USER}"
+      PYTHONPATH: "/opt/clp/lib/python3/site-packages"
+    volumes:
+      - *volume_clp_config_readonly
+    depends_on:
+      database:
+        condition: "service_healthy"
+    command: [
+      "python3",
+      "-u",
+      "-m", "clp_py_utils.create-db-tables",
+      "--config", "/etc/clp-config.yml",
+      "--storage-engine", "${CLP_PACKAGE_STORAGE_ENGINE:-clp}",
+    ]
+
+  queue:
+    <<: *service_defaults
+    image: "rabbitmq:3.9.8"
+    hostname: "queue"
+    user: "${CLP_SERVICE_CONTAINER_UID_GID:-1000:1000}"
+    environment:
+      RABBITMQ_DEFAULT_PASS: "${CLP_QUEUE_PASS}"
+      RABBITMQ_DEFAULT_USER: "${CLP_QUEUE_USER}"
+      RABBITMQ_LOGS: "/var/log/rabbitmq/rabbitmq.log"
+    ports:
+      - host_ip: "${CLP_QUEUE_HOST:-127.0.0.1}"
+        published: "${CLP_QUEUE_PORT:-5672}"
+        target: 5672
+    volumes:
+      - type: "bind"
+        source: "${CLP_QUEUE_LOGS_DIR_HOST:-./var/log/queue}"
+        target: "/var/log/rabbitmq"
+    healthcheck:
+      <<: *healthcheck_defaults
+      test: [
+        "CMD",
+        "rabbitmq-diagnostics", "check_running"
+      ]
+
+  redis:
+    <<: *service_defaults
+    image: "redis:7.2.4"
+    hostname: "redis"
+    user: "${CLP_SERVICE_CONTAINER_UID_GID:-1000:1000}"
+    ports:
+      - host_ip: "${CLP_REDIS_HOST:-127.0.0.1}"
+        published: "${CLP_REDIS_PORT:-6379}"
+        target: 6379
+    volumes:
+      - type: "bind"
+        source: "${CLP_REDIS_CONF_FILE_HOST:-./etc/redis/redis.conf}"
+        target: "/usr/local/etc/redis/redis.conf"
+        read_only: true
+      - type: "bind"
+        source: "${CLP_REDIS_DATA_DIR_HOST:-./var/data/redis}"
+        target: "/data"
+      - type: "bind"
+        source: "${CLP_REDIS_LOGS_DIR_HOST:-./var/log/redis}"
+        target: "/var/log/redis"
+    healthcheck:
+      <<: *healthcheck_defaults
+      test: [
+        "CMD",
+        "redis-cli",
+        "-h", "127.0.0.1",
+        "-p", "6379",
+        "-a", "${CLP_REDIS_PASS}",
+        "PING"
+      ]
+    command: [
+      "redis-server",
+      "/usr/local/etc/redis/redis.conf",
+      "--requirepass", "${CLP_REDIS_PASS}"
+    ]
+
+  results-cache:
+    <<: *service_defaults
+    image: "mongo:7.0.1"
+    hostname: "results_cache"
+    user: "${CLP_SERVICE_CONTAINER_UID_GID:-1000:1000}"
"${CLP_SERVICE_CONTAINER_UID_GID:-1000:1000}" + ports: + - host_ip: "${CLP_RESULTS_CACHE_HOST:-127.0.0.1}" + published: "${CLP_RESULTS_CACHE_PORT:-27017}" + target: 27017 + volumes: + - type: "bind" + source: "${CLP_RESULTS_CACHE_CONF_FILE_HOST:-./etc/mongo/mongod.conf}" + target: "/etc/mongo/mongod.conf" + read_only: true + - type: "bind" + source: "${CLP_RESULTS_CACHE_DATA_DIR_HOST:-./var/data/results_cache}" + target: "/data/db" + - type: "bind" + source: "${CLP_RESULTS_CACHE_LOGS_DIR_HOST:-./var/log/results_cache}" + target: "/var/log/mongodb" + healthcheck: + <<: *healthcheck_defaults + test: [ + "CMD-SHELL", + "echo 'db.runCommand(\"ping\").ok' | mongosh 127.0.0.1:27017/test --quiet" + ] + command: [ + "--config", "/etc/mongo/mongod.conf", + "--bind_ip", "0.0.0.0", + ] + + results-cache-indices-creator: + <<: *service_defaults + hostname: "results_cache_indices_creator" + environment: + PYTHONPATH: "/opt/clp/lib/python3/site-packages" + depends_on: + results-cache: + condition: "service_healthy" + command: [ + "python3", + "-u", + "-m", "clp_py_utils.initialize-results-cache", + "--uri", "mongodb://results_cache:27017/${CLP_RESULTS_CACHE_DB_NAME:-clp-query-results}", + "--stream-collection", "${CLP_RESULTS_CACHE_STREAM_COLLECTION_NAME:-stream-files}", + ] + + compression-scheduler: + <<: *service_defaults + hostname: "compression_scheduler" + environment: + BROKER_URL: "amqp://${CLP_QUEUE_USER}:${CLP_QUEUE_PASS}@queue:5672" + CLP_DB_PASS: "${CLP_DB_PASS}" + CLP_DB_USER: "${CLP_DB_USER}" + CLP_LOGGING_LEVEL: "${CLP_COMPRESSION_SCHEDULER_LOGGING_LEVEL:-INFO}" + CLP_LOGS_DIR: "/var/log" + PYTHONPATH: "/opt/clp/lib/python3/site-packages" + RESULT_BACKEND: >- + redis://default:${CLP_REDIS_PASS}@redis:6379/${CLP_REDIS_COMPRESSION_BACKEND_DB:-1} + volumes: + - *volume_aws_config_readonly + - *volume_clp_config_readonly + - *volume_root_logs_readonly + - type: "bind" + source: "${CLP_COMPRESSION_SCHEDULER_LOG_FILE_HOST:-./var/log/compression_scheduler.log}" + target: "/var/log/compression_scheduler.log" + depends_on: + db-table-creator: + condition: "service_completed_successfully" + queue: + condition: "service_healthy" + redis: + condition: "service_healthy" + command: [ + "python3", + "-u", + "-m", "job_orchestration.scheduler.compress.compression_scheduler", + "--config", "/etc/clp-config.yml" + ] + + compression-worker: + <<: *service_defaults + hostname: "compression_worker" + environment: + AWS_ACCESS_KEY_ID: "${CLP_AWS_ACCESS_KEY_ID}" + AWS_SECRET_ACCESS_KEY: "${CLP_AWS_SECRET_ACCESS_KEY}" + BROKER_URL: "amqp://${CLP_QUEUE_USER}:${CLP_QUEUE_PASS}@queue:5672" + CLP_CONFIG_PATH: "/etc/clp-config.yml" + CLP_HOME: "/opt/clp" + CLP_LOGGING_LEVEL: "${CLP_COMPRESSION_WORKER_LOGGING_LEVEL:-INFO}" + CLP_LOGS_DIR: "/var/log/compression_worker" + CLP_WORKER_LOG_PATH: "/var/log/compression_worker/worker.log" + PYTHONPATH: "/opt/clp/lib/python3/site-packages" + RESULT_BACKEND: >- + redis://default:${CLP_REDIS_PASS}@redis:6379/${CLP_REDIS_COMPRESSION_BACKEND_DB:-1} + volumes: + - *volume_aws_config_readonly + - *volume_clp_config_readonly + - *volume_root_logs_readonly + - type: "bind" + source: "${CLP_ARCHIVE_OUTPUT_DIR_HOST:-./var/data/archives}" + target: "/var/data/archives" + - type: "bind" + source: "${CLP_ARCHIVE_OUTPUT_DIR_HOST:-./var/data/staged-archives}" + target: "/var/data/staged-archives" + - type: "bind" + source: "${CLP_COMPRESSION_WORKER_LOGS_DIR_HOST:-./var/log/compression_worker}" + target: "/var/log/compression_worker" + - type: "bind" + source: 
"${CLP_DATA_DIR_HOST:-./var/data}" + target: "/var/data" + command: [ + "python3", + "-u", + "/opt/clp/lib/python3/site-packages/bin/celery", + "-A", "job_orchestration.executor.compress", + "worker", + "--concurrency", "${CLP_COMPRESSION_WORKER_CONCURRENCY:-1}", + "--loglevel", "WARNING", + "-f", "/var/log/compression_worker/worker.log", + "-Q", "compression", + "-n", "compression-worker" + ] + + webui: + <<: *service_defaults + hostname: "webui" + environment: + CLP_DB_PASS: "${CLP_DB_PASS}" + CLP_DB_USER: "${CLP_DB_USER}" + HOST: "0.0.0.0" + NODE_ENV: "production" + NODE_PATH: "/opt/clp/var/www/webui/server/node_modules" + PORT: "4000" + RATE_LIMIT: "${CLP_WEBUI_RATE_LIMIT:-1000}" + ports: + - host_ip: "${CLP_WEBUI_HOST:-127.0.0.1}" + published: "${CLP_WEBUI_PORT:-4000}" + target: 4000 + volumes: + - *volume_aws_config_readonly + - type: "bind" + source: "${CLP_STREAM_OUTPUT_DIR_HOST:-./var/data/streams}" + target: "/var/data/streams" + - type: "bind" + source: "./var/www/webui/client/settings.json" + target: "/opt/clp/var/www/webui/client/settings.json" + read_only: true + - type: "bind" + source: "./var/www/webui/server/dist/settings.json" + target: "/opt/clp/var/www/webui/server/dist/settings.json" + read_only: true + depends_on: + db-table-creator: + condition: "service_completed_successfully" + results-cache-indices-creator: + condition: "service_completed_successfully" + command: [ + "/opt/clp/bin/node-22", + "/opt/clp/var/www/webui/server/dist/src/main.js" + ] + healthcheck: + <<: *healthcheck_defaults + test: [ + "CMD", + "bash", + "-c", + "< /dev/tcp/webui/4000" + ] + + garbage-collector: + <<: *service_defaults + hostname: "garbage_collector" + environment: + CLP_DB_PASS: "${CLP_DB_PASS}" + CLP_DB_USER: "${CLP_DB_USER}" + CLP_HOME: "/opt/clp" + CLP_LOGGING_LEVEL: "${CLP_GC_LOGGING_LEVEL:-INFO}" + CLP_LOGS_DIR: "/var/log/garbage_collector" + PYTHONPATH: "/opt/clp/lib/python3/site-packages" + volumes: + - *volume_clp_config_readonly + - type: "bind" + source: "${CLP_LOGS_DIR_HOST:-./var/log}/garbage_collector" + target: "/var/log/garbage_collector" + depends_on: + db-table-creator: + condition: "service_completed_successfully" + results-cache-indices-creator: + condition: "service_completed_successfully" + command: [ + "python3", "-u", + "-m", "job_orchestration.garbage_collector.garbage_collector", + "--config", "/etc/clp-config.yml", + ] diff --git a/tools/deployment/package/docker-compose.yaml b/tools/deployment/package/docker-compose.yaml new file mode 100644 index 0000000000..44ecae5e89 --- /dev/null +++ b/tools/deployment/package/docker-compose.yaml @@ -0,0 +1,138 @@ +name: "clp-package" + +include: ["docker-compose.base.yaml"] + +# Below x-* definitions are duplicated from docker-compose.base.yaml. Refer to that file for +# documentation. 
+x-service-defaults: &service_defaults
+  image: "${CLP_PACKAGE_CONTAINER:-clp-package}"
+  logging:
+    driver: "local"
+  stop_grace_period: "3s"
+  user: "${CLP_UID_GID:-1000:1000}"
+x-healthcheck-defaults: &healthcheck_defaults
+  interval: "30s"
+  retries: 3
+  start_interval: "2s"
+  start_period: "60s"
+  timeout: "2s"
+x-volume-definitions:
+  aws-config-readonly: &volume_aws_config_readonly
+    type: "bind"
+    source: "${CLP_AWS_CONFIG_DIR_HOST:-~/.aws}"
+    target: "/.aws"
+    read_only: true
+  clp-config-readonly: &volume_clp_config_readonly
+    type: "bind"
+    source: "${CLP_LOGS_DIR_HOST:-./var/log}/.clp-config.yml"
+    target: "/etc/clp-config.yml"
+    read_only: true
+
+services:
+  query-scheduler:
+    <<: *service_defaults
+    hostname: "query_scheduler"
+    environment:
+      BROKER_URL: "amqp://${CLP_QUEUE_USER}:${CLP_QUEUE_PASS}@queue:5672"
+      CLP_DB_PASS: "${CLP_DB_PASS}"
+      CLP_DB_USER: "${CLP_DB_USER}"
+      CLP_LOGGING_LEVEL: "${CLP_QUERY_SCHEDULER_LOGGING_LEVEL:-INFO}"
+      CLP_LOGS_DIR: "/var/log"
+      PYTHONPATH: "/opt/clp/lib/python3/site-packages"
+      RESULT_BACKEND: >-
+        redis://default:${CLP_REDIS_PASS}@redis:6379/${CLP_REDIS_QUERY_BACKEND_DB:-0}
+    volumes:
+      - *volume_clp_config_readonly
+      - type: "bind"
+        source: "${CLP_QUERY_SCHEDULER_LOG_FILE_HOST:-./var/log/query_scheduler.log}"
+        target: "/var/log/query_scheduler.log"
+    depends_on:
+      db-table-creator:
+        condition: "service_completed_successfully"
+      queue:
+        condition: "service_healthy"
+      redis:
+        condition: "service_healthy"
+    command: [
+      "python3",
+      "-u",
+      "-m", "job_orchestration.scheduler.query.query_scheduler",
+      "--config", "/etc/clp-config.yml"
+    ]
+    healthcheck:
+      <<: *healthcheck_defaults
+      test: [
+        "CMD",
+        "bash",
+        "-c",
+        "< /dev/tcp/query_scheduler/7000"
+      ]
+
+  query-worker:
+    <<: *service_defaults
+    hostname: "query_worker"
+    environment:
+      AWS_ACCESS_KEY_ID: "${CLP_AWS_ACCESS_KEY_ID}"
+      AWS_SECRET_ACCESS_KEY: "${CLP_AWS_SECRET_ACCESS_KEY}"
+      BROKER_URL: "amqp://${CLP_QUEUE_USER}:${CLP_QUEUE_PASS}@queue:5672"
+      CLP_CONFIG_PATH: "/etc/clp-config.yml"
+      CLP_HOME: "/opt/clp"
+      CLP_LOGGING_LEVEL: "${CLP_QUERY_WORKER_LOGGING_LEVEL:-INFO}"
+      CLP_LOGS_DIR: "/var/log/query_worker"
+      CLP_WORKER_LOG_PATH: "/var/log/query_worker/worker.log"
+      PYTHONPATH: "/opt/clp/lib/python3/site-packages"
+      RESULT_BACKEND: >-
+        redis://default:${CLP_REDIS_PASS}@redis:6379/${CLP_REDIS_QUERY_BACKEND_DB:-0}
+    volumes:
+      - *volume_aws_config_readonly
+      - *volume_clp_config_readonly
+      - type: "bind"
+        source: "${CLP_ARCHIVE_OUTPUT_DIR_HOST:-./var/data/archives}"
+        target: "/var/data/archives"
+      - type: "bind"
+        source: "${CLP_QUERY_WORKER_LOGS_DIR_HOST:-./var/log/query_worker}"
+        target: "/var/log/query_worker"
+      - type: "bind"
+        source: "${CLP_STAGED_STREAM_OUTPUT_DIR_HOST:-./var/data/staged-streams}"
+        target: "/var/data/staged-streams"
+      - type: "bind"
+        source: "${CLP_STREAM_OUTPUT_DIR_HOST:-./var/data/streams}"
+        target: "/var/data/streams"
+    command: [
+      "python3",
+      "-u",
+      "/opt/clp/lib/python3/site-packages/bin/celery",
+      "-A", "job_orchestration.executor.query",
+      "worker",
+      "--concurrency", "${CLP_QUERY_WORKER_CONCURRENCY:-1}",
+      "--loglevel", "WARNING",
+      "-f", "/var/log/query_worker/worker.log",
+      "-Q", "query",
+      "-n", "query-worker"
+    ]
+
+  reducer:
+    <<: *service_defaults
+    hostname: "reducer"
+    environment:
+      CLP_HOME: "/opt/clp"
+      CLP_LOGGING_LEVEL: "${CLP_REDUCER_LOGGING_LEVEL:-INFO}"
+      CLP_LOGS_DIR: "/var/log/reducer"
+      PYTHONPATH: "/opt/clp/lib/python3/site-packages"
+    volumes:
+      - *volume_clp_config_readonly
+      - type: "bind"
+        source: "${CLP_REDUCER_LOGS_DIR_HOST:-./var/log/reducer}"
"${CLP_REDUCER_LOGS_DIR_HOST:-./var/log/reducer}" + target: "/var/log/reducer" + depends_on: + query-scheduler: + condition: "service_healthy" + results-cache-indices-creator: + condition: "service_completed_successfully" + command: [ + "python3", "-u", + "-m", "job_orchestration.reducer.reducer", + "--config", "/etc/clp-config.yml", + "--concurrency", "${CLP_REDUCER_CONCURRENCY:-1}", + "--upsert-interval", "${CLP_REDUCER_UPSERT_INTERVAL:-100}" + ]