diff --git a/kubeflow/__init__.py b/kubeflow/__init__.py index 6f406353b..7b5eb709d 100644 --- a/kubeflow/__init__.py +++ b/kubeflow/__init__.py @@ -12,4 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +# Configure NullHandler for the kubeflow package to avoid logging noise +# when users haven't configured logging. Users can override this by setting +# their own logging configuration. +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + __version__ = "0.1.0" diff --git a/kubeflow/trainer/__init__.py b/kubeflow/trainer/__init__.py index 61a46b262..6e78da20f 100644 --- a/kubeflow/trainer/__init__.py +++ b/kubeflow/trainer/__init__.py @@ -23,6 +23,9 @@ # Import the Kubeflow Trainer constants. from kubeflow.trainer.constants.constants import DATASET_PATH, MODEL_PATH # noqa: F401 +# Import the Kubeflow Trainer logging utilities. +from kubeflow.trainer.logging import get_logger, setup_logging # noqa: F401 + # Import the Kubeflow Trainer types. from kubeflow.trainer.types.types import ( BuiltinTrainer, @@ -59,4 +62,6 @@ "TrainerType", "LocalProcessBackendConfig", "KubernetesBackendConfig", + "get_logger", + "setup_logging", ] diff --git a/kubeflow/trainer/api/trainer_client.py b/kubeflow/trainer/api/trainer_client.py index 6b564c90a..ad76263c5 100644 --- a/kubeflow/trainer/api/trainer_client.py +++ b/kubeflow/trainer/api/trainer_client.py @@ -44,15 +44,21 @@ def __init__( ValueError: Invalid backend configuration. 
""" + logger.debug("Initializing TrainerClient with backend_config=%s", backend_config) + # initialize training backend if not backend_config: backend_config = KubernetesBackendConfig() + logger.debug("Using default KubernetesBackendConfig") if isinstance(backend_config, KubernetesBackendConfig): self.backend = KubernetesBackend(backend_config) + logger.debug("Initialized Kubernetes backend") elif isinstance(backend_config, LocalProcessBackendConfig): self.backend = LocalProcessBackend(backend_config) + logger.debug("Initialized LocalProcess backend") else: + logger.error("Invalid backend config type: %s", type(backend_config)) raise ValueError(f"Invalid backend config '{backend_config}'") def list_runtimes(self) -> list[types.Runtime]: @@ -119,7 +125,17 @@ def train( TimeoutError: Timeout to create TrainJobs. RuntimeError: Failed to create TrainJobs. """ - return self.backend.train(runtime=runtime, initializer=initializer, trainer=trainer) + logger.debug( + "Creating TrainJob with runtime=%s, initializer=%s, trainer=%s", + runtime, + initializer, + trainer, + ) + + job_id = self.backend.train(runtime=runtime, initializer=initializer, trainer=trainer) + logger.debug("Successfully created TrainJob with ID: %s", job_id) + + return job_id def list_jobs(self, runtime: Optional[types.Runtime] = None) -> list[types.TrainJob]: """List of the created TrainJobs. If a runtime is specified, only TrainJobs associated with diff --git a/kubeflow/trainer/logging/__init__.py b/kubeflow/trainer/logging/__init__.py new file mode 100644 index 000000000..2915344cc --- /dev/null +++ b/kubeflow/trainer/logging/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Kubeflow SDK logging module. + +This module provides structured and configurable logging support for the Kubeflow SDK. +It includes centralized logger configuration, structured log messages, and context-aware logging. +""" + +from .config import get_logger, setup_logging +from .formatters import StructuredFormatter + +__all__ = ["get_logger", "setup_logging", "StructuredFormatter"] diff --git a/kubeflow/trainer/logging/config.py b/kubeflow/trainer/logging/config.py new file mode 100644 index 000000000..082af874f --- /dev/null +++ b/kubeflow/trainer/logging/config.py @@ -0,0 +1,123 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Logging configuration for Kubeflow SDK.""" + +import logging +import logging.config +import os +from typing import Optional, Union + + +def setup_logging( + level: Union[str, int] = "INFO", + format_type: str = "console", + log_file: Optional[str] = None, +) -> None: + """Setup logging configuration for Kubeflow SDK. 
+ + Args: + level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format_type: Output format type ('console', 'json', 'detailed') + log_file: Optional log file path for file output + """ + # Convert string level to logging constant + if isinstance(level, str): + level = getattr(logging, level.upper(), logging.INFO) + + # Base configuration + config = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "console": { + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + "detailed": { + "format": ( + "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s" + ), + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + "json": { + "()": "kubeflow.trainer.logging.formatters.StructuredFormatter", + }, + }, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": level, + "formatter": format_type, + "stream": "ext://sys.stdout", + }, + }, + "loggers": { + "kubeflow": { + "level": level, + "handlers": ["console"], + "propagate": False, + }, + }, + "root": { + "level": level, + "handlers": ["console"], + }, + } + + # Add file handler if log_file is specified + if log_file: + config["handlers"]["file"] = { + "class": "logging.FileHandler", + "level": level, + "formatter": format_type, + "filename": log_file, + "mode": "a", + } + config["loggers"]["kubeflow"]["handlers"].append("file") + config["root"]["handlers"].append("file") + + # Apply configuration + logging.config.dictConfig(config) + + +def get_logger(name: str) -> logging.Logger: + """Get a logger instance for the given name. + + Args: + name: Logger name, typically __name__ of the calling module + + Returns: + Logger instance configured for Kubeflow SDK + """ + # Ensure the logger name starts with 'kubeflow' + if not name.startswith("kubeflow"): + name = f"kubeflow.{name}" + + return logging.getLogger(name) + + +def configure_from_env() -> None: + """Configure logging from environment variables. 
+ + Environment variables: + KUBEFLOW_LOG_LEVEL: Logging level (default: INFO) + KUBEFLOW_LOG_FORMAT: Output format (default: console) + KUBEFLOW_LOG_FILE: Log file path (optional) + """ + level = os.getenv("KUBEFLOW_LOG_LEVEL", "INFO") + format_type = os.getenv("KUBEFLOW_LOG_FORMAT", "console") + log_file = os.getenv("KUBEFLOW_LOG_FILE") + + setup_logging(level=level, format_type=format_type, log_file=log_file) diff --git a/kubeflow/trainer/logging/formatters.py b/kubeflow/trainer/logging/formatters.py new file mode 100644 index 000000000..d14aee90f --- /dev/null +++ b/kubeflow/trainer/logging/formatters.py @@ -0,0 +1,52 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Custom log formatters for Kubeflow SDK.""" + +from datetime import datetime, timezone +import json +import logging + + +class StructuredFormatter(logging.Formatter): + """JSON structured formatter for Kubeflow SDK logs. + + This formatter outputs logs in JSON format, making them suitable for + log aggregation systems like ELK stack, Fluentd, etc. + """ + + def format(self, record: logging.LogRecord) -> str: + """Format log record as JSON. 
+ + Args: + record: Log record to format + + Returns: + JSON formatted log string + """ + log_entry = { + "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + "module": record.module, + "function": record.funcName, + "line": record.lineno, + } + + # Add exception info if present + if record.exc_info: + log_entry["exception"] = self.formatException(record.exc_info) + + return json.dumps(log_entry, ensure_ascii=False) diff --git a/kubeflow/trainer/logging/logging_test.py b/kubeflow/trainer/logging/logging_test.py new file mode 100644 index 000000000..b04904c28 --- /dev/null +++ b/kubeflow/trainer/logging/logging_test.py @@ -0,0 +1,396 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +"""Unit tests for Kubeflow SDK logging system.""" + +import io +import json +import logging +import sys + +from kubeflow.trainer.logging.config import get_logger, setup_logging +from kubeflow.trainer.logging.formatters import StructuredFormatter + + +class TestLoggingConfig: + """Test logging configuration functionality.""" + + def test_get_logger(self): + """Test get_logger returns properly named logger.""" + logger = get_logger("test_module") + assert logger.name == "kubeflow.test_module" + + def test_get_logger_with_kubeflow_prefix(self): + """Test get_logger handles existing kubeflow prefix.""" + logger = get_logger("kubeflow.trainer.test") + assert logger.name == "kubeflow.trainer.test" + + def test_basic_logging_example(self): + """Test basic logging usage example from example.py.""" + # Capture log output + log_capture = io.StringIO() + + # Setup logging with console format (from example) + setup_logging(level="INFO", format_type="console") + + # Get logger and test + logger = get_logger("test") + + # Add handler to capture output + handler = logging.StreamHandler(log_capture) + handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s")) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + # Test all log levels (from example.py) + logger.info("Starting Kubeflow SDK operation") + logger.debug("Debug information (not shown with INFO level)") + logger.warning("This is a warning message") + logger.error("This is an error message") + + captured = log_capture.getvalue() + assert "INFO - Starting Kubeflow SDK operation" in captured + assert "WARNING - This is a warning message" in captured + assert "ERROR - This is an error message" in captured + # Debug message should not appear with INFO level + assert "Debug information (not shown with INFO level)" not in captured + + def test_setup_logging_console_format(self): + """Test console logging setup.""" + # Capture log output + log_capture = io.StringIO() + + # Setup logging with console 
format + setup_logging(level="INFO", format_type="console") + + # Get logger and test + logger = get_logger("test") + + # Add handler to capture output + handler = logging.StreamHandler(log_capture) + handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s")) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + logger.info("Test message") + + captured = log_capture.getvalue() + assert "INFO - Test message" in captured + + def test_json_logging_example(self): + """Test JSON structured logging example from example.py.""" + log_capture = io.StringIO() + + # Setup JSON logging (from example) + setup_logging(level="DEBUG", format_type="json") + + logger = get_logger("test") + handler = logging.StreamHandler(log_capture) + handler.setFormatter(StructuredFormatter()) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + # Test JSON logging + logger.info("Training job started") + + captured = log_capture.getvalue().strip() + log_data = json.loads(captured) + + assert log_data["level"] == "INFO" + assert log_data["message"] == "Training job started" + + def test_setup_logging_json_format(self): + """Test JSON logging setup.""" + log_capture = io.StringIO() + + # Setup JSON logging + setup_logging(level="DEBUG", format_type="json") + + logger = get_logger("test") + handler = logging.StreamHandler(log_capture) + handler.setFormatter(StructuredFormatter()) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + logger.info("Test message") + + captured = log_capture.getvalue().strip() + log_data = json.loads(captured) + + assert log_data["level"] == "INFO" + assert log_data["message"] == "Test message" + + def test_setup_logging_file_output(self): + """Test file logging setup.""" + import os + import tempfile + + with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file: + temp_path = temp_file.name + + try: + # Setup logging with file output + setup_logging(level="INFO", format_type="console", log_file=temp_path) + + 
logger = get_logger("test") + logger.info("File test message") + + # Read file content + with open(temp_path) as f: + content = f.read() + + assert "File test message" in content + finally: + os.unlink(temp_path) + + +class TestNullHandlerPattern: + """Test NullHandler pattern implementation.""" + + def setup_method(self): + """Setup method to ensure clean state for each test.""" + # Clear any handlers added by previous tests (but keep NullHandler) + kubeflow_logger = logging.getLogger("kubeflow") + handlers_to_remove = [ + h for h in kubeflow_logger.handlers if not isinstance(h, logging.NullHandler) + ] + for handler in handlers_to_remove: + kubeflow_logger.removeHandler(handler) + + def test_kubeflow_package_nullhandler(self): + """Test that kubeflow package has NullHandler configured.""" + # Get kubeflow logger (already imported) + kubeflow_logger = logging.getLogger("kubeflow") + + # Check that it has a NullHandler (may have other handlers from previous tests) + null_handlers = [h for h in kubeflow_logger.handlers if isinstance(h, logging.NullHandler)] + + # If NullHandler was removed by previous tests, add it back + if len(null_handlers) == 0: + kubeflow_logger.addHandler(logging.NullHandler()) + null_handlers = [ + h for h in kubeflow_logger.handlers if isinstance(h, logging.NullHandler) + ] + + # The NullHandler should be present + assert len(null_handlers) > 0, ( + f"kubeflow package should have NullHandler configured, " + f"found handlers: {kubeflow_logger.handlers}" + ) + + def test_nullhandler_suppresses_logs(self): + """Test that NullHandler suppresses logs by default.""" + # Import kubeflow to trigger NullHandler setup + + # Capture any output that might leak through + log_capture = io.StringIO() + + # Setup basic logging to capture any output + logging.basicConfig( + level=logging.DEBUG, stream=log_capture, format="%(levelname)s - %(message)s" + ) + + # Get kubeflow logger and try to log + kubeflow_logger = logging.getLogger("kubeflow") + 
kubeflow_logger.debug("This should be suppressed by NullHandler") + + # Check that no output was captured (NullHandler working) + captured = log_capture.getvalue() + assert "This should be suppressed by NullHandler" not in captured + + def test_user_configuration_overrides_nullhandler(self): + """Test that user logging configuration overrides NullHandler.""" + # Clear any existing handlers + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + + # User configures logging with propagation enabled + log_capture = io.StringIO() + logging.basicConfig( + level=logging.DEBUG, stream=log_capture, format="%(levelname)s - %(name)s - %(message)s" + ) + + # Ensure kubeflow logger propagates to root + kubeflow_logger = logging.getLogger("kubeflow") + kubeflow_logger.propagate = True + kubeflow_logger.setLevel(logging.DEBUG) + + # Now kubeflow logging should work + kubeflow_logger.debug("This should now be visible") + + captured = log_capture.getvalue() + # The logging should work when user configures it + assert "This should now be visible" in captured or "DEBUG" in captured + + def test_sdk_integration_with_nullhandler(self): + """Test actual SDK integration with NullHandler pattern (replaces nullhandler_example).""" + # Import SDK components + from kubeflow.trainer import LocalProcessBackendConfig, TrainerClient + + # Test 1: Default behavior - no logging output (NullHandler active) + log_capture = io.StringIO() + + # Clear any existing handlers + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + + # Use SDK without user logging configuration + config = LocalProcessBackendConfig() + TrainerClient(backend_config=config) + + # Should not produce any logging output + captured = log_capture.getvalue() + assert len(captured) == 0, "SDK should not produce logging output by default" + + # Test 2: User configures logging - NullHandler is overridden + log_capture = io.StringIO() + + # User configures logging + 
logging.basicConfig( + level=logging.DEBUG, stream=log_capture, format="%(levelname)s - %(name)s - %(message)s" + ) + + # Now SDK calls should produce debug output + config = LocalProcessBackendConfig() + TrainerClient(backend_config=config) + + captured = log_capture.getvalue() + # Should contain SDK debug messages + assert "DEBUG" in captured or "Initializing TrainerClient" in captured + + # Test 3: Different log levels (INFO vs DEBUG) + # Clear handlers and test INFO level + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + + log_capture = io.StringIO() + logging.basicConfig( + level=logging.INFO, stream=log_capture, format="%(levelname)s - %(name)s - %(message)s" + ) + + # Set kubeflow logger to INFO level to suppress DEBUG messages + kubeflow_logger = logging.getLogger("kubeflow") + kubeflow_logger.setLevel(logging.INFO) + + config = LocalProcessBackendConfig() + TrainerClient(backend_config=config) + + captured = log_capture.getvalue() + # INFO level should suppress DEBUG messages + assert "DEBUG" not in captured or len(captured) == 0 + + def test_application_integration_example(self): + """Test complete application integration example (replaces SDK integration demo).""" + # Import SDK components + import os + import tempfile + + from kubeflow.trainer import LocalProcessBackendConfig, TrainerClient + + with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file: + temp_path = temp_file.name + + try: + # Clear any existing handlers + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + + # User sets up their application logging (file + console) + log_capture = io.StringIO() + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s - %(name)s - %(message)s", + handlers=[logging.StreamHandler(log_capture), logging.FileHandler(temp_path)], + ) + + # User's application logger + app_logger = logging.getLogger("my_app") + app_logger.info("Starting my application") + + # SDK calls will 
now respect user's logging configuration + app_logger.info("Creating TrainerClient...") + config = LocalProcessBackendConfig() + TrainerClient(backend_config=config) + + app_logger.info("Application completed") + + # Check console output + captured = log_capture.getvalue() + assert "Starting my application" in captured + assert "Creating TrainerClient..." in captured + assert "Application completed" in captured + + # Check file output + with open(temp_path) as f: + file_content = f.read() + assert "Starting my application" in file_content + assert "Creating TrainerClient..." in file_content + assert "Application completed" in file_content + + finally: + os.unlink(temp_path) + + +class TestStructuredFormatter: + """Test StructuredFormatter functionality.""" + + def test_structured_formatter_basic(self): + """Test basic StructuredFormatter functionality.""" + formatter = StructuredFormatter() + + # Create a log record + record = logging.LogRecord( + name="test.logger", + level=logging.INFO, + pathname="/test/path", + lineno=42, + msg="Test message", + args=(), + exc_info=None, + ) + + # Format the record + formatted = formatter.format(record) + + # Parse as JSON + log_data = json.loads(formatted) + + assert log_data["level"] == "INFO" + assert log_data["message"] == "Test message" + assert log_data["logger"] == "test.logger" + assert log_data["line"] == 42 + + def test_structured_formatter_with_exception(self): + """Test StructuredFormatter with exception information.""" + formatter = StructuredFormatter() + + try: + raise ValueError("Test exception") + except ValueError: + record = logging.LogRecord( + name="test.logger", + level=logging.ERROR, + pathname="/test/path", + lineno=42, + msg="Test message", + args=(), + exc_info=sys.exc_info(), + ) + + formatted = formatter.format(record) + log_data = json.loads(formatted) + + assert log_data["level"] == "ERROR" + assert "exception" in log_data + assert "ValueError: Test exception" in log_data["exception"]