Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions kubeflow/optimizer/api/optimizer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Iterator
import logging
from typing import Any, Optional

Expand Down Expand Up @@ -119,6 +120,46 @@ def get_job(self, name: str) -> OptimizationJob:

return self.backend.get_job(name=name)

def get_job_logs(
    self,
    name: str,
    trial_name: Optional[str] = None,
    follow: bool = False,
) -> Iterator[str]:
    """Return the log lines of one trial that belongs to an OptimizationJob.

    The request is forwarded unchanged to the configured backend.

    Logs can be read, or watched in realtime, as follows:
    ```python
    from kubeflow.optimizer import OptimizerClient

    client = OptimizerClient()

    # No trial name: logs of the current best trial.
    for logline in client.get_job_logs(name="n7fb28dbee94"):
        print(logline)

    # Explicit trial name, streamed as the logs are produced.
    for logline in client.get_job_logs(
        name="n7fb28dbee94", trial_name="n7fb28dbee94-abc123", follow=True
    ):
        print(logline)
    ```

    Args:
        name: Name of the OptimizationJob.
        trial_name: Optional name of a specific Trial. When omitted, logs from the
            current best trial are returned. If no best trial is available yet,
            logs from the first trial are returned.
        follow: Whether to stream logs in realtime as they are produced.

    Returns:
        Iterator of log lines.

    Raises:
        TimeoutError: Timeout to get an OptimizationJob.
        RuntimeError: Failed to get an OptimizationJob.
    """
    backend = self.backend
    return backend.get_job_logs(
        name=name,
        trial_name=trial_name,
        follow=follow,
    )

