Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion ddtrace/llmobs/_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Dataset:
_id: str
_records: List[DatasetRecord]
_version: int
_latest_version: int
_dne_client: "LLMObsExperimentsClient"
_new_records_by_record_id: Dict[str, DatasetRecordRaw]
_updated_record_ids_to_new_fields: Dict[str, UpdatableDatasetRecord]
Expand All @@ -121,13 +122,15 @@ def __init__(
dataset_id: str,
records: List[DatasetRecord],
description: str,
latest_version: int,
version: int,
_dne_client: "LLMObsExperimentsClient",
) -> None:
self.name = name
self.project = project
self.description = description
self._id = dataset_id
self._latest_version = latest_version
self._version = version
self._dne_client = _dne_client
self._records = records
Expand Down Expand Up @@ -168,7 +171,10 @@ def push(self) -> None:
record["record_id"] = record_id # type: ignore

# FIXME: we don't get version numbers in responses to deletion requests
self._version = new_version if new_version != -1 else self._version + 1
self._latest_version = new_version if new_version != -1 else self._latest_version + 1
# no matter what the version was before the push, pushing will result in the dataset being on the current
# version tracked by the backend
self._version = self._latest_version
self._new_records_by_record_id = {}
self._deleted_record_ids = []
self._updated_record_ids_to_new_fields = {}
Expand Down Expand Up @@ -225,6 +231,14 @@ def url(self) -> str:
# FIXME: will not work for subdomain orgs
return f"{_get_base_url()}/llm/datasets/{self._id}"

@property
def latest_version(self) -> int:
    """The newest version of this dataset tracked by the backend.

    Updated locally after a successful ``push``; may be greater than
    ``version`` when an older dataset version was pulled.
    """
    return self._latest_version

@property
def version(self) -> int:
    """The dataset version this object's records correspond to.

    After a ``push`` this is set to the backend's current version; it may
    trail ``latest_version`` if a specific older version was pulled.
    """
    return self._version

def _estimate_delta_size(self) -> int:
"""rough estimate (in bytes) of the size of the next batch update call if it happens"""
size = len(safe_json(self._new_records_by_record_id)) + len(safe_json(self._updated_record_ids_to_new_fields))
Expand Down Expand Up @@ -434,6 +448,7 @@ def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional
dataset_id=self._dataset._id,
records=subset_records,
description=self._dataset.description,
latest_version=self._dataset._latest_version,
version=self._dataset._version,
_dne_client=self._dataset._dne_client,
)
Expand Down
8 changes: 6 additions & 2 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,12 @@ def _on_asyncio_execute_task(self, task_data: Dict[str, Any]) -> None:
self._llmobs_context_provider.activate(llmobs_ctx)

@classmethod
def pull_dataset(cls, dataset_name: str, project_name: Optional[str] = None) -> Dataset:
ds = cls._instance._dne_client.dataset_get_with_records(dataset_name, (project_name or cls._project_name))
def pull_dataset(
    cls, dataset_name: str, project_name: Optional[str] = None, version: Optional[int] = None
) -> Dataset:
    """Fetch a dataset and its records from LLM Observability.

    :param dataset_name: name of the dataset to retrieve.
    :param project_name: project to search in; falls back to the client's
        configured project when not given.
    :param version: specific dataset version to pull; ``None`` pulls the
        latest version.
    """
    target_project = project_name or cls._project_name
    return cls._instance._dne_client.dataset_get_with_records(
        dataset_name, target_project, version
    )

@classmethod
Expand Down
42 changes: 35 additions & 7 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import TypedDict
from typing import Union
from typing import cast
import urllib
from urllib.parse import quote
from urllib.parse import urlparse

