diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/__init__.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/__init__.py index da02caae218c..2a3510cb5ba6 100644 --- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/__init__.py +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/__init__.py @@ -3,8 +3,15 @@ # --------------------------------------------------------- from .eval_run_context import EvalRunContext from .code_client import CodeClient -from .proxy_client import ProxyClient +from .proxy_client import ProxyClient, ProxyRun +from ._run_submitter_client import RunSubmitterClient from .target_run_context import TargetRunContext -from .proxy_client import ProxyRun -__all__ = ["CodeClient", "ProxyClient", "EvalRunContext", "TargetRunContext", "ProxyRun"] +__all__ = [ + "CodeClient", + "ProxyClient", + "EvalRunContext", + "TargetRunContext", + "ProxyRun", + "RunSubmitterClient", +] diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py new file mode 100644 index 000000000000..b3165e9f4b26 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py @@ -0,0 +1,104 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import logging +import pandas as pd +import sys +from collections import defaultdict +from concurrent.futures import Future, ThreadPoolExecutor +from os import PathLike +from typing import Any, Callable, Dict, Final, List, Mapping, Optional, Sequence, Union, cast + +from .batch_clients import BatchClientRun, HasAsyncCallable +from ..._legacy._batch_engine._run_submitter import RunSubmitter +from ..._legacy._batch_engine._config import BatchEngineConfig +from ..._legacy._batch_engine._run import Run + + +LOGGER = logging.getLogger(__name__) + + +class RunSubmitterClient: + def __init__(self, config: Optional[BatchEngineConfig] = None) -> None: + self._config = config or BatchEngineConfig(LOGGER, use_async=True) + self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread") + + def run( + self, + flow: Callable, + data: Union[str, PathLike, pd.DataFrame], + column_mapping: Optional[Dict[str, str]] = None, + evaluator_name: Optional[str] = None, + **kwargs: Any, + ) -> BatchClientRun: + if not isinstance(data, pd.DataFrame): + # Should never get here + raise ValueError("Data must be a pandas DataFrame") + if not column_mapping: + raise ValueError("Column mapping must be provided") + + # The column mappings are index by data to indicate they come from the data + # input. Update the inputs so that each entry is a dictionary with a data key + # that contains the original input data. 
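+        # Illustrative example (column names are hypothetical): a DataFrame row
+        # {"query": "What is 2+2?", "response": "4"} becomes
+        # {"data": {"query": "What is 2+2?", "response": "4"}}, so a column mapping
+        # entry such as "${data.query}" can resolve against the "data" prefix.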
+ inputs = [{"data": input_data} for input_data in data.to_dict(orient="records")] + + # always uses async behind the scenes + if isinstance(flow, HasAsyncCallable): + flow = flow._to_async() # pylint: disable=protected-access + + run_submitter = RunSubmitter(self._config) + run_future = self._thread_pool.submit( + run_submitter.submit, + dynamic_callable=flow, + inputs=inputs, + column_mapping=column_mapping, + name_prefix=evaluator_name, + created_on=kwargs.pop("created_on", None), + storage_creator=kwargs.pop("storage_creator", None), + **kwargs, + ) + + return run_future + + def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pd.DataFrame: + run = self._get_run(client_run) + + data: Dict[str, List[Any]] = defaultdict(list) + stop_at: Final[int] = self._config.default_num_results if not all_results else sys.maxsize + + def _update(prefix: str, items: Sequence[Mapping[str, Any]]) -> None: + for i, line in enumerate(items): + if i >= stop_at: + break + for k, value in line.items(): + key = f"{prefix}.{k}" + data[key].append(value) + + _update("inputs", run.inputs) + _update("outputs", run.outputs) + + df = pd.DataFrame(data).reindex(columns=[k for k in data.keys()]) + return df + + def get_metrics(self, client_run: BatchClientRun) -> Dict[str, Any]: + run = self._get_run(client_run) + return dict(run.metrics) + + def get_run_summary(self, client_run: BatchClientRun) -> Dict[str, Any]: + run = self._get_run(client_run) + + total_lines = run.result.total_lines if run.result else 0 + failed_lines = run.result.failed_lines if run.result else 0 + + return { + "status": run.status.value, + "duration": str(run.duration), + "completed_lines": total_lines - failed_lines, + "failed_lines": failed_lines, + # "log_path": "", + } + + @staticmethod + def _get_run(run: BatchClientRun) -> Run: + return cast(Future[Run], run).result() diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/batch_clients.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/batch_clients.py new file mode 100644 index 000000000000..700bd0b1a72f --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/batch_clients.py @@ -0,0 +1,82 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import pandas +from os import PathLike +from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, Union, runtime_checkable + + +class BatchClientRun(Protocol): + """The protocol for the batch client run.""" + + pass + + +@runtime_checkable +class HasAsyncCallable(Protocol): + """The protocol for an object that has an async callable.""" + + def _to_async(self) -> Callable[[Any, Any], Awaitable[Any]]: ... + + +class BatchClient(Protocol): + """The protocol for the batch client. This allows for running a flow on a data source + and getting the details of the run.""" + + def run( + self, + flow: Callable, + data: Union[str, PathLike, pandas.DataFrame], + column_mapping: Optional[Dict[str, str]] = None, + evaluator_name: Optional[str] = None, + **kwargs: Any, + ) -> BatchClientRun: + """Run the given flow on the data with the given column mapping. + + :param flow: The flow to run. 
+ :type flow: Union[Callable, HasAsyncCallable] + :param data: The JSONL file containing the data to run the flow on, + or the loaded data + :type data: Union[str, PathLike] + :param column_mapping: The column mapping to use. + :type column_mapping: Mapping[str, str] + :param name: The name of the run. + :type name: Optional[str] + :param kwargs: Additional keyword arguments to pass to the flow. + :return: The result of the batch client run. + :rtype: BatchClientRun + """ + ... + + def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pandas.DataFrame: + """Get the details of the run. + + :param client_run: The run to get the details of. + :type client_run: BatchClientRun + :param all_results: Whether to get all results. + :type all_results: bool + :return: The details of the run. + :rtype: pandas.DataFrame + """ + ... + + def get_metrics(self, client_run: BatchClientRun) -> Dict[str, Any]: + """Get the metrics of the run. + + :param client_run: The run to get the metrics of. + :type client_run: BatchClientRun + :return: The metrics of the run. + :rtype: Mapping[str, Any] + """ + ... + + def get_run_summary(self, client_run: BatchClientRun) -> Dict[str, Any]: + """Get the summary of the run. + + :param client_run: The run to get the summary of. + :type client_run: BatchClientRun + :return: The summary of the run. + :rtype: Mapping[str, Any] + """ + ... diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/__init__.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/__init__.py similarity index 100% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/__init__.py rename to sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/__init__.py diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/__init__.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/__init__.py new file mode 100644 index 000000000000..fe4d7b2124fc --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/__init__.py @@ -0,0 +1,9 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# NOTE: This is a direct port of the bare minimum needed for BatchEngine functionality from +# the original Promptflow code. The goal here is expediency, not elegance. As such +# parts of this code may be a little "quirky", seem incomplete in places, or contain +# longer TODOs comments than usual. In a future code update, large swaths of this code +# will be refactored or deleted outright. diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_config.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_config.py new file mode 100644 index 000000000000..749f92f5ead4 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_config.py @@ -0,0 +1,45 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from dataclasses import dataclass +from logging import Logger + +from ..._constants import PF_BATCH_TIMEOUT_SEC_DEFAULT + + +@dataclass +class BatchEngineConfig: + """Context for a batch of evaluations. 
This will contain the configuration, + logging, and other needed information.""" + + logger: Logger + """The logger to use for logging messages.""" + + batch_timeout_seconds: int = PF_BATCH_TIMEOUT_SEC_DEFAULT + """The maximum amount of time to wait for all evaluations in the batch to complete.""" + + run_timeout_seconds: int = 600 + """The maximum amount of time to wait for an evaluation to run against a single entry + in the data input to complete.""" + + max_concurrency: int = 10 + """The maximum number of evaluations to run concurrently.""" + + use_async: bool = True + """Whether to use asynchronous evaluation.""" + + default_num_results: int = 100 + """The default number of results to return if you don't ask for all results.""" + + def __post_init__(self): + if self.logger is None: + raise ValueError("logger cannot be None") + if self.batch_timeout_seconds <= 0: + raise ValueError("batch_timeout_seconds must be greater than 0") + if self.run_timeout_seconds <= 0: + raise ValueError("run_timeout_seconds must be greater than 0") + if self.max_concurrency <= 0: + raise ValueError("max_concurrency must be greater than 0") + if self.default_num_results <= 0: + raise ValueError("default_num_results must be greater than 0") diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_engine.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_engine.py new file mode 100644 index 000000000000..b3146d285ee0 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_engine.py @@ -0,0 +1,368 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# This contains code merged together from the following files: +# promptflow-devkit/promptflow/batch/_batch_engine.py +# promptflow-devkit/promptflow/_proxy/_python_executor_proxy.py +# promptflow-core/promptflow/executor/_script_executor.py +# TODO ralphe: The way this code does batch execution needs to be improved. For now +# porting over the code largely as is to remove the Promptflow dependency +# as quickly as possible. In phase 2 this code will be heavily refactored. 
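+#
+# Rough usage sketch (mirrors how RunSubmitter drives this class later in this change;
+# the executor and data values below are illustrative only):
+#
+#     engine = BatchEngine(my_async_evaluator, storage=NoOpRunStorage(), batch_timeout_sec=3600)
+#     result = engine.run(
+#         data=[{"data": {"query": "..."}}],
+#         column_mapping={"query": "${data.query}"},
+#     )
+#     print(result.status, result.total_lines, result.failed_lines)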
+ +import re +import asyncio +from math import floor +from asyncio import Semaphore +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, Final, Generator, Mapping, MutableMapping, Optional, Sequence, Set, Tuple +from uuid import uuid4 + +from ._utils import get_int_env_var, get_value_from_path +from ._status import BatchStatus +from ._result import BatchResult, BatchRunDetails, BatchRunError, TokenMetrics +from ._run_storage import AbstractRunStorage, NoOpRunStorage +from ._logging import log_progress, NodeLogManager +from ..._exceptions import ErrorBlame +from ._exceptions import ( + BatchEngineCanceledError, + BatchEngineError, + BatchEngineRunFailedError, + BatchEngineTimeoutError, + BatchEngineValidationError, +) +from ._utils_deprecated import ( + async_run_allowing_running_loop, + convert_eager_flow_output_to_dict, +) + + +MAX_WORKER_COUNT: Final[int] = 10 +KEYWORD_PATTERN: Final = re.compile(r"^\${([^{}]+)}$") + + +class BatchEngine: + """This class is used to execute flows in batch mode""" + + def __init__( + self, + executor: Callable, + *, + storage: Optional[AbstractRunStorage] = None, + batch_timeout_sec: Optional[int] = None, + line_timeout_sec: Optional[int] = None, + max_worker_count: Optional[int] = None, + **kwargs: Any, + ): + """Create a new batch engine instance + + :param Callable executor: The executor to run the flow + :param Optional[AbstractRunStorage] storage: The storage to store execution results + :param Optional[int] batch_timeout_sec: The timeout of batch run in seconds + :param Optional[int] line_timeout_sec: The timeout of each line in seconds + :param Optional[int] max_worker_count: The concurrency limit of batch run + :param kwargs: The keyword arguments related to creating the executor proxy class + :type kwargs: Any + """ + + self._executor = executor + # self._working_dir = working_dir + + # self._is_eager_flow = True + # self._is_prompty_flow = False + # self._program_language = FlowLanguage.Python + # self._message_format = MessageFormatType.BASIC + # self._multimedia_processor = MultimediaProcessor.create(self._message_format) + # self._connections = {} + + self._storage: AbstractRunStorage = storage or NoOpRunStorage() + + # TODO ralphe: Consume these from the batch context/config instead of from + # kwargs or (even worse) environment variables + # self._batch_use_async = kwargs.get("batch_use_async", True) + self._batch_timeout_sec = batch_timeout_sec or get_int_env_var("PF_BATCH_TIMEOUT_SEC") + self._line_timeout_sec = line_timeout_sec or get_int_env_var("PF_LINE_TIMEOUT_SEC", 600) + self._max_worker_count = max_worker_count or get_int_env_var("PF_WORKER_COUNT") or MAX_WORKER_COUNT + # update kwargs with worker_count and line_timeout_sec + kwargs.update({"worker_count": self._max_worker_count, "line_timeout_sec": self._line_timeout_sec}) + + self._is_canceled: bool = False + self._kwargs: Mapping[str, Any] = kwargs + # self._init_kwargs: Mapping[str, Any] = init_kwargs or {} + + def run( + self, + data: Sequence[Mapping[str, Any]], + column_mapping: Mapping[str, str], + *, + id: Optional[str] = None, + max_lines: Optional[int] = None, + ) -> BatchResult: + if not data: + raise BatchEngineValidationError("Please provide a non-empty data mapping.") + if not column_mapping: + raise BatchEngineValidationError("The column mapping is required.") + + start_time = datetime.now(timezone.utc) + + batch_inputs = self._apply_column_mapping(data, 
column_mapping, max_lines) + if not batch_inputs or all(len(data) == 0 for data in batch_inputs): + raise BatchEngineValidationError("No data to process.") + + try: + id = id or str(uuid4()) + + result: BatchResult = async_run_allowing_running_loop(self._exec_in_task, id, batch_inputs, start_time) + + return result + except Exception as ex: + raise BatchEngineError( + "Unexpected error while running the batch run.", blame=ErrorBlame.SYSTEM_ERROR + ) from ex + + def cancel(self): + # TODO ralphe: Make sure this works + self._is_canceled = True + + @staticmethod + def _apply_column_mapping( + data: Sequence[Mapping[str, Any]], + column_mapping: Mapping[str, str], + max_lines: Optional[int], + ) -> Sequence[Mapping[str, str]]: + data = data[:max_lines] if max_lines else data + + inputs: Sequence[Mapping[str, Any]] = [] + line: int = 0 + + for input in data: + line += 1 + mapped: Dict[str, Any] = {} + missing_inputs: Set[str] = set() + + for key, value in column_mapping.items(): + if not isinstance(value, str): + # All non-string values are literal values. + mapped[key] = value + continue + + match: Optional[re.Match[str]] = re.search(KEYWORD_PATTERN, value) + if match is None: + # Literal string value value + mapped[key] = value + continue + + dict_path = match.group(1) + found, value = get_value_from_path(dict_path, input) + if found: + mapped[key] = value + else: + missing_inputs.add(dict_path) + + if missing_inputs: + missing = ", ".join(missing_inputs) + raise BatchEngineValidationError(f"Missing inputs for line {line}: '{missing}'") + + inputs.append(mapped) + + return inputs + + async def _exec_in_task( + self, run_id: str, batch_inputs: Sequence[Mapping[str, Any]], start_time: datetime + ) -> BatchResult: + # Since the batch execution is not guaranteed to be completed in the same order + # as the inputs, we keep track of these in a mapping from index to result + results: Dict[int, BatchRunDetails] = {} + status: BatchStatus = BatchStatus.Completed + error: Optional[Exception] = None + + task = asyncio.create_task(self._exec_batch(run_id, batch_inputs, start_time, results)) + + while not task.done(): + # check whether the task is completed or canceled every 1s + await asyncio.sleep(1) + if self._is_canceled: + task.cancel() + # use current completed line results and aggregation results to create a BatchResult + status = BatchStatus.Canceled + error = BatchEngineCanceledError("The batch run is canceled by user.") + break + elif self._batch_timeout_expired(start_time): + task.cancel() + status = BatchStatus.Failed + error = BatchEngineTimeoutError( + f"The batch run failed due to timeout [{self._batch_timeout_sec}s]. " + f"Please adjust the timeout to a higher value." + ) + break + + end_time = datetime.now(timezone.utc) + metrics = TokenMetrics(0, 0, 0) + failed_lines: int = 0 + + # generate the details in the same order as the inputs and fill in the missing results + # with a failed status + result_details = [ + ( + results[i] + if i in results + else BatchRunDetails( + id=BatchRunDetails.create_id(run_id, i), + status=BatchStatus.Failed, + result=None, + start_time=None, + end_time=None, + tokens=TokenMetrics(0, 0, 0), + error=BatchRunError("The line run is not completed.", None), + ) + ) + for i in range(len(batch_inputs)) + ] + + for line_result in result_details: + # Indicate the worst status of the batch run. This works because + # canceled and failed have a higher value than completed. 
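+            # (BatchStatus is an IntEnum ordered so that Completed < Canceled < Failed,
+            # hence max() keeps the most severe status seen so far.)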
+ status = max(status, line_result.status) + if BatchStatus.is_failed(line_result.status): + failed_lines += 1 + if line_result.tokens: + metrics.prompt_tokens += line_result.tokens.prompt_tokens + metrics.completion_tokens += line_result.tokens.completion_tokens + metrics.total_tokens += line_result.tokens.total_tokens + + if failed_lines and not error: + error = BatchEngineRunFailedError( + str(floor(failed_lines / len(batch_inputs) * 100)) + f"% of the batch run failed." + ) + + return BatchResult( + status=status, + total_lines=len(batch_inputs), + failed_lines=failed_lines, + start_time=start_time, + end_time=end_time, + tokens=metrics, + details=result_details, + error=error, + ) + + async def _exec_batch( + self, + run_id: str, + batch_inputs: Sequence[Mapping[str, Any]], + start_time: datetime, + results: MutableMapping[int, BatchRunDetails], + ) -> None: + semaphore: Semaphore = Semaphore(self._max_worker_count) + + # TODO ralphe: This async code needs to refactored to use e.g. asyncio.gather, or + # asyncio.as_completed. + # TODO ralphe: This code needs to handle cancellation better + async def create_under_semaphore(index: int, inputs: Mapping[str, Any]): + async with semaphore: + return await self._exec_line_async(run_id, inputs, index) + + pending = [ + asyncio.create_task(create_under_semaphore(index, inputs)) for index, inputs in enumerate(batch_inputs) + ] + + total_lines: int = len(batch_inputs) + completed_lines: int = 0 + while completed_lines < total_lines: + # TODO ralphe: Fix this code so it doesn't re-order the outputs + # wait for any task to complete + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + completed_line_results = [task.result() for task in done] + # persist node run infos and flow run info in line result to storage + self._persist_run_info([result for _, result in completed_line_results]) + results.update({index: result for index, result in completed_line_results}) + # update the progress log + completed_lines += len(completed_line_results) + log_progress( + run_start_time=start_time, + total_count=total_lines, + current_count=completed_lines, + # TODO ralphe: set logger to use here + ) + + async def _exec_line_async( + self, + run_id: str, + inputs: Mapping[str, Any], + index: int, + ) -> Tuple[int, BatchRunDetails]: + with self._exec_line_context(run_id, index): + details: BatchRunDetails = BatchRunDetails( + id=f"{run_id}_{index}", + status=BatchStatus.NotStarted, + result=None, + start_time=datetime.now(timezone.utc), + end_time=None, + tokens=TokenMetrics(0, 0, 0), + error=None, + ) + + try: + # TODO ralphe: Handle line timeouts here + output: Any = await self._executor(**inputs) + details.status = BatchStatus.Completed + details.result = convert_eager_flow_output_to_dict(output) + + # TODO figure out how to get the token metrics here + except Exception as ex: + details.status = BatchStatus.Failed + details.error = BatchRunError( + f"Error while evaluating single input: {ex.__class__.__name__}: {str(ex)}", ex + ) + finally: + details.end_time = datetime.now(timezone.utc) + + return index, details + + def _persist_run_info(self, line_results: Sequence[BatchRunDetails]): + # TODO ralphe: implement? 
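+        # Presumably this would hand each completed line's BatchRunDetails to self._storage;
+        # with the default NoOpRunStorage there is nothing to persist, so it stays a no-op.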
+ pass + + def _batch_timeout_expired(self, start_time: datetime) -> bool: + if self._batch_timeout_sec is None: + return False + return (datetime.now(timezone.utc) - start_time).total_seconds() > self._batch_timeout_sec + + @contextmanager + def _exec_line_context(self, run_id: str, line_number: int) -> Generator[None, Any, None]: + # TODO ralphe: Do proper tracing and logging here + log_manager = NodeLogManager() + log_manager.set_node_context(run_id, "Flex", line_number) + with log_manager, self._update_operation_context(run_id, line_number): + yield + + @contextmanager + def _update_operation_context(self, run_id: str, line_number: int) -> Generator[None, Any, None]: + # operation_context = OperationContext.get_instance() + # original_context = operation_context.copy() + # original_mode = operation_context.get("run_mode", RunMode.Test.name) + # values_for_context = {"flow_id": self._flow_id, "root_run_id": run_id} + # if original_mode == RunMode.Batch.name: + # values_for_otel = { + # "batch_run_id": run_id, + # "line_number": line_number, + # } + # else: + # values_for_otel = {"line_run_id": run_id} + # try: + # append_promptflow_package_ua(operation_context) + # operation_context.set_execution_target(execution_target=self._execution_target) + # operation_context.set_default_tracing_keys(DEFAULT_TRACING_KEYS) + # operation_context.run_mode = original_mode + # operation_context.update(values_for_context) + # for k, v in values_for_otel.items(): + # operation_context._add_otel_attributes(k, v) + # # Inject OpenAI API to make sure traces and headers injection works and + # # update OpenAI API configs from environment variables. + # inject_openai_api() + yield + + # finally: + # OperationContext.set_instance(original_context) diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_exceptions.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_exceptions.py new file mode 100644 index 000000000000..d158e018cfe0 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_exceptions.py @@ -0,0 +1,88 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from ..._exceptions import ErrorCategory, ErrorBlame, ErrorTarget, EvaluationException + + +class BatchEngineError(EvaluationException): + """Exception class for batch engine errors. + + This exception is used to indicate that the error was caused by or in the batch engine. + + :param message: The error message. + :type message: str + """ + + def __init__(self, message: str, **kwargs): + kwargs.setdefault("category", ErrorCategory.FAILED_EXECUTION) + kwargs.setdefault("target", ErrorTarget.EVAL_RUN) + kwargs.setdefault("blame", ErrorBlame.UNKNOWN) + + super().__init__(message, **kwargs) + + +class BatchEngineValidationError(BatchEngineError): + """Exception raised when validation fails + + :param message: The error message. + :type message: str + """ + + def __init__(self, message: str, **kwargs): + kwargs.setdefault("category", ErrorCategory.INVALID_VALUE) + kwargs.setdefault("blame", ErrorBlame.USER_ERROR) + super().__init__(message, **kwargs) + + +class BatchEngineTimeoutError(BatchEngineError): + """Exception raised when a batch engine operation times out. + + :param message: The error message. 
+ :type message: str + """ + + def __init__(self, message: str, **kwargs): + kwargs.setdefault("category", ErrorCategory.FAILED_EXECUTION) + kwargs.setdefault("blame", ErrorBlame.SYSTEM_ERROR) + super().__init__(message, **kwargs) + + +class BatchEngineCanceledError(BatchEngineError): + """Exception raised when a batch engine operation is canceled. + + :param message: The error message. + :type message: str + """ + + def __init__(self, message: str, **kwargs): + kwargs.setdefault("category", ErrorCategory.FAILED_EXECUTION) + kwargs.setdefault("blame", ErrorBlame.USER_ERROR) + super().__init__(message, **kwargs) + + +class BatchEngineRunFailedError(BatchEngineError): + """Exception raised when a batch engine run fails. + + :param message: The error message. + :type message: str + """ + + def __init__(self, message: str, **kwargs): + kwargs.setdefault("category", ErrorCategory.FAILED_EXECUTION) + kwargs.setdefault("blame", ErrorBlame.SYSTEM_ERROR) + super().__init__(message, **kwargs) + + +class BatchEnginePartialError(BatchEngineError): + """Exception raised when a batch engine run has some successfull lines, mixed in + with some failures. + + :param message: The error message. + :type message: str + """ + + def __init__(self, message: str, **kwargs): + kwargs.setdefault("category", ErrorCategory.FAILED_EXECUTION) + kwargs.setdefault("blame", ErrorBlame.SYSTEM_ERROR) + super().__init__(message, **kwargs) diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_logging.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_logging.py new file mode 100644 index 000000000000..9d6a5507aaf9 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_logging.py @@ -0,0 +1,292 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# Original source: +# - promptflow-core/promptflow/_core/log_manager.py +# - promptflow-core/promptflow/_utils/logger_utils.py + +import os +import logging +import re +import sys +from re import Pattern +from contextvars import ContextVar +from datetime import datetime, timezone +from dataclasses import dataclass +from io import StringIO, TextIOBase +from typing import Any, Dict, Final, Mapping, Optional, Set, TextIO, Tuple, Union + + +valid_logging_level: Final[Set[str]] = {"CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG", "NOTSET"} + + +def get_pf_logging_level(default=logging.INFO): + logging_level = os.environ.get("PF_LOGGING_LEVEL", None) + if logging_level not in valid_logging_level: + # Fall back to info if user input is invalid. + logging_level = default + return logging_level + + +def _get_format_for_logger( + default_log_format: Optional[str] = None, default_date_format: Optional[str] = None +) -> Tuple[str, str]: + """ + Get the logging format and date format for logger. + + This function attempts to find the handler of the root logger with a configured formatter. + If such a handler is found, it returns the format and date format used by this handler. + This can be configured through logging.basicConfig. If no configured formatter is found, + it defaults to LOG_FORMAT and DATETIME_FORMAT. 
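+
+    Note: in this port the format is simply taken from the PF_LOG_FORMAT and
+    PF_LOG_DATETIME_FORMAT environment variables when set, otherwise from the supplied
+    defaults or the hard-coded fallbacks below.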
+ """ + log_format = ( + os.environ.get("PF_LOG_FORMAT") + or default_log_format + or "%(asctime)s %(thread)7d %(name)-18s %(levelname)-8s %(message)s" + ) + datetime_format = os.environ.get("PF_LOG_DATETIME_FORMAT") or default_date_format or "%Y-%m-%d %H:%M:%S %z" + return log_format, datetime_format + + +def get_logger(name: str) -> logging.Logger: + """Get logger used during execution.""" + logger = logging.Logger(name) + logger.setLevel(get_pf_logging_level()) + # logger.addHandler(FileHandlerConcurrentWrapper()) + stdout_handler = logging.StreamHandler(sys.stdout) + fmt, datefmt = _get_format_for_logger() + # TODO ralphe: Do we need a credentials scrubber here like the old code had? We are not logging + # logging anything that sensitive here. + stdout_handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt)) + logger.addHandler(stdout_handler) + return logger + + +def scrub_credentials(s: str): + """Scrub credentials in string s. + + For example, for input string: "print accountkey=accountKey", the output will be: + "print accountkey=**data_scrubbed**" + """ + # for h in logger.handlers: + # if isinstance(h, FileHandlerConcurrentWrapper): + # if h.handler and h.handler._formatter: + # credential_scrubber = h.handler._formatter.credential_scrubber + # if credential_scrubber: + # return credential_scrubber.scrub(s) + return CredentialScrubber.scrub(s) + + +class CredentialScrubber: + """Scrub sensitive information in string.""" + + PLACE_HOLDER = "**data_scrubbed**" + LENGTH_THRESHOLD = 2 + DEFAULT_REGEX_SET: Final[Set[Pattern[str]]] = { + re.compile(r"(?<=sig=)[^\s;&]+", flags=re.IGNORECASE), # Replace signature. + re.compile(r"(?<=key=)[^\s;&]+", flags=re.IGNORECASE), # Replace key. + } + + @staticmethod + def scrub(input: str) -> str: + """Replace sensitive information in input string with PLACE_HOLDER. + + For example, for input string: "print accountkey=accountKey", the output will be: + "print accountkey=**data_scrubbed**" + """ + output = input + for regex in CredentialScrubber.DEFAULT_REGEX_SET: + output = regex.sub(CredentialScrubber.PLACE_HOLDER, output) + return output + + +# Logs by flow_logger will only be shown in flow mode. +# These logs should contain all detailed logs from executor and runtime. +flow_logger = get_logger("execution.flow") + +# Logs by bulk_logger will only be shown in bulktest and eval modes. +# These logs should contain overall progress logs and error logs. +bulk_logger = get_logger("execution.bulk") + +# Logs by logger will be shown in all the modes above, +# such as error logs. +logger = get_logger("execution") + + +def log_progress( + run_start_time: datetime, + total_count: int, + current_count: int, + logger: logging.Logger = bulk_logger, + formatter="Finished {count} / {total_count} lines.", +) -> None: + if current_count > 0: + delta = datetime.now(timezone.utc).timestamp() - run_start_time.timestamp() + average_execution_time = round(delta / current_count, 2) + estimated_execution_time = round(average_execution_time * (total_count - current_count), 2) + logger.info(formatter.format(count=current_count, total_count=total_count)) + logger.info( + f"Average execution time for completed lines: {average_execution_time} seconds. " + f"Estimated time for incomplete lines: {estimated_execution_time} seconds." 
+ ) + + +def incremental_print(log: str, printed: int, fileout: Union[TextIO, Any]) -> int: + count = 0 + for line in log.splitlines(): + if count >= printed: + fileout.write(line + "\n") + printed += 1 + count += 1 + return printed + + +def print_red_error(message): + try: + from colorama import Fore, init + + init(autoreset=True) + print(Fore.RED + message) + except ImportError: + print(message) + + +@dataclass +class NodeInfo: + run_id: str + node_name: str + line_number: int + + +class NodeLogManager: + """Replace sys.stdout and sys.stderr with NodeLogWriter. + + This class intercepts and saves logs to stdout/stderr when executing a node. For example: + with NodeLogManager() as log_manager: + print('test stdout') + print('test stderr', file=sys.stderr) + + log_manager.get_logs() will return: {'stdout': 'test stdout\n', 'stderr': 'test stderr\n'} + """ + + def __init__(self, record_datetime: bool = True): + self.stdout_logger = NodeLogWriter(sys.stdout, record_datetime) + self.stderr_logger = NodeLogWriter(sys.stderr, record_datetime, is_stderr=True) + + def __enter__(self) -> "NodeLogManager": + """Replace sys.stdout and sys.stderr with NodeLogWriter.""" + self._prev_stdout = sys.stdout + self._prev_stderr = sys.stderr + sys.stdout = self.stdout_logger + sys.stderr = self.stderr_logger + return self + + def __exit__(self, *args) -> None: + """Restore sys.stdout and sys.stderr.""" + sys.stdout = self._prev_stdout + sys.stderr = self._prev_stderr + + def set_node_context(self, run_id: str, node_name: str, line_number: int) -> None: + """Set node context.""" + self.stdout_logger.set_node_info(run_id, node_name, line_number) + self.stderr_logger.set_node_info(run_id, node_name, line_number) + + def clear_node_context(self, run_id: str) -> None: + """Clear node context.""" + self.stdout_logger.clear_node_info(run_id) + self.stderr_logger.clear_node_info(run_id) + + def get_logs(self, run_id: str) -> Mapping[str, str]: + return { + "stdout": self.stdout_logger.get_log(run_id), + "stderr": self.stderr_logger.get_log(run_id), + } + + +class NodeLogWriter(TextIOBase): + """Record node run logs.""" + + DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z" + + def __init__(self, prev_stdout: Union[TextIOBase, Any], record_datetime: bool = True, is_stderr: bool = False): + self.run_id_to_stdout: Dict[str, StringIO] = {} + self._context: ContextVar[Optional[NodeInfo]] = ContextVar("run_log_info", default=None) + self._prev_out: Union[TextIOBase, Any] = prev_stdout + self._record_datetime: bool = record_datetime + self._is_stderr: bool = is_stderr + + def set_node_info(self, run_id: str, node_name: str, line_number: int) -> None: + """Set node info to a context variable. + + After set node info, write method will write to string IO associated with this node. 
+ """ + run_log_info = NodeInfo(run_id, node_name, line_number) + self._context.set(run_log_info) + self.run_id_to_stdout.update({run_id: StringIO()}) + + def clear_node_info(self, run_id: str): + """Clear context variable associated with run id.""" + log_info: Optional[NodeInfo] = self._context.get() + if log_info and log_info.run_id == run_id: + self._context.set(None) + + if run_id in self.run_id_to_stdout: + self.run_id_to_stdout.pop(run_id) + + def get_log(self, run_id: str) -> str: + """Get log associated with run id.""" + string_io: Optional[StringIO] = self.run_id_to_stdout.get(run_id) + if string_io is None: + return "" + + return string_io.getvalue() + + def write(self, s: str) -> int: + """Override TextIO's write method and writes input string into a string IO + + The written string is compliant without any credentials. + The string is also recorded to flow/bulk logger. + If node info is not set, write to previous stdout. + """ + log_info: Optional[NodeInfo] = self._context.get() + s = scrub_credentials(s) # Remove credential from string. + if log_info is None: + return self._prev_out.write(s) + else: + self._write_to_flow_log(log_info, s) + stdout: Optional[StringIO] = self.run_id_to_stdout.get(log_info.run_id) + # When the line execution timeout is reached, all running nodes will be cancelled and node info will + # be cleared. This will remove StringIO from self.run_id_to_stdout. For sync tools running in a worker + # thread, they can't be stopped and self._context won't change in the worker + # thread because it's a thread-local variable. Therefore, we need to check if StringIO is None here. + if stdout is None: + return 0 + if self._record_datetime and s != "\n": # For line breaker, do not add datetime prefix. + s = f"[{datetime.now(timezone.utc).strftime(self.DATETIME_FORMAT)}] {s}" + return stdout.write(s) + + def flush(self): + """Override TextIO's flush method.""" + node_info: Optional[NodeInfo] = self._context.get() + if node_info is None: + self._prev_out.flush() + else: + string_io = self.run_id_to_stdout.get(node_info.run_id) + if string_io is not None: + string_io.flush() + + def _write_to_flow_log(self, log_info: NodeInfo, s: str): + """Save stdout log to flow_logger and stderr log to logger.""" + # If user uses "print('log message.')" to log, then + # "write" method will be called twice and the second time input is only '\n'. + # For this case, should not log '\n' in flow_logger. + if s != "\n": + if self._is_stderr: + flow_log = f"[{str(log_info)}] stderr> " + s.rstrip("\n") + # Log stderr in all scenarios so we can diagnose problems. + logger.warning(flow_log) + else: + flow_log = f"[{str(log_info)}] stdout> " + s.rstrip("\n") + # Log stdout only in flow mode. + flow_logger.info(flow_log) diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_openai_injector.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_openai_injector.py new file mode 100644 index 000000000000..60dcc258c6f5 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_openai_injector.py @@ -0,0 +1,23 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# Original source code: promptflow-tracing/promptflow/tracing/_integrations/_openai_injector.py + + +def inject_openai_api(): + """This function: + 1. 
Modifies the create methods of the OpenAI API classes to inject logic before calling the original methods. + It stores the original methods as _original attributes of the create methods. + 2. Updates the openai api configs from environment variables. + """ + # TODO ralphe: Port function? + pass + + +def recover_openai_api(): + """This function restores the original create methods of the OpenAI API classes + by assigning them back from the _original attributes of the modified methods. + """ + # TODO ralphe: Port function? + pass diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_result.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_result.py new file mode 100644 index 000000000000..0e6c5e74f0a2 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_result.py @@ -0,0 +1,99 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any, Mapping, Optional, Sequence + +from ._status import BatchStatus + + +@dataclass +class TokenMetrics: + """The token metrics of a run.""" + + prompt_tokens: int + """The number of tokens used in the prompt for the run.""" + completion_tokens: int + """The number of tokens used in the completion for the run.""" + total_tokens: int + """The total number of tokens used in the run.""" + + +@dataclass +class BatchRunError: + """The error of a batch run.""" + + details: str + """The details of the error.""" + exception: Optional[BaseException] + """The exception of the error.""" + + +@dataclass +class BatchRunDetails: + """The error of a line in a batch run.""" + + id: str + """The ID of the line run.""" + status: BatchStatus + """The status of the line run.""" + result: Optional[Mapping[str, Any]] + """The result of the line run.""" + start_time: Optional[datetime] + """The start time of the line run. If this was never started, this should be None.""" + end_time: Optional[datetime] + """The end time of the line run. If this never completed, this should be None.""" + tokens: TokenMetrics + """The token metrics of the line run.""" + error: Optional[BatchRunError] + """The error of the line run. This will only be set if the status is Failed.""" + + @property + def duration(self) -> timedelta: + """The duration of the line run.""" + if self.start_time is not None and self.end_time is not None: + return self.end_time - self.start_time + return timedelta(0) + + @staticmethod + def create_id(run_id: str, index: int) -> str: + """Helper method to create the ID for a line run.""" + return f"{run_id}_{index}" + + +@dataclass +class BatchResult: + """The result of a batch run.""" + + status: BatchStatus + """The overall status of the batch run.""" + total_lines: int + """The total number of lines in the batch run.""" + failed_lines: int + """The number of failed lines in the batch run.""" + start_time: datetime + """The start time of the batch run.""" + end_time: datetime + """The end time of the batch run.""" + tokens: TokenMetrics + """The overall token metrics of the batch run.""" + details: Sequence[BatchRunDetails] + """The details of each line in the batch run.""" + error: Optional[Exception] = None + """The error of the batch run. 
This will only be set if the status does not indicate success.""" + + @property + def duration(self) -> timedelta: + """The duration of the batch run.""" + if self.start_time is not None and self.end_time is not None: + return self.end_time - self.start_time + return timedelta(0) + + @property + def results(self) -> Sequence[Optional[Mapping[str, Any]]]: + """The results of the batch run.""" + if not self.details: + return [] + return [d.result for d in self.details] \ No newline at end of file diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run.py new file mode 100644 index 000000000000..4eee34b37c60 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run.py @@ -0,0 +1,121 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from uuid import uuid4 +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Any, Callable, Mapping, Optional, Sequence + +from ._utils import normalize_identifier_name +from ._result import BatchResult +from ._status import BatchStatus + + +class RunStatus(Enum): + # TODO ralphe: Trim this to just the statuses we need + # QUEUED = "Queued" + NOT_STARTED = "NotStarted" + PREPARING = "Preparing" + # PROVISIONING = "Provisioning" + # STARTING = "Starting" + RUNNING = "Running" + # CANCEL_REQUESTED = "CancelRequested" + CANCELED = "Canceled" + # FINALIZING = "Finalizing" + COMPLETED = "Completed" + FAILED = "Failed" + # UNAPPROVED = "Unapproved" + # NOTRESPONDING = "NotResponding" + # PAUSING = "Pausing" + # PAUSED = "Paused" + + @staticmethod + def from_batch_result_status(status: BatchStatus) -> "RunStatus": + if status == BatchStatus.NotStarted: + return RunStatus.NOT_STARTED + if status == BatchStatus.Running: + return RunStatus.RUNNING + if status == BatchStatus.Completed: + return RunStatus.COMPLETED + if status == BatchStatus.Canceled: + return RunStatus.CANCELED + if status == BatchStatus.Failed: + return RunStatus.FAILED + + return RunStatus.FAILED + + +class Run: + """The equivalent of a Promptflow Run + promptflow-devkit/promptflow/_sdk/entities/_run.py + + THIS WILL BE REMOVED IN A FUTURE CODE UPDATE""" + + def __init__( + self, + *, + dynamic_callable: Callable, + name_prefix: Optional[str], + inputs: Sequence[Mapping[str, Any]], + column_mapping: Mapping[str, str], + created_on: Optional[datetime] = None, + ): + self._status: RunStatus = RunStatus.NOT_STARTED + self._created_on = created_on or datetime.now(timezone.utc) + self._start_time: Optional[datetime] = None + self._end_time: Optional[datetime] = None + + self.dynamic_callable = dynamic_callable + self.name = self._generate_run_name(name_prefix, self._created_on) + self.inputs = inputs + self.column_mapping = column_mapping + self.result: Optional[BatchResult] = None + self.metrics: Mapping[str, Any] = {} + + # self._use_remote_flow = False + # self._from_flex_flow = True + # self._from_prompty = False + # self.flow = path to pointless flow file + # self._experiment_name = name of folder containing pointless flow file + # self._lineage_id = basically equivalent to a hex digest of the SHA256 hash of: + # f"{uuid.getnod()}/{posix_full_path_to_pointless_folder}" + # self._output_path = Path("/.promptflow/runs/") + # self._flow_name = name of pointless 
folder + + @property + def status(self) -> RunStatus: + return self._status + + @property + def created_on(self) -> datetime: + return self._created_on + + @property + def duration(self) -> Optional[timedelta]: + if self._start_time is None or self._end_time is None: + return None + + return self._end_time - self._start_time + + @property + def outputs(self) -> Sequence[Mapping[str, Any]]: + if self.result is None: + return [] + + return [value or {} for value in self.result.results] + + @staticmethod + def _generate_run_name(name_prefix: Optional[str], creation_time: datetime) -> str: + # The Promptflow code looked at the folder name of the temporary folder used to + # store the temporary flow YAML file which was a single entry that told it look + # at the passed in dynamic_callable. Example folder name: + # azure_ai_evaluation_evaluators_common_base_eval_asyncevaluatorbase_l82059h3 + # instead we will use the passed in name_prefix or use a UUID (which is equally + # opaque as what the original code did) + if not name_prefix: + name_prefix = str(uuid4()) + + timestamp = creation_time.strftime("%Y%m%d_%H%M%S_%f") + name = f"{name_prefix}_{timestamp}" + return normalize_identifier_name(name) diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_storage.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_storage.py new file mode 100644 index 000000000000..4848c3247e4d --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_storage.py @@ -0,0 +1,128 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# Original source: +# promptflow-devkit/promptflow/_sdk/operations/_local_storage_operations.py + +from abc import ABC, abstractmethod +from contextlib import AbstractContextManager +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Final, Mapping, Optional, Tuple, Union + +from ._result import BatchResult, TokenMetrics, BatchStatus + + +EVAL_USER_SUBFOLDER: Final[str] = ".evaluation" + + +class AbstractRunLogger(AbstractContextManager): + @property + @abstractmethod + def file_path(self) -> Path: + """Get the file path of the logger. + + :return: The file path of the logger. + :rtype: Path + """ + ... + + @abstractmethod + def get_logs(self) -> str: + """Get the logs of the run. + + :return: The logs of the run. + :rtype: str + """ + ... + + +class AbstractRunStorage(ABC): + @property + @abstractmethod + def logger(self) -> "AbstractRunLogger": + """Get the logger of the run. + + :return: The logger of the run. + :rtype: ~promptflow.contracts.run_logger.RunLogger + """ + ... + + @abstractmethod + def persist_result(self, result: Optional[BatchResult]) -> None: + """Persist results of a batch engine execution (including any errors). + + :param Optional[BatchResult] result: The result to persist. + """ + ... + + @abstractmethod + def load_exception(self) -> Mapping[str, Any]: + """Load the exception from the storage. If there was no exception, an empty + mapping will be returned. + + :return: The exception. + :rtype: Optional[Exception] + """ + ... + + @abstractmethod + def load_inputs_and_outputs(self) -> Tuple[Mapping[str, Any], BatchResult]: + """Load the inputs and outputs from the storage. + + :return: The inputs and outputs. + :rtype: Tuple(Mapping[str, Any], BatchResult) + """ + ... 
+ + @abstractmethod + def load_metrics(self) -> Mapping[str, Union[int, float, str]]: + """Load the metrics from the storage. + + :return: The metrics. + :rtype: Mapping[str, Union[int, float, str]] + """ + ... + + +class NoOpRunStorage(AbstractRunStorage): + """A no-op implementation of the run storage.""" + + def __init__(self): + self._logger = NoOpLogger() + pass + + @property + def logger(self) -> AbstractRunLogger: + return self._logger + + def persist_result(self, result: Optional[BatchResult]) -> None: + pass + + def load_exception(self) -> Mapping[str, Any]: + return {} + + def load_inputs_and_outputs(self) -> Tuple[Mapping[str, Any], BatchResult]: + now = datetime.now(timezone.utc) + return {}, BatchResult(BatchStatus.NotStarted, 0, 0, now, now, TokenMetrics(0, 0, 0), []) + + def load_metrics(self) -> Mapping[str, Union[int, float, str]]: + return {} + + +class NoOpLogger(AbstractRunLogger): + """A no-op implementation of the run logger.""" + + @property + def file_path(self) -> Path: + return Path.home() / EVAL_USER_SUBFOLDER + + def __enter__(self) -> None: + pass + + def __exit__(self, *args) -> None: + pass + + def get_logs(self) -> str: + return "" diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_submitter.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_submitter.py new file mode 100644 index 000000000000..3d278d45004b --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_submitter.py @@ -0,0 +1,217 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import dataclasses +import sys +from datetime import datetime, timezone +from typing import Any, Callable, Dict, Mapping, Optional, Sequence, TextIO, Union + +from ._run import Run, RunStatus +from ._trace import start_trace, is_collection_writeable +from ._run_storage import AbstractRunStorage, NoOpRunStorage +from ._logging import incremental_print, print_red_error +from ._config import BatchEngineConfig +from ._exceptions import BatchEngineValidationError +from ._engine import BatchEngine, BatchEngineError, BatchResult + + +class RunSubmitter: + """Submits run to executor + promptflow-devkit/promptflow/_sdk/_orchestrator/run_submitter.py + + THIS WILL BE REMOVED IN A FUTURE CODE UPDATE""" + + def __init__(self, config: BatchEngineConfig): + # self._client = PFClient instance + # self._config = PFClient config + # self.run_operations = RunOperations instance + + # TODO ralphe: Use proper logger here. Old code did LoggerFactory.get_logger(__name__) + self._config = config + + def submit( + self, + dynamic_callable: Callable, + inputs: Sequence[Mapping[str, Any]], + column_mapping: Mapping[str, str], + *, + name_prefix: Optional[str] = None, + created_on: Optional[datetime] = None, + storage_creator: Optional[Callable[[Run], AbstractRunStorage]] = None, + **kwargs, + ) -> Run: + # The old code always spun up two threads here using a ThreadPoolExecutor: + # 1. One thread essentially did nothing of value (since tracing was disabled, and we + # don't care about checking for the latest PromptFlow version number now) + # 2. The other thread did the _run_bulk call. This was followed by a + # wait(return_when=ALL_COMPLETED) + # This quite frankly is unnecessary complexity since the the evaluation code already + # calls this in the context of ThreadPoolThread. 
So we can just do the equivalent + # of the _run_bulk code here directly. + # In a future code refactor, all of this will be cleaned up in favour of proper + # async/await code. + run: Run = kwargs.pop("run", None) or Run( + dynamic_callable=dynamic_callable, + name_prefix=name_prefix, + inputs=inputs, + column_mapping=column_mapping, + created_on=created_on, + ) + + logger = self._config.logger + attributes: Dict[str, Any] = kwargs.get("attributes", {}) + collection_for_run: Optional[str] = None + + logger.debug("start trace for flow run...") + logger.debug("flow path for run.start_trace: %s", run.name) + + if is_collection_writeable(): + logger.debug("trace collection is writeable, will use flow name as collection...") + collection_for_run = run.name + logger.debug("collection for run: %s", collection_for_run) + else: + logger.debug("trace collection is protected, will honor existing collection.") + start_trace(attributes=attributes, run=run, _collection=collection_for_run) + + self._validate_inputs(run=run) + + local_storage = storage_creator(run) if storage_creator else NoOpRunStorage() + with local_storage.logger: + run._status = RunStatus.PREPARING + + # unnecessary Flow loading code was removed here. Instead do direct calls to _submit_bulk_run + self._submit_bulk_run(run=run, local_storage=local_storage, **kwargs) + + self.stream_run(run=run, storage=local_storage, raise_on_error=True) + return run + + def _submit_bulk_run(self, run: Run, local_storage: AbstractRunStorage, **kwargs) -> None: + logger = self._config.logger + + logger.info(f"Submitting run {run.name}, log path: {local_storage.logger.file_path}") + + # Old code loaded the Flex flow, parsed input and outputs types. That logic has been + # removed since it is unnecessary. It also parsed and set environment variables. This + # has also been removed since it can be problematic in a multi-threaded environment. + + self._validate_column_mapping(run.column_mapping) + + run._status = RunStatus.RUNNING + run._start_time = datetime.now(timezone.utc) + batch_result: Optional[BatchResult] = None + + try: + batch_engine = BatchEngine( + run.dynamic_callable, + storage=local_storage, + batch_timeout_sec=self._config.batch_timeout_seconds, + line_timeout_sec=self._config.run_timeout_seconds, + max_worker_count=self._config.max_concurrency, + **kwargs, + ) + + batch_result = batch_engine.run(data=run.inputs, column_mapping=run.column_mapping, id=run.name) + run._status = RunStatus.from_batch_result_status(batch_result.status) + + error_logs: Sequence[str] = [] + if run._status != RunStatus.COMPLETED: + error_logs.append(f"Run {run.name} failed with status {batch_result.status}.") + if batch_result.error: + error_logs.append(f"Error: {str(batch_result.error)}") + + if error_logs: + logger.warning("\n".join(error_logs)) + except Exception as e: + run._status = RunStatus.FAILED + # when run failed in executor, store the exception in result and dump to file + logger.warning(f"Run {run.name} failed when executing in executor with exception {e}.") + # for user error, swallow stack trace and return failed run since user don't need the stack trace + if not isinstance(e, BatchEngineValidationError): + # for other errors, raise it to user to help debug root cause. + raise e + # won't raise the exception since it's already included in run object. 
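+        # The finally block below always runs: it persists whatever batch result exists,
+        # rolls up token/duration metrics, and stamps the end time on the run.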
+ finally: + # persist inputs, outputs and metrics + local_storage.persist_result(batch_result) + # exceptions + # local_storage.dump_exception(exception=exception, batch_result=batch_result) # TODO ralphe: persist_result should handle this + # system metrics + system_metrics = {} + if batch_result: + system_metrics.update(dataclasses.asdict(batch_result.tokens)) # token related + system_metrics.update( + { + "duration": batch_result.duration.total_seconds(), + # "__pf__.lines.completed": batch_result.total_lines - batch_result.failed_lines, + # "__pf__.lines.failed": batch_result.failed_lines, + } + ) + + run._end_time = datetime.now(timezone.utc) + run.metrics = system_metrics + run.result = batch_result + + @staticmethod + def _validate_inputs(run: Run): + if not run.inputs: + raise BatchEngineValidationError("Data must be specified for evaluation run.") + + @staticmethod + def _validate_column_mapping(column_mapping: Mapping[str, str]): + if not isinstance(column_mapping, Mapping): + raise BatchEngineValidationError(f"Column mapping must be a dict, got {type(column_mapping)}.") + + has_mapping = any([isinstance(v, str) and v.startswith("$") for v in column_mapping.values()]) + if not has_mapping: + raise BatchEngineValidationError( + "Column mapping must contain at least one mapping binding, " + f"current column mapping contains all static values: {column_mapping}" + ) + + @staticmethod + def stream_run(run: Run, storage: AbstractRunStorage, raise_on_error: bool) -> None: + """ + Stream the output of the batch execution. + + :param Run run: The run to stream. + :param AbstractRunStorage storage: The storage to use for the output. + """ + + # TODO ralphe: This doesn't seem to be do anything useful beyond just print + # a run summary at the end. This is because by the time it gets + # invoked even in the original code, the run has already completed. + + if run is None or storage is None: + return + + file_handler = sys.stdout + try: + printed = 0 + available_logs = storage.logger.get_logs() + incremental_print(available_logs, printed, file_handler) + RunSubmitter._print_run_summary(run, file_handler) + except KeyboardInterrupt: + error_message = "The output streaming for the run was interrupted, but the run is still executing." + print(error_message) + + if run.status == RunStatus.FAILED or run.status == RunStatus.CANCELED: + if run.status == RunStatus.FAILED: + error_message = storage.load_exception().get("message", "Run fails with unknown error.") + else: + error_message = "Run is canceled." + if raise_on_error: + raise BatchEngineError(error_message) + else: + print_red_error(error_message) + + @staticmethod + def _print_run_summary(run: Run, text_out: Union[TextIO, Any]) -> None: + duration = str(run.duration) + text_out.write( + "======= Run Summary =======\n\n" + f'Run name: "{run.name}"\n' + f'Run status: "{run.status.value}"\n' + f'Start time: "{run.created_on}"\n' + f'Duration: "{duration}"\n\n' + ) diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_status.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_status.py new file mode 100644 index 000000000000..ea79b45158e0 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_status.py @@ -0,0 +1,25 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +from enum import IntEnum, auto, unique + + +@unique +class BatchStatus(IntEnum): + NotStarted = 0 + Running = auto() + + # NOTE: DO NOT REORDER THESE ENUMS. The order is important for the is_terminated method + # and other logic in the code to work properly + Completed = auto() + Canceled = auto() + Failed = auto() + + @staticmethod + def is_terminated(status: "BatchStatus") -> bool: + return status >= BatchStatus.Completed + + @staticmethod + def is_failed(status: "BatchStatus") -> bool: + return status == BatchStatus.Failed or status == BatchStatus.Canceled \ No newline at end of file diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_trace.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_trace.py new file mode 100644 index 000000000000..88df221da930 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_trace.py @@ -0,0 +1,105 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# Pretty much all this code will be removed + +import logging +import os +from typing import Any, Dict, Optional + +from ._openai_injector import inject_openai_api + + +def start_trace( + *, + resource_attributes: Optional[Dict] = None, + collection: Optional[str] = None, + **kwargs: Any, +) -> None: + """Promptflow instrumentation. + + :param resource_attributes: Specify the resource attributes for current process. + :type resource_attributes: typing.Optional[dict] + :param collection: Specify the collection for current tracing. + :type collection: typing.Optional[str] + """ + + logging.debug("injecting OpenAI API...") + inject_openai_api() + logging.debug("OpenAI API injected.") + + res_attrs: Dict[str, str] = {"service.name": "promptflow"} + if resource_attributes: + logging.debug("specified resource attributes: %s", resource_attributes) + res_attrs.update(resource_attributes) + + # determine collection + collection_user_specified = collection is not None + if not collection_user_specified: + collection = kwargs.get("_collection", _get_collection_from_cwd()) + # logging.debug("collection is not user specified") + # if is_collection_writeable(): + # # internal parameter for devkit call + # _collection = kwargs.get("_collection", None) + # if _collection is not None: + # logging.debug("received internal parameter _collection: %s, will use this", _collection) + # collection = _collection + # else: + # logging.debug("trying to get from current working directory...") + # collection = _get_collection_from_cwd() + # # TODO ralphe: OpenTelemetry dependency. This is a future task to resolve. + # # else: + # # logging.debug("collection is protected, will directly use that...") + # # tracer_provider: TracerProvider = trace.get_tracer_provider() + # # collection = tracer_provider.resource.attributes["collection"] + logging.info("collection: %s", collection) + res_attrs["collection"] = collection or "default" + logging.info("resource attributes: %s", res_attrs) + + # if user specifies collection, we will add a flag on tracer provider to avoid override + _set_tracer_provider(res_attrs, protected_collection=collection_user_specified) + + # Rest of code is removed since we are removing promptflow-devkit dependency + + +def is_collection_writeable() -> bool: + # TODO ralphe: This has OpenTelemetry dependency. 
That is a future task to resolve.
+ # return not getattr(trace.get_tracer_provider(), TRACER_PROVIDER_PROTECTED_COLLECTION_ATTR, False)
+ return True
+
+
+def _get_collection_from_cwd() -> str:
+ """Try to use the cwd folder name as the collection name; falls back to the default value if an exception occurs."""
+ cur_folder_name = ""
+ try:
+ cwd = os.getcwd()
+ cur_folder_name = os.path.basename(cwd)
+ except Exception: # pylint: disable=broad-except
+ # possible exception: PermissionError, FileNotFoundError, OSError, etc.
+ pass
+ collection = cur_folder_name or "default"
+ return collection
+
+
+def _set_tracer_provider(res_attrs: Dict[str, str], protected_collection: bool) -> None:
+ # TODO ralphe: OpenTelemetry dependency. This is a future task to resolve.
+ pass
+ # res = Resource(attributes=res_attrs)
+ # tracer_provider = TracerProvider(resource=res)
+
+ # cur_tracer_provider = trace.get_tracer_provider()
+ # if isinstance(cur_tracer_provider, TracerProvider):
+ # logging.info("tracer provider is already set, will merge the resource attributes...")
+ # cur_res = cur_tracer_provider.resource
+ # logging.debug("current resource: %s", cur_res.attributes)
+ # new_res = cur_res.merge(res)
+ # cur_tracer_provider._resource = new_res
+ # logging.info("tracer provider is updated with resource attributes: %s", new_res.attributes)
+ # else:
+ # trace.set_tracer_provider(tracer_provider)
+ # logging.info("tracer provider is set with resource attributes: %s", res.attributes)
+
+ # if protected_collection:
+ # logging.info("user specifies collection, will add a flag on tracer provider to avoid override...")
+ # setattr(trace.get_tracer_provider(), TRACER_PROVIDER_PROTECTED_COLLECTION_ATTR, True)
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_utils.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_utils.py
new file mode 100644
index 000000000000..f775bd640bdf
--- /dev/null
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_utils.py
@@ -0,0 +1,82 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import os
+import re
+from typing import Any, Mapping, Sequence, Tuple
+
+
+def normalize_identifier_name(name: str) -> str:
+ """Normalize the identifier name to a valid Python variable name.
+
+ Args:
+ name (str): The identifier name to normalize.
+
+ Returns:
+ str: The normalized identifier name.
+ """
+ normalized = re.sub(r"\W", "_", name.strip())
+ if normalized[0].isdigit():
+ normalized = f"_{normalized}"
+ return normalized
+
+
+def get_int_env_var(env_var_name: str, default_value: int = 0) -> int:
+ """Get the integer value of the environment variable.
+
+ Args:
+ env_var_name (str): The name of the environment variable.
+ default_value (int): The default value if the environment variable is not set.
+
+ Returns:
+ int: The integer value of the environment variable.
+ """
+ try:
+ value = os.getenv(env_var_name, default_value)
+ return int(value)
+ except ValueError:
+ return default_value
+
+
+def get_value_from_path(path: str, data: Mapping[str, Any]) -> Tuple[bool, Any]:
+ """Tries to get a value from a mapping based on the specified path. The path is a
+ string with dot-separated keys (e.g. data.nested_1.nested_2).
+
+ This will interpret the path using a depth-first search, trying the shortest
+ possible key at each level.
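+ In other words, at each level the un-dotted key is tried first, and longer dotted keys are only
+ consulted when the deeper lookup fails.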
If for example you had the following data: + { + "foo": { + "bar": { + "happy": 12 + } + }, + "foo.bar": { + "none": 14, + "random": { "some": 15 } + }, + "foo.bar.none": 16 + } + And you asked for foo.bar.none, the returned value would be 14" + """ + + def _get_value(data: Mapping[str, Any], parts: Sequence[str]) -> Tuple[bool, Any]: + if len(parts) == 0: + return True, data + + for i in range(1, len(parts) + 1): + key = ".".join(parts[:i]) + if isinstance(data, Mapping) and key in data: + found, match = _get_value(data[key], parts[i:]) + if found: + return found, match + + return False, None + + if path is None or data is None: + return False, None + + parts = path.strip().split(".") + if len(parts) == 0: + return False, None + return _get_value(data, parts) diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_utils_deprecated.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_utils_deprecated.py new file mode 100644 index 000000000000..1bc0aa153b16 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_utils_deprecated.py @@ -0,0 +1,131 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import asyncio +import contextvars +import dataclasses +from asyncio import Task +from concurrent.futures import ThreadPoolExecutor +from typing import Any, AsyncIterator, Callable, Iterator, Mapping, Optional, Sequence, Tuple, cast + + +class ThreadPoolExecutorWithContext(ThreadPoolExecutor): + # Original source: + # promptflow-tracing/promptflow/tracing/_context_utils.py + + def __init__( + self, + max_workers: Optional[int] = None, + thread_name_prefix: str = "", + initializer: Optional[Callable] = None, + initargs: Tuple[Any, ...] = (), + ) -> None: + """The ThreadPoolExecutionWithContext is an extended thread pool implementation + which will copy the context from the current thread to the child threads. + Thus the traced functions in child threads could keep parent-child relationship in the tracing system. + The arguments are the same as ThreadPoolExecutor. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. + """ + current_context = contextvars.copy_context() + initializer_args = (current_context, initializer, initargs) + super().__init__(max_workers, thread_name_prefix, self.set_context_then_call, initializer_args) + + @staticmethod + def set_context_then_call( + context: contextvars.Context, + initializer: Optional[Callable], + initargs: Tuple[Any, ...], + ) -> None: + for var, value in context.items(): + var.set(value) + if initializer: + initializer(*initargs) + + +def _has_running_loop() -> bool: + """Check if the current thread has a running event loop.""" + # When using asyncio.get_running_loop(), a RuntimeError is raised if there is no running event loop. + # So, we use a try-catch block to determine whether there is currently an event loop in place. 
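+ # (asyncio.get_running_loop() raises RuntimeError when called from a thread with no running loop.)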
+ #
+ # Note that this is the only way to check whether there is a running loop now, see:
+ # https://docs.python.org/3/library/asyncio-eventloop.html?highlight=get_running_loop#asyncio.get_running_loop
+ try:
+ asyncio.get_running_loop()
+ return True
+ except RuntimeError:
+ return False
+
+
+def async_run_allowing_running_loop(async_func, *args, **kwargs):
+ """Run an async function in a new thread, allowing the current thread to have a running event loop.
+
+ When run in an async environment (e.g., in a notebook), because each thread allows only one event
+ loop, using asyncio.run directly leads to a RuntimeError ("asyncio.run() cannot be called from a
+ running event loop").
+
+ To address this issue, we add a check for the event loop here. If the current thread already has an
+ event loop, we run _exec_batch in a new thread; otherwise, we run it in the current thread.
+ """
+
+ if _has_running_loop():
+ # TODO ralphe: The logic here makes absolutely no sense to me. If you already have an
+ # async event loop running, why would you want to start up a new thread,
+ # create a new event loop, and run the async function in a new thread?
+ # You can just use the following to schedule the async function call on
+ # the existing event loop:
+ # asyncio.get_running_loop().create_task(async_func(*args, **kwargs)).result()
+ # The correct thing to do is not to make these decisions here at all.
+ # Instead, all the BatchEngine code should be async first, with the event
+ # loop being started by the callers of that code. For now, I am keeping
+ # this odd logic as is, and in phase 2 of the migration, this will be
+ # refactored to be more idiomatic asyncio code.
+ with ThreadPoolExecutorWithContext() as executor:
+ return executor.submit(lambda: asyncio.run(async_func(*args, **kwargs))).result()
+ else:
+ return asyncio.run(async_func(*args, **kwargs))
+
+
+async def stringify_output_async(output: Any) -> str:
+ if isinstance(output, AsyncIterator):
+ return await stringify_output_async([v async for v in output])
+ if isinstance(output, Iterator):
+ return await stringify_output_async([v for v in output])
+ if isinstance(output, Mapping):
+ return ", ".join(
+ [f"{await stringify_output_async(k)}:{await stringify_output_async(v)}" for k, v in output.items()]
+ )
+ if isinstance(output, Sequence):
+ return "".join([await stringify_output_async(v) for v in output])
+ if isinstance(output, Task):
+ return await stringify_output_async(await output)
+
+ return str(output)
+
+
+def convert_eager_flow_output_to_dict(value: Any) -> Mapping[str, Any]:
+ """
+ Convert the output of eager flow to a dict. Since the output of eager flow
+ may not be a dict, we need to convert it to a dict in batch mode.
+
+ Examples:
+ 1. If the output is a dict, return it directly:
+ value = {"output": 1} -> {"output": 1}
+ 2. If the output is a dataclass, convert it to a dict:
+ value = SampleDataClass(output=1) -> {"output": 1}
+ 3.
If the output is not a dict or dataclass, convert it to a dict by adding a key "output": + value = 1 -> {"output": 1} + """ + + if isinstance(value, Mapping): + return value + elif dataclasses.is_dataclass(value): + return dataclasses.asdict(cast(Any, value)) + else: + return {"output": value} diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/__init__.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/__init__.py similarity index 85% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/__init__.py rename to sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/__init__.py index 23fbd6844d78..9eae145c5a6e 100644 --- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/__init__.py +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/__init__.py @@ -2,9 +2,9 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -from azure.ai.evaluation.legacy.prompty._prompty import AsyncPrompty -from azure.ai.evaluation.legacy.prompty._connection import Connection, OpenAIConnection, AzureOpenAIConnection -from azure.ai.evaluation.legacy.prompty._exceptions import ( +from azure.ai.evaluation._legacy.prompty._prompty import AsyncPrompty +from azure.ai.evaluation._legacy.prompty._connection import Connection, OpenAIConnection, AzureOpenAIConnection +from azure.ai.evaluation._legacy.prompty._exceptions import ( PromptyException, MissingRequiredInputError, InvalidInputError, diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_connection.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_connection.py similarity index 97% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_connection.py rename to sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_connection.py index 9308ad7380fc..b3240e06f49c 100644 --- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_connection.py +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_connection.py @@ -8,8 +8,8 @@ from dataclasses import dataclass from typing import Any, ClassVar, Mapping, Optional, Set, Union -from azure.ai.evaluation.legacy.prompty._exceptions import MissingRequiredInputError -from azure.ai.evaluation.legacy.prompty._utils import dataclass_from_dict +from azure.ai.evaluation._legacy.prompty._exceptions import MissingRequiredInputError +from azure.ai.evaluation._legacy.prompty._utils import dataclass_from_dict ENV_VAR_PATTERN = re.compile(r"^\$\{env:(.*)\}$") diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_exceptions.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_exceptions.py similarity index 100% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_exceptions.py rename to sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_exceptions.py diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_prompty.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_prompty.py similarity index 95% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_prompty.py rename to sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_prompty.py index e6b977a81f69..0260773ad58b 100644 --- 
a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_prompty.py +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_prompty.py @@ -11,15 +11,15 @@ from azure.ai.evaluation._exceptions import ErrorTarget from azure.ai.evaluation._constants import DefaultOpenEncoding -from azure.ai.evaluation.legacy.prompty._exceptions import ( +from azure.ai.evaluation._legacy.prompty._exceptions import ( InvalidInputError, PromptyException, MissingRequiredInputError, NotSupportedError, ) -from azure.ai.evaluation.legacy.prompty._connection import AzureOpenAIConnection, Connection, OpenAIConnection -from azure.ai.evaluation.legacy.prompty._yaml_utils import load_yaml_string -from azure.ai.evaluation.legacy.prompty._utils import ( +from azure.ai.evaluation._legacy.prompty._connection import AzureOpenAIConnection, Connection, OpenAIConnection +from azure.ai.evaluation._legacy.prompty._yaml_utils import load_yaml_string +from azure.ai.evaluation._legacy.prompty._utils import ( dataclass_from_dict, PromptyModelConfiguration, OpenAIChatResponseType, @@ -67,7 +67,7 @@ class AsyncPrompty: .. code-block:: python - from azure.ai.evaluation.legacy.prompty import AsyncPrompty + from azure.ai.evaluation._legacy.prompty import AsyncPrompty prompty = Prompty(path="path/to/prompty.prompty") result = prompty(input_a=1, input_b=2) @@ -89,7 +89,7 @@ class AsyncPrompty: result = prompty(input_a=1, input_b=2) # Override model config with configuration - from azure.ai.evaluation.legacy.prompty._connection import AzureOpenAIConnection + from azure.ai.evaluation._legacy.prompty._connection import AzureOpenAIConnection model_config = { "api": "chat", "configuration": AzureOpenAIModelConfiguration( @@ -106,7 +106,7 @@ class AsyncPrompty: result = prompty(input_a=1, input_b=2) # Override model config with created connection - from azure.ai.evaluation.legacy.prompty._connection import AzureOpenAIConnection + from azure.ai.evaluation._legacy.prompty._connection import AzureOpenAIConnection model_config = { "api": "chat", "configuration": AzureOpenAIModelConfiguration( diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_utils.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_utils.py similarity index 99% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_utils.py rename to sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_utils.py index be5275462af4..3a6521ca0987 100644 --- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_utils.py +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_utils.py @@ -32,13 +32,13 @@ from openai.types.chat import ChatCompletion, ChatCompletionChunk from azure.ai.evaluation._constants import DefaultOpenEncoding -from azure.ai.evaluation.legacy.prompty._exceptions import ( +from azure.ai.evaluation._legacy.prompty._exceptions import ( InvalidInputError, JinjaTemplateError, PromptyException, ) -from azure.ai.evaluation.legacy.prompty._yaml_utils import load_yaml +from azure.ai.evaluation._legacy.prompty._yaml_utils import load_yaml # region: Resolving references diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_yaml_utils.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_yaml_utils.py similarity index 97% rename from sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_yaml_utils.py rename to 
sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_yaml_utils.py index 62e7039ac519..7ea2d5b4babb 100644 --- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/legacy/prompty/_yaml_utils.py +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/prompty/_yaml_utils.py @@ -8,7 +8,7 @@ from ruamel.yaml import YAML, YAMLError # cspell:ignore ruamel from azure.ai.evaluation._constants import DefaultOpenEncoding -from azure.ai.evaluation.legacy.prompty._exceptions import MissingRequiredInputError +from azure.ai.evaluation._legacy.prompty._exceptions import MissingRequiredInputError def load_yaml(source: Optional[Union[str, PathLike, IO]]) -> Dict: diff --git a/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_prompty.py b/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_prompty.py index cad5a5c14308..47d2f41512ad 100644 --- a/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_prompty.py +++ b/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_prompty.py @@ -11,7 +11,7 @@ from openai.types.chat import ChatCompletion -from azure.ai.evaluation.legacy.prompty import AsyncPrompty, InvalidInputError +from azure.ai.evaluation._legacy.prompty import AsyncPrompty, InvalidInputError from azure.ai.evaluation import AzureOpenAIModelConfiguration