Skip to content

Commit b20073a

Browse files
authored
Merge branch 'main' into ODSC-55290/update_job_byoc_api
2 parents da1a47b + 7e9b454 commit b20073a

File tree

6 files changed

+48
-10
lines changed

6 files changed

+48
-10
lines changed

ads/opctl/operator/lowcode/anomaly/const.py

+1
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,4 @@ class OutputColumns(str, metaclass=ExtendedEnumMeta):
9494

9595

9696
TODS_DEFAULT_MODEL = "ocsvm"
97+
SUBSAMPLE_THRESHOLD = 1000

ads/opctl/operator/lowcode/anomaly/model/base_model.py

+23-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from ads.common.object_storage_details import ObjectStorageDetails
1818
from ads.opctl import logger
19-
from ads.opctl.operator.lowcode.anomaly.const import OutputColumns, SupportedMetrics
19+
from ads.opctl.operator.lowcode.anomaly.const import OutputColumns, SupportedMetrics, SUBSAMPLE_THRESHOLD
2020
from ads.opctl.operator.lowcode.anomaly.utils import _build_metrics_df, default_signer
2121
from ads.opctl.operator.lowcode.common.utils import (
2222
disable_print,
@@ -79,7 +79,7 @@ def generate_report(self):
7979
anomaly_output, test_data, elapsed_time
8080
)
8181
table_blocks = [
82-
rc.DataTable(df, label=col, index=True)
82+
rc.DataTable(df.head(SUBSAMPLE_THRESHOLD) if self.spec.subsample_report_data and len(df) > SUBSAMPLE_THRESHOLD else df, label=col, index=True)
8383
for col, df in self.datasets.full_data_dict.items()
8484
]
8585
data_table = rc.Select(blocks=table_blocks)
@@ -94,20 +94,36 @@ def generate_report(self):
9494
anomaly_col = anomaly_output.get_anomalies_by_cat(category=target)[
9595
OutputColumns.ANOMALY_COL
9696
]
97+
anomaly_indices = [i for i, index in enumerate(anomaly_col) if index == 1]
98+
downsampled_time_col = time_col
99+
selected_indices = list(range(len(time_col)))
100+
if self.spec.subsample_report_data:
101+
non_anomaly_indices = [i for i in range(len(time_col)) if i not in anomaly_indices]
102+
# Downsample non-anomalous data if it exceeds the threshold (1000)
103+
if len(non_anomaly_indices) > SUBSAMPLE_THRESHOLD:
104+
downsampled_non_anomaly_indices = non_anomaly_indices[::len(non_anomaly_indices)//SUBSAMPLE_THRESHOLD]
105+
selected_indices = anomaly_indices + downsampled_non_anomaly_indices
106+
selected_indices.sort()
107+
downsampled_time_col = time_col[selected_indices]
108+
97109
columns = set(df.columns).difference({date_column})
98110
for col in columns:
99111
y = df[col].reset_index(drop=True)
112+
113+
downsampled_y = y[selected_indices]
114+
100115
fig, ax = plt.subplots(figsize=(8, 3), layout="constrained")
101116
ax.grid()
102-
ax.plot(time_col, y, color="black")
103-
for i, index in enumerate(anomaly_col):
104-
if index == 1:
105-
ax.scatter(time_col[i], y[i], color="red", marker="o")
117+
ax.plot(downsampled_time_col, downsampled_y, color="black")
118+
# Plot anomalies
119+
for i in anomaly_indices:
120+
ax.scatter(time_col[i], y[i], color="red", marker="o")
106121
plt.xlabel(date_column)
107122
plt.ylabel(col)
108123
plt.title(f"`{col}` with reference to anomalies")
109124
figure_blocks.append(rc.Widget(ax))
110-
blocks.append(rc.Group(*figure_blocks, label=target))
125+
126+
blocks.append(rc.Group(*figure_blocks, label=target))
111127
plots = rc.Select(blocks)
112128

113129
report_sections = []

ads/opctl/operator/lowcode/anomaly/operator_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class AnomalyOperatorSpec(DataClassSerializable):
7777
model: str = None
7878
model_kwargs: Dict = field(default_factory=dict)
7979
contamination: float = None
80+
subsample_report_data: bool = None
8081

8182
def __post_init__(self):
8283
"""Adjusts the specification details."""

ads/opctl/operator/lowcode/anomaly/schema.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -377,4 +377,8 @@ spec:
377377
type: dict
378378
required: false
379379

380+
subsample_report_data:
381+
type: boolean
382+
required: false
383+
380384
type: dict

ads/pipeline/ads_pipeline_run.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8; -*-
33

4-
# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
4+
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
55
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
66
import copy
77
import logging
@@ -689,6 +689,16 @@ def _build_filter_expression(self, steps: List = [], log_type: str = None) -> st
689689
sources = []
690690
subjects = []
691691
skipped_step_list = []
692+
693+
is_service_logging_enabled = False
694+
try:
695+
if self.service_logging:
696+
is_service_logging_enabled = True
697+
except LogNotConfiguredError:
698+
logger.warning(
699+
"Service log is not configured for pipeline. Streaming custom log."
700+
)
701+
692702
for step_run in self.step_runs:
693703
if not steps or (step_run.step_name in steps):
694704
step_name = step_run.step_name
@@ -703,7 +713,8 @@ def _build_filter_expression(self, steps: List = [], log_type: str = None) -> st
703713
subjects.append(f"subject = '{step_name}'")
704714
else:
705715
sources.append(f"source = '*{job_run_id}'")
706-
subjects.append(f"subject = '{step_name}'")
716+
if is_service_logging_enabled:
717+
subjects.append(f"subject = '{step_name}'")
707718
else:
708719
subjects.append(f"subject = '{step_name}'")
709720

tests/unitary/default_setup/pipeline/test_pipeline_run.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright (c) 2023 Oracle and/or its affiliates.
3+
# Copyright (c) 2024 Oracle and/or its affiliates.
44
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
55

66
from datetime import datetime
@@ -391,6 +391,8 @@ def test_stream_log(self, mock_stream, mock_sync):
391391
**PIPELINE_RUN_LOG_DETAILS
392392
)
393393
pipeline_run.time_accepted = datetime.now()
394+
service_logging = OCILog()
395+
pipeline_run._set_service_logging_resource(service_logging)
394396
pipeline_run._PipelineRun__stream_log(
395397
ConsolidatedLog(OCILog()),
396398
[custom_script_step.step_name, ml_job_step.step_name],
@@ -749,6 +751,9 @@ def test_build_filter_expression(self):
749751
ml_job_step_without_job_run_id,
750752
]
751753

754+
service_logging = OCILog()
755+
pipeline_run._set_service_logging_resource(service_logging)
756+
752757
consolidated_log_expression = pipeline_run._build_filter_expression()
753758

754759
assert (

0 commit comments

Comments
 (0)