Expand Down Expand Up @@ -400,7 +401,16 @@ def dataset_create(
if dataset_id is None or dataset_id == "":
raise ValueError(f"unexpected dataset state, invalid ID (is None: {dataset_id is None})")
curr_version = response_data["data"]["attributes"]["current_version"]
return Dataset(dataset_name, project, dataset_id, [], description, curr_version, _dne_client=self)
return Dataset(
name=dataset_name,
project=project,
dataset_id=dataset_id,
records=[],
description=description,
latest_version=curr_version,
version=curr_version,
_dne_client=self,
)

@staticmethod
def _get_record_json(record: Union[UpdatableDatasetRecord, DatasetRecordRaw], is_update: bool) -> JSONType:
Expand Down Expand Up @@ -458,10 +468,14 @@ def dataset_batch_update(
new_record_ids: List[str] = [r["id"] for r in data] if data else []
return new_version, new_record_ids

def dataset_get_with_records(self, dataset_name: str, project_name: Optional[str] = None) -> Dataset:
def dataset_get_with_records(
self, dataset_name: str, project_name: Optional[str] = None, version: Optional[int] = None
) -> Dataset:
project = self.project_create_or_get(project_name)
project_id = project.get("_id")
logger.debug("getting records with project ID %s for %s", project_id, project_name)
logger.debug(
"getting records with project ID %s for %s, version: %s", project_id, project_name, str(version) or "latest"
)

path = f"/api/unstable/llm-obs/v1/{project_id}/datasets?filter[name]={quote(dataset_name)}"
resp = self.request("GET", path)
Expand All @@ -480,11 +494,17 @@ def dataset_get_with_records(self, dataset_name: str, project_name: Optional[str
dataset_id = data[0]["id"]

list_base_path = f"/api/unstable/llm-obs/v1/datasets/{dataset_id}/records"

has_next_page = True
class_records: List[DatasetRecord] = []
list_path = list_base_path
page_num = 0
url_options = {}
while has_next_page:
if version:
url_options["filter[version]"] = version

list_path = f"{list_base_path}?{urllib.parse.urlencode(url_options, safe='[]')}"
logger.debug("list records page %d, request path=%s", page_num, list_path)
resp = self.request("GET", list_path, timeout=self.LIST_RECORDS_TIMEOUT)
if resp.status != 200:
raise ValueError(
Expand All @@ -504,14 +524,22 @@ def dataset_get_with_records(self, dataset_name: str, project_name: Optional[str
}
)
next_cursor = records_data.get("meta", {}).get("after")

url_options = {}
has_next_page = False
if next_cursor:
has_next_page = True
list_path = f"{list_base_path}?page[cursor]={next_cursor}"
logger.debug("next list records request path %s", list_path)
url_options["page[cursor]"] = next_cursor
page_num += 1
return Dataset(
dataset_name, project, dataset_id, class_records, dataset_description, curr_version, _dne_client=self
name=dataset_name,
project=project,
dataset_id=dataset_id,
records=class_records,
description=dataset_description,
latest_version=curr_version,
version=version or curr_version,
_dne_client=self,
)

def dataset_bulk_upload(self, dataset_id: str, records: List[DatasetRecord]):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
features:
- |
LLM Observability: A previous version of a dataset can be pulled by passing the optional ``version``
argument to ``LLMObs.pull_dataset``.
- |
LLM Observability: Datasets expose two new properties, ``version`` and ``latest_version``, which report the
dataset version currently being worked with and the latest global version of the dataset, respectively.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
interactions:
- request:
body: '{"data": {"type": "datasets", "id": "0bb93ae7-43c4-48ff-91e4-d2817fee85fe",
"attributes": {"insert_records": [{"input": {"prompt": "What is the capital
of France?"}, "expected_output": {"answer": "Paris"}, "metadata": null}], "update_records":
[], "delete_records": []}}}'
headers:
Accept:
- '*/*'
? !!python/object/apply:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
Content-Length:
- '271'
? !!python/object/apply:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/0bb93ae7-43c4-48ff-91e4-d2817fee85fe/batch_update
response:
body:
string: '{"data":[{"id":"eaadecb4-836e-49b3-8390-212b3fffb60b","type":"datasets","attributes":{"author":{"id":"de473b30-eb9f-11e9-a77a-c7405862b8bd"},"created_at":"2025-10-21T18:26:24.929416376Z","dataset_id":"0bb93ae7-43c4-48ff-91e4-d2817fee85fe","expected_output":{"answer":"Paris"},"input":{"prompt":"What
is the capital of France?"},"updated_at":"2025-10-21T18:26:24.929416376Z","version":1}}]}'
headers:
content-length:
- '389'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Tue, 21 Oct 2025 18:26:24 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
? !!python/object/apply:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
? !!python/object/apply:multidict._multidict.istr
- Content-Length
: - '0'
? !!python/object/apply:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: GET
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/0bb93ae7-43c4-48ff-91e4-d2817fee85fe/records?filter%5Bversion%5D=420
response:
body:
string: '{"errors":[{"title":"Generic Error","detail":"invalid version: version
is greater than the current version or negative"}]}'
headers:
content-length:
- '122'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Tue, 21 Oct 2025 18:26:27 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 400
message: Bad Request
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
interactions:
- request:
body: '{"data": {"type": "datasets", "id": "4607e918-094d-4aa9-8b7a-50fa63a95b56",
"attributes": {"insert_records": [{"input": {"prompt": "What is the capital
of France?"}, "expected_output": {"answer": "Paris"}, "metadata": null}], "update_records":
[], "delete_records": []}}}'
headers:
Accept:
- '*/*'
? !!python/object/apply:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
Content-Length:
- '271'
? !!python/object/apply:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/4607e918-094d-4aa9-8b7a-50fa63a95b56/batch_update
response:
body:
string: '{"data":[{"id":"93328f7a-bfd2-4672-8b94-76b0698cd754","type":"datasets","attributes":{"author":{"id":"de473b30-eb9f-11e9-a77a-c7405862b8bd"},"created_at":"2025-10-21T18:25:23.855004356Z","dataset_id":"4607e918-094d-4aa9-8b7a-50fa63a95b56","expected_output":{"answer":"Paris"},"input":{"prompt":"What
is the capital of France?"},"updated_at":"2025-10-21T18:25:23.855004356Z","version":1}}]}'
headers:
content-length:
- '389'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Tue, 21 Oct 2025 18:25:23 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
interactions:
- request:
body: '{"data": {"type": "datasets", "id": "4607e918-094d-4aa9-8b7a-50fa63a95b56",
"attributes": {"insert_records": [{"input": {"prompt": "What is the capital
of China?"}, "expected_output": {"answer": "Beijing"}, "metadata": null}], "update_records":
[], "delete_records": []}}}'
headers:
Accept:
- '*/*'
? !!python/object/apply:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
Content-Length:
- '272'
? !!python/object/apply:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/4607e918-094d-4aa9-8b7a-50fa63a95b56/batch_update
response:
body:
string: '{"data":[{"id":"5bbd89ec-4eba-4f41-bd47-2a23a005a20a","type":"datasets","attributes":{"author":{"id":"de473b30-eb9f-11e9-a77a-c7405862b8bd"},"created_at":"2025-10-21T18:25:26.024986597Z","dataset_id":"4607e918-094d-4aa9-8b7a-50fa63a95b56","expected_output":{"answer":"Beijing"},"input":{"prompt":"What
is the capital of China?"},"updated_at":"2025-10-21T18:25:26.024986597Z","version":2}}]}'
headers:
content-length:
- '390'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Tue, 21 Oct 2025 18:25:26 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 200
message: OK
version: 1
Loading
Loading