diff --git a/src/ragas/callbacks.py b/src/ragas/callbacks.py
index fb704846c..63664b58b 100644
--- a/src/ragas/callbacks.py
+++ b/src/ragas/callbacks.py
@@ -1,24 +1,38 @@
+from __future__ import annotations
+
+import json
 import typing as t
+import uuid
+from dataclasses import dataclass, field
+from enum import Enum
 
 from langchain_core.callbacks import (
-    AsyncCallbackManager,
-    AsyncCallbackManagerForChainGroup,
-    AsyncCallbackManagerForChainRun,
+    BaseCallbackHandler,
     CallbackManager,
     CallbackManagerForChainGroup,
     CallbackManagerForChainRun,
     Callbacks,
 )
+from pydantic import BaseModel, Field
 
 
 def new_group(
-    name: str, inputs: t.Dict, callbacks: Callbacks
+    name: str,
+    inputs: t.Dict,
+    callbacks: Callbacks,
+    tags: t.Optional[t.List[str]] = None,
+    metadata: t.Optional[t.Dict[str, t.Any]] = None,
 ) -> t.Tuple[CallbackManagerForChainRun, CallbackManagerForChainGroup]:
+    tags = tags or []
+    metadata = metadata or {}
+
     # start evaluation chain
     if isinstance(callbacks, list):
         cm = CallbackManager.configure(inheritable_callbacks=callbacks)
     else:
         cm = t.cast(CallbackManager, callbacks)
+    cm.tags = tags
+    cm.metadata = metadata
     rm = cm.on_chain_start({"name": name}, inputs)
     child_cm = rm.get_child()
     group_cm = CallbackManagerForChainGroup(
@@ -35,24 +49,118 @@ def new_group(
     return rm, group_cm
 
 
-async def new_async_group(
-    name: str, inputs: t.Dict, callbacks: Callbacks
-) -> t.Tuple[AsyncCallbackManagerForChainRun, AsyncCallbackManagerForChainGroup]:
-    # start evaluation chain
-    if isinstance(callbacks, list):
-        cm = AsyncCallbackManager.configure(inheritable_callbacks=callbacks)
-    else:
-        cm = t.cast(AsyncCallbackManager, callbacks)
-    rm = await cm.on_chain_start({"name": name}, inputs)
-    child_cm = rm.get_child()
-    group_cm = AsyncCallbackManagerForChainGroup(
-        child_cm.handlers,
-        child_cm.inheritable_handlers,
-        child_cm.parent_run_id,
-        parent_run_manager=rm,
-        tags=child_cm.tags,
-        inheritable_tags=child_cm.inheritable_tags,
-        metadata=child_cm.metadata,
-        inheritable_metadata=child_cm.inheritable_metadata,
-    )
-    return rm, group_cm
+class ChainType(Enum):
+    EVALUATION = "evaluation"
+    METRIC = "metric"
+    ROW = "row"
+    RAGAS_PROMPT = "ragas_prompt"
+
+
+class ChainRun(BaseModel):
+    run_id: uuid.UUID
+    parent_run_id: t.Optional[uuid.UUID]
+    name: str
+    inputs: t.Dict[str, t.Any]
+    metadata: t.Dict[str, t.Any]
+    outputs: t.Dict[str, t.Any] = Field(default_factory=dict)
+    children: t.List[uuid.UUID] = Field(default_factory=list)
+
+
+class ChainRunEncoder(json.JSONEncoder):
+    def default(self, o):
+        if isinstance(o, uuid.UUID):
+            return str(o)
+        if isinstance(o, ChainType):
+            return o.value
+        return json.JSONEncoder.default(self, o)
+
+
+@dataclass
+class RagasTracer(BaseCallbackHandler):
+    traces: t.Dict[uuid.UUID, ChainRun] = field(default_factory=dict)
+
+    def on_chain_start(
+        self,
+        serialized: t.Dict[str, t.Any],
+        inputs: t.Dict[str, t.Any],
+        *,
+        run_id: uuid.UUID,
+        parent_run_id: t.Optional[uuid.UUID] = None,
+        tags: t.Optional[t.List[str]] = None,
+        metadata: t.Optional[t.Dict[str, t.Any]] = None,
+        **kwargs: t.Any,
+    ) -> t.Any:
+        self.traces[run_id] = ChainRun(
+            run_id=run_id,
+            parent_run_id=parent_run_id,
+            name=serialized["name"],
+            inputs=inputs,
+            metadata=metadata or {},
+            children=[],
+        )
+
+        if parent_run_id and parent_run_id in self.traces:
+            self.traces[parent_run_id].children.append(run_id)
+
+    def on_chain_end(
+        self,
+        outputs: t.Dict[str, t.Any],
+        *,
+        run_id: uuid.UUID,
+        **kwargs: t.Any,
+    ) -> t.Any:
+        self.traces[run_id].outputs = outputs
+
+    def to_jsons(self) -> str:
+        return json.dumps(
+            [t.model_dump() for t in self.traces.values()],
+            indent=4,
+            cls=ChainRunEncoder,
+        )
+
+
+@dataclass
+class MetricTrace(dict):
+    scores: t.Dict[str, float] = field(default_factory=dict)
+
+    def __repr__(self):
+        return self.scores.__repr__()
+
+    def __str__(self):
+        return self.__repr__()
+
+
+def parse_run_traces(
+    traces: t.Dict[uuid.UUID, ChainRun],
+) -> t.List[t.Dict[str, t.Any]]:
+    root_traces = [
+        chain_trace
+        for chain_trace in traces.values()
+        if chain_trace.parent_run_id is None
+    ]
+    if len(root_traces) > 1:
+        raise ValueError(
+            "Multiple root traces found! This is a bug on our end, please file an issue and we will fix it ASAP :)"
+        )
+    root_trace = root_traces[0]
+
+    # get all the row traces
+    parsed_traces = []
+    for row_uuid in root_trace.children:
+        row_trace = traces[row_uuid]
+        metric_traces = MetricTrace()
+        for metric_uuid in row_trace.children:
+            metric_trace = traces[metric_uuid]
+            metric_traces.scores[metric_trace.name] = metric_trace.outputs["output"]
+            # get all the prompt IO from the metric trace
+            prompt_traces = {}
+            for i, prompt_uuid in enumerate(metric_trace.children):
+                prompt_trace = traces[prompt_uuid]
+                prompt_traces[f"{i}_{prompt_trace.name}"] = {
+                    "input": prompt_trace.inputs["data"],
+                    "output": prompt_trace.outputs["output"],
+                }
+            metric_traces[f"{metric_trace.name}"] = prompt_traces
+        parsed_traces.append(metric_traces)
+
+    return parsed_traces
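
The parsing above assumes a fixed evaluation -> row -> metric -> prompt hierarchy under a single root run. The sketch below is illustrative only and not part of the patch (run names and values are invented); it drives that hierarchy through new_group() and a RagasTracer directly, then consumes it with parse_run_traces() and to_jsons():

from ragas.callbacks import ChainType, RagasTracer, new_group, parse_run_traces

tracer = RagasTracer()

# root run, as evaluate() creates it
eval_rm, eval_cm = new_group(
    "ragas evaluation", inputs={}, callbacks=[tracer],
    metadata={"type": ChainType.EVALUATION},
)
# one row under the root
row_rm, row_cm = new_group(
    "row 0", inputs={"user_input": "q"}, callbacks=eval_cm,
    metadata={"type": ChainType.ROW, "row_index": 0},
)
# one metric under the row
metric_rm, metric_cm = new_group(
    "faithfulness", inputs={"user_input": "q"}, callbacks=row_cm,
    metadata={"type": ChainType.METRIC},
)
# one prompt call under the metric
prompt_rm, _ = new_group(
    "nli_prompt", inputs={"data": "..."}, callbacks=metric_cm,
    metadata={"type": ChainType.RAGAS_PROMPT},
)

# close the runs from the inside out, as evaluate() does
prompt_rm.on_chain_end({"output": "verdict"})
metric_rm.on_chain_end({"output": 1.0})
row_rm.on_chain_end({"faithfulness": 1.0})
eval_rm.on_chain_end({"scores": [{"faithfulness": 1.0}]})

rows = parse_run_traces(tracer.traces)
print(rows[0].scores)           # {'faithfulness': 1.0}
print(rows[0]["faithfulness"])  # {'0_nli_prompt': {'input': '...', 'output': 'verdict'}}
print(tracer.to_jsons())        # raw ChainRun tree, serialized via ChainRunEncoder
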
diff --git a/src/ragas/dataset_schema.py b/src/ragas/dataset_schema.py
index 060db899b..936ba4c66 100644
--- a/src/ragas/dataset_schema.py
+++ b/src/ragas/dataset_schema.py
@@ -8,16 +8,19 @@
 from datasets import Dataset as HFDataset
 from pydantic import BaseModel, field_validator
 
+from ragas.callbacks import parse_run_traces
 from ragas.cost import CostCallbackHandler
 from ragas.messages import AIMessage, HumanMessage, ToolCall, ToolMessage
 from ragas.utils import safe_nanmean
 
 if t.TYPE_CHECKING:
+    import uuid
     from pathlib import Path
 
     from datasets import Dataset as HFDataset
     from pandas import DataFrame as PandasDataframe
 
+    from ragas.callbacks import ChainRun
     from ragas.cost import TokenUsage
 
 
@@ -137,6 +140,7 @@ def pretty_repr(self):
 
 
 Sample = t.TypeVar("Sample", bound=BaseSample)
+T = t.TypeVar("T", bound="RagasDataset")
 
 
 class RagasDataset(ABC, BaseModel, t.Generic[Sample]):
@@ -149,7 +153,7 @@ def to_list(self) -> t.List[t.Dict]:
 
     @classmethod
     @abstractmethod
-    def from_list(cls, data: t.List[t.Dict]) -> RagasDataset[Sample]:
+    def from_list(cls: t.Type[T], data: t.List[t.Dict]) -> T:
         """Creates an EvaluationDataset from a list of dictionaries."""
         pass
 
@@ -181,7 +185,7 @@ def to_hf_dataset(self) -> HFDataset:
         return HFDataset.from_list(self.to_list())
 
     @classmethod
-    def from_hf_dataset(cls, dataset: HFDataset):
+    def from_hf_dataset(cls: t.Type[T], dataset: HFDataset) -> T:
         """Creates an EvaluationDataset from a Hugging Face Dataset."""
         return cls.from_list(dataset.to_list())
 
@@ -202,7 +206,7 @@ def features(self):
         return self.samples[0].get_features()
 
     @classmethod
-    def from_dict(cls, mapping: t.Dict):
+    def from_dict(cls: t.Type[T], mapping: t.Dict) -> T:
         """Creates an EvaluationDataset from a dictionary."""
         samples = []
         if all(
@@ -237,7 +241,7 @@ def to_jsonl(self, path: t.Union[str, Path]):
             jsonlfile.write(json.dumps(sample.to_dict(), ensure_ascii=False) + "\n")
 
     @classmethod
-    def from_jsonl(cls, path: t.Union[str, Path]):
+    def from_jsonl(cls: t.Type[T], path: t.Union[str, Path]) -> T:
         """Creates an EvaluationDataset from a JSONL file."""
         with open(path, "r") as jsonlfile:
             data = [json.loads(line) for line in jsonlfile]
@@ -334,12 +338,6 @@ def from_list(cls, data: t.List[t.Dict]) -> EvaluationDataset:
         return cls(samples=samples)
 
 
-class EvaluationResultRow(BaseModel):
-    dataset_row: t.Dict
-    scores: t.Dict[str, t.Any]
-    trace: t.Dict[str, t.Any] = field(default_factory=dict)  # none for now
-
-
 @dataclass
 class EvaluationResult:
     """
@@ -361,6 +359,8 @@ class EvaluationResult:
     dataset: EvaluationDataset
     binary_columns: t.List[str] = field(default_factory=list)
     cost_cb: t.Optional[CostCallbackHandler] = None
+    traces: t.List[t.Dict[str, t.Any]] = field(default_factory=list)
+    ragas_traces: t.Dict[uuid.UUID, ChainRun] = field(default_factory=dict, repr=False)
 
     def __post_init__(self):
         # transform scores from list of dicts to dict of lists
@@ -377,6 +377,9 @@ def __post_init__(self):
                 value = t.cast(float, value)
                 values.append(value + 1e-10)
 
+        # parse the traces
+        self.traces = parse_run_traces(self.ragas_traces)
+
     def to_pandas(self, batch_size: int | None = None, batched: bool = False):
         """
         Convert the result to a pandas DataFrame.
@@ -413,18 +416,6 @@ def to_pandas(self, batch_size: int | None = None, batched: bool = False):
         dataset_df = self.dataset.to_pandas()
         return pd.concat([dataset_df, scores_df], axis=1)
 
-    def serialized(self) -> t.List[EvaluationResultRow]:
-        """
-        Convert the result to a list of EvaluationResultRow.
-        """
-        return [
-            EvaluationResultRow(
-                dataset_row=self.dataset[i].to_dict(),
-                scores=self.scores[i],
-            )
-            for i in range(len(self.scores))
-        ]
-
     def total_tokens(self) -> t.Union[t.List[TokenUsage], TokenUsage]:
         """
         Compute the total tokens used in the evaluation.
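
With the two fields added above, the traces collected during a run are parsed in __post_init__ and exposed directly on the result object. A hedged usage sketch (not part of the patch; dataset construction and evaluator LLM configuration are elided):

from ragas import evaluate
from ragas.metrics import faithfulness

# assumes a dataset and an evaluator LLM/API key are configured as usual
result = evaluate(dataset=..., metrics=[faithfulness])

row0 = result.traces[0]        # MetricTrace for the first dataset row
print(row0.scores)             # e.g. {'faithfulness': 0.9}
print(row0["faithfulness"])    # prompt-level {'input': ..., 'output': ...} pairs
print(result.ragas_traces)     # raw ChainRun objects, keyed by run_id
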
diff --git a/src/ragas/evaluation.py b/src/ragas/evaluation.py
index 6ee234296..bc7d53fda 100644
--- a/src/ragas/evaluation.py
+++ b/src/ragas/evaluation.py
@@ -9,7 +9,7 @@
 from langchain_core.language_models import BaseLanguageModel as LangchainLLM
 
 from ragas._analytics import EvaluationEvent, track, track_was_completed
-from ragas.callbacks import new_group
+from ragas.callbacks import ChainType, RagasTracer, new_group
 from ragas.dataset_schema import (
     EvaluationDataset,
     EvaluationResult,
@@ -229,6 +229,10 @@ def evaluate(
     # init the callbacks we need for various tasks
     ragas_callbacks: t.Dict[str, BaseCallbackHandler] = {}
 
+    # Ragas Tracer which traces the run
+    tracer = RagasTracer()
+    ragas_callbacks["tracer"] = tracer
+
     # check if cost needs to be calculated
     if token_usage_parser is not None:
         from ragas.cost import CostCallbackHandler
@@ -246,7 +250,10 @@ def evaluate(
     # new evaluation chain
     row_run_managers = []
     evaluation_rm, evaluation_group_cm = new_group(
-        name=RAGAS_EVALUATION_CHAIN_NAME, inputs={}, callbacks=callbacks
+        name=RAGAS_EVALUATION_CHAIN_NAME,
+        inputs={},
+        callbacks=callbacks,
+        metadata={"type": ChainType.EVALUATION},
     )
 
     sample_type = dataset.get_sample_type()
@@ -256,6 +263,7 @@ def evaluate(
             name=f"row {i}",
             inputs=row,
             callbacks=evaluation_group_cm,
+            metadata={"type": ChainType.ROW, "row_index": i},
         )
         row_run_managers.append((row_rm, row_group_cm))
         if sample_type == SingleTurnSample:
@@ -321,6 +329,7 @@ def evaluate(
                 t.Union["CostCallbackHandler", None],
                 cost_cb,
             ),
+            ragas_traces=tracer.traces,
        )
        if not evaluation_group_cm.ended:
            evaluation_rm.on_chain_end(result)
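
Because new_group() now forwards metadata to the callback manager, any LangChain callback handler passed to evaluate() can tell the evaluation, row, and metric runs apart by their ChainType. A small illustrative handler (the class below is made up, not part of ragas):

import typing as t

from langchain_core.callbacks import BaseCallbackHandler


class ChainTypePrinter(BaseCallbackHandler):
    """Toy handler: print each chain run's name and its ChainType metadata."""

    def on_chain_start(
        self,
        serialized: t.Dict[str, t.Any],
        inputs: t.Dict[str, t.Any],
        **kwargs: t.Any,
    ) -> t.Any:
        metadata = kwargs.get("metadata") or {}
        print(serialized.get("name"), metadata.get("type"))


# e.g. evaluate(dataset=..., metrics=[...], callbacks=[ChainTypePrinter()])
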
diff --git a/src/ragas/metrics/_aspect_critic.py b/src/ragas/metrics/_aspect_critic.py
index 95f17ff08..99c909a14 100644
--- a/src/ragas/metrics/_aspect_critic.py
+++ b/src/ragas/metrics/_aspect_critic.py
@@ -117,7 +117,7 @@ class AspectCritic(MetricWithLLM, SingleTurnMetric, MultiTurnMetric):
     strictness: int = field(default=1, repr=False)
     max_retries: int = 1
 
-    def __post_init__(self: t.Self):
+    def __post_init__(self):
         if self.name == "":
             raise ValueError("Expects a name")
         if self.definition == "":
@@ -141,12 +141,12 @@ def _compute_score(
         return score
 
     async def _single_turn_ascore(
-        self: t.Self, sample: SingleTurnSample, callbacks: Callbacks
+        self, sample: SingleTurnSample, callbacks: Callbacks
     ) -> float:
         row = sample.to_dict()
         return await self._ascore(row, callbacks)
 
-    async def _ascore(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float:
+    async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float:
         assert self.llm is not None, "set LLM before use"
 
         user_input, context, response = (
diff --git a/src/ragas/metrics/_faithfulness.py b/src/ragas/metrics/_faithfulness.py
index 1485b8fa5..8a31f80fe 100644
--- a/src/ragas/metrics/_faithfulness.py
+++ b/src/ragas/metrics/_faithfulness.py
@@ -275,7 +275,7 @@ async def _single_turn_ascore(
         row = sample.to_dict()
         return await self._ascore(row, callbacks)
 
-    async def _ascore(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float:
+    async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float:
         """
         returns the NLI score for each (q, c, a) pair
         """
@@ -330,7 +330,7 @@ def _create_batch(
         for ndx in range(0, length_of_pairs, self.batch_size):
             yield pairs[ndx : min(ndx + self.batch_size, length_of_pairs)]
 
-    async def _ascore(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float:
+    async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float:
         """
         returns the NLI score for each (q, c, a) pair
         """
diff --git a/src/ragas/metrics/base.py b/src/ragas/metrics/base.py
index 3676db1ec..8d3457e0e 100644
--- a/src/ragas/metrics/base.py
+++ b/src/ragas/metrics/base.py
@@ -10,7 +10,7 @@
 
 from pysbd import Segmenter
 
-from ragas.callbacks import new_group
+from ragas.callbacks import ChainType, new_group
 from ragas.dataset_schema import MultiTurnSample, SingleTurnSample
 from ragas.executor import is_event_loop_running
 from ragas.prompt import PromptMixin
@@ -97,7 +97,12 @@ def score(self, row: t.Dict, callbacks: Callbacks = None) -> float:
         This method is deprecated and will be removed in 0.3. Please use `single_turn_ascore` or `multi_turn_ascore` instead.
         """
         callbacks = callbacks or []
-        rm, group_cm = new_group(self.name, inputs=row, callbacks=callbacks)
+        rm, group_cm = new_group(
+            self.name,
+            inputs=row,
+            callbacks=callbacks,
+            metadata={"type": ChainType.METRIC},
+        )
         try:
             if is_event_loop_running():
                 try:
@@ -134,7 +139,12 @@ async def ascore(
         This method is deprecated and will be removed in 0.3. Please use `single_turn_ascore` instead.
         """
         callbacks = callbacks or []
-        rm, group_cm = new_group(self.name, inputs=row, callbacks=callbacks)
+        rm, group_cm = new_group(
+            self.name,
+            inputs=row,
+            callbacks=callbacks,
+            metadata={"type": ChainType.METRIC},
+        )
         try:
             score = await asyncio.wait_for(
                 self._ascore(row=row, callbacks=group_cm),
@@ -193,6 +203,17 @@ class SingleTurnMetric(Metric):
     This class provides methods to score single-turn samples, both synchronously and asynchronously.
     """
 
+    def _only_required_columns_single_turn(
+        self, sample: SingleTurnSample
+    ) -> SingleTurnSample:
+        """
+        Simplify the sample to only include the required columns.
+        """
+        required_columns = self.required_columns.get(MetricType.SINGLE_TURN.name, set())
+        if not required_columns:
+            return sample
+        return SingleTurnSample(**sample.model_dump(include=required_columns))
+
     def single_turn_score(
         self,
         sample: SingleTurnSample,
@@ -204,8 +225,13 @@ def single_turn_score(
         May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.
         """
         callbacks = callbacks or []
+        # only get the required columns
+        sample = self._only_required_columns_single_turn(sample)
         rm, group_cm = new_group(
-            self.name, inputs=sample.model_dump(), callbacks=callbacks
+            self.name,
+            inputs=sample.to_dict(),
+            callbacks=callbacks,
+            metadata={"type": ChainType.METRIC},
         )
         try:
             if is_event_loop_running():
@@ -242,8 +268,14 @@ async def single_turn_ascore(
         May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
         """
         callbacks = callbacks or []
-        row = sample.model_dump()
-        rm, group_cm = new_group(self.name, inputs=row, callbacks=callbacks)
+        # only get the required columns
+        sample = self._only_required_columns_single_turn(sample)
+        rm, group_cm = new_group(
+            self.name,
+            inputs=sample.to_dict(),
+            callbacks=callbacks,
+            metadata={"type": ChainType.METRIC},
+        )
         try:
             score = await asyncio.wait_for(
                 self._single_turn_ascore(sample=sample, callbacks=group_cm),
@@ -278,6 +310,17 @@ class MultiTurnMetric(Metric):
     for scoring multi-turn conversation samples.
     """
 
+    def _only_required_columns_multi_turn(
+        self, sample: MultiTurnSample
+    ) -> MultiTurnSample:
+        """
+        Simplify the sample to only include the required columns.
+        """
+        required_columns = self.required_columns.get(MetricType.MULTI_TURN.name, set())
+        if not required_columns:
+            return sample
+        return MultiTurnSample(**sample.model_dump(include=required_columns))
+
     def multi_turn_score(
         self,
         sample: MultiTurnSample,
@@ -289,8 +332,12 @@ def multi_turn_score(
         May raise ImportError if nest_asyncio is not installed in Jupyter-like environments.
         """
         callbacks = callbacks or []
+        sample = self._only_required_columns_multi_turn(sample)
         rm, group_cm = new_group(
-            self.name, inputs=sample.model_dump(), callbacks=callbacks
+            self.name,
+            inputs=sample.to_dict(),
+            callbacks=callbacks,
+            metadata={"type": ChainType.METRIC},
         )
         try:
             if is_event_loop_running():
@@ -327,8 +374,13 @@ async def multi_turn_ascore(
         May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
         """
         callbacks = callbacks or []
+        sample = self._only_required_columns_multi_turn(sample)
+
         rm, group_cm = new_group(
-            self.name, inputs=sample.model_dump(), callbacks=callbacks
+            self.name,
+            inputs=sample.to_dict(),
+            callbacks=callbacks,
+            metadata={"type": ChainType.METRIC},
         )
         try:
             score = await asyncio.wait_for(
diff --git a/src/ragas/prompt/pydantic_prompt.py b/src/ragas/prompt/pydantic_prompt.py
index efeb38b20..459c4bf95 100644
--- a/src/ragas/prompt/pydantic_prompt.py
+++ b/src/ragas/prompt/pydantic_prompt.py
@@ -11,7 +11,7 @@
 from pydantic import BaseModel
 
 from ragas._version import __version__
-from ragas.callbacks import new_group
+from ragas.callbacks import ChainType, new_group
 from ragas.exceptions import RagasOutputParserException
 from ragas.llms.prompt import PromptValue
 
@@ -176,6 +176,7 @@ async def generate_multiple(
             name=self.name,
             inputs={"data": processed_data},
             callbacks=callbacks,
+            metadata={"type": ChainType.RAGAS_PROMPT},
         )
         prompt_value = PromptValue(prompt_str=self.to_string(processed_data))
         resp = await llm.generate(
diff --git a/tests/unit/test_metric.py b/tests/unit/test_metric.py
index 2a20b834c..4f589318d 100644
--- a/tests/unit/test_metric.py
+++ b/tests/unit/test_metric.py
@@ -1,3 +1,6 @@
+import typing as t
+from dataclasses import dataclass, field
+
 from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
 from ragas.metrics.base import MetricType
 from ragas.metrics.utils import get_available_metrics
@@ -34,3 +37,37 @@ async def _single_turn_ascore(self, sample: SingleTurnSample, callbacks):
 
     fm = FakeMetric()
     assert fm.single_turn_score(SingleTurnSample(user_input="a", response="b")) == 0
+
+
+def test_required_columns():
+    from ragas.metrics.base import MetricType, SingleTurnMetric
+
+    @dataclass
+    class FakeMetric(SingleTurnMetric):
+        name = "fake_metric"  # type: ignore
+        _required_columns: t.Dict[MetricType, t.Set[str]] = field(
+            default_factory=lambda: {
+                MetricType.SINGLE_TURN: {"user_input", "response"},
+            }
+        )
+
+        def init(self, run_config):
+            pass
+
+        async def _ascore(self, row, callbacks) -> float:
+            return 0
+
+        async def _single_turn_ascore(self, sample: SingleTurnSample, callbacks):
+            return 0
+
+    fm = FakeMetric()
+    assert fm.required_columns[MetricType.SINGLE_TURN.name] == {
+        "user_input",
+        "response",
+    }
+    assert (
+        fm._only_required_columns_single_turn(
+            SingleTurnSample(user_input="a", response="b", reference="c")
+        ).to_dict()
+        == SingleTurnSample(user_input="a", response="b").to_dict()
+    )