def wait_for_job_status(
self,
name: str,
Expand Down
10 changes: 10 additions & 0 deletions kubeflow/optimizer/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import abc
from collections.abc import Iterator
from typing import Any, Optional

from kubeflow.optimizer.constants import constants
Expand Down Expand Up @@ -47,6 +48,15 @@ def list_jobs(self) -> list[OptimizationJob]:
def get_job(self, name: str) -> OptimizationJob:
raise NotImplementedError()

@abc.abstractmethod
def get_job_logs(self, name: str, trial_name: Optional[str], follow: bool) -> Iterator[str]:
    """Return the log lines of a Trial of the given OptimizationJob.

    Abstract hook: this base implementation only raises, so every concrete
    backend has to provide its own version.

    Args:
        name: Name of the OptimizationJob.
        trial_name: Name of the Trial to read logs from, or None when the
            caller did not pick one.
        follow: Whether to stream logs as they are produced.

    Returns:
        Iterator of log lines.

    Raises:
        NotImplementedError: Always, when the method is not overridden.
    """
    raise NotImplementedError()

@abc.abstractmethod
def wait_for_job_status(
self,
Expand Down
37 changes: 37 additions & 0 deletions kubeflow/optimizer/backends/kubernetes/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Iterator
import logging
import multiprocessing
import random
Expand Down Expand Up @@ -212,6 +213,42 @@ def get_job(self, name: str) -> OptimizationJob:
optimization_job = self.__get_experiment_cr(name)
return self.__get_optimization_job_from_cr(optimization_job)

def get_job_logs(
    self,
    name: str,
    trial_name: Optional[str] = None,
    follow: bool = False,
) -> Iterator[str]:
    """Get the OptimizationJob logs from a Trial.

    This is a generator: no lookup happens until the caller starts iterating,
    and the early exits below simply produce an empty iterator.

    Args:
        name: Name of the OptimizationJob.
        trial_name: Name of the Trial to read logs from. When None, the current
            best trial is used; if there is no best trial yet, the first trial
            of the OptimizationJob is used.
        follow: Whether to stream logs as they are produced.

    Yields:
        Log lines read from the metrics collector container of the Trial's Pod.
        Nothing is yielded when the OptimizationJob has no trials, or when no
        non-pending Pod is found for the Trial.
    """
    # Determine what trial to get logs from.
    if trial_name is None:
        best_trial = self.get_best_trial(name)
        if best_trial is not None:
            trial_name = best_trial.name
        else:
            # No best trial yet: fall back to the first trial, if there is one.
            optimization_job = self.get_job(name)
            if not optimization_job.trials:
                logger.debug("OptimizationJob %s has no trials, no logs to read", name)
                return
            trial_name = optimization_job.trials[0].name
        logger.debug("Getting logs from trial: %s", trial_name)

    # Get the Trial's Pod name: the first step named "<NODE>-0" that has left the
    # pending state (presumably the first training node - confirm against the trainer).
    step = trainer_constants.NODE + "-0"
    pod_name = next(
        (
            c.pod_name
            for c in self.trainer_backend.get_job(trial_name).steps
            if c.status != trainer_constants.POD_PENDING and c.name == step
        ),
        None,
    )
    if pod_name is None:
        logger.debug("No started pod found for trial %s, no logs to read", trial_name)
        return

    # Logs are read from the metrics collector container of that Pod.
    yield from self.trainer_backend._read_pod_logs(
        pod_name=pod_name,
        container_name=constants.METRICS_COLLECTOR_CONTAINER,
        follow=follow,
    )

def wait_for_job_status(
self,
name: str,
Expand Down
3 changes: 3 additions & 0 deletions kubeflow/optimizer/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
# The failed status of the OptimizationJob, set when the Experiment CR has a failed condition.
OPTIMIZATION_JOB_FAILED = "Failed"

# The name of the Katib metrics collector sidecar container.
# The Kubernetes optimizer backend reads Trial logs from this container.
METRICS_COLLECTOR_CONTAINER = "metrics-logger-and-collector"

# Katib search space parameter types.
DOUBLE_PARAMETER = "double"
CATEGORICAL_PARAMETERS = "categorical"
56 changes: 31 additions & 25 deletions kubeflow/trainer/backends/kubernetes/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,31 +347,9 @@ def get_job_logs(

# Remove the number for the node step.
container_name = re.sub(r"-\d+$", "", step)
try:
if follow:
log_stream = watch.Watch().stream(
self.core_api.read_namespaced_pod_log,
name=pod_name,
namespace=self.namespace,
container=container_name,
follow=True,
)

# Stream logs incrementally.
yield from log_stream # type: ignore
else:
logs = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=container_name,
)

yield from logs.splitlines()

except Exception as e:
raise RuntimeError(
f"Failed to read logs for the pod {self.namespace}/{pod_name}"
) from e
yield from self._read_pod_logs(
pod_name=pod_name, container_name=container_name, follow=follow
)

def wait_for_job_status(
self,
Expand Down Expand Up @@ -466,6 +444,34 @@ def __get_runtime_from_cr(
),
)

def _read_pod_logs(self, pod_name: str, container_name: str, follow: bool) -> Iterator[str]:
    """Yield the log lines of one container of a pod in this backend's namespace.

    Args:
        pod_name: Name of the pod to read from.
        container_name: Name of the container inside that pod.
        follow: When True the logs are streamed through a watch; otherwise the
            log is fetched once and split into lines.

    Yields:
        Log lines of the container.

    Raises:
        RuntimeError: Any exception raised while fetching or streaming the logs
            is re-raised as RuntimeError, chained to the original exception.
    """
    try:
        if not follow:
            # One-shot read: fetch the whole log, then hand it out line by line.
            snapshot = self.core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=self.namespace,
                container=container_name,
            )
            yield from snapshot.splitlines()
        else:
            # Stream logs incrementally.
            yield from watch.Watch().stream(  # type: ignore
                self.core_api.read_namespaced_pod_log,
                name=pod_name,
                namespace=self.namespace,
                container=container_name,
                follow=True,
            )
    except Exception as e:
        raise RuntimeError(
            f"Failed to read logs for the pod {self.namespace}/{pod_name}"
        ) from e

def __get_trainjob_from_cr(
self,
trainjob_cr: models.TrainerV1alpha1TrainJob,
Expand Down
Loading