feat(0.4.0): dataset upload + streaming#2

Merged
trentleslie merged 11 commits into main from feat/0.4.0-dataset-upload-streaming
Apr 15, 2026

@trentleslie
Owner

Summary

Adds file-based mapping backed by POST /map/dataset/stream (NDJSON). Two public entry points:

  • BioMapperClient.map_dataset_file_iter(path, ...) — async method yielding MappingResult per NDJSON line as it arrives. For live dashboards / per-result streaming (Entity Linker UI).
  • map_dataset_file_sync(path, *, progress=False, on_result=None, total_hint=None, ...) — free function in new src/ddharmon/dataset.py. Blocks, returns a completed DatasetMappingResult. For notebooks + scripts.

Plan: ~/.claude/plans/2026-04-14-001-feat-ddharmon-0.4.0-dataset-upload-streaming-plan.md

What changed

| Area | Change |
|---|---|
| src/ddharmon/models.py | New DatasetMappingResult with .raise_for_error() (mirrors httpx.Response.raise_for_status) |
| src/ddharmon/client.py | New async method map_dataset_file_iter; contextlib.AsyncExitStack ensures stream closes before file handle on cancellation; _dataset_query_params rejects commas in column/annotator names to prevent silent wire-format splitting |
| src/ddharmon/dataset.py | New module with a single public function, map_dataset_file_sync. Explicit __anext__ loop so on_result exceptions propagate unwrapped while transport errors get captured into .error |
| src/ddharmon/__init__.py + pyproject.toml | Version bump 0.3.0 to 0.4.0; export new symbols |
| README + tutorial notebook | New "Dataset upload" sections; notebook executes end-to-end against live API |
| tests/fixtures/metabolites_sample.tsv | Shared 10-row fixture for notebook + any file-based tests |
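
To illustrate the comma guard in the client.py row, here is a minimal sketch of boundary validation for a comma-separated query parameter. It is not the code in _dataset_query_params: the function name build_dataset_query_params, the choice of ValueError, and the exact query-parameter keys are assumptions made for illustration.

```python
from collections.abc import Sequence


def build_dataset_query_params(
    name_column: str,
    provided_id_columns: Sequence[str] = (),
) -> dict[str, str]:
    """Hypothetical stand-in for the comma guard; not ddharmon's actual code."""
    for value in (name_column, *provided_id_columns):
        # A comma inside a single name would be indistinguishable from the
        # separator once the list is joined, so reject it up front.
        if "," in value:
            raise ValueError(f"comma not allowed in column name: {value!r}")
    params = {"name_column": name_column}
    if provided_id_columns:
        params["provided_id_columns"] = ",".join(provided_id_columns)
    return params
```

Rejecting at the boundary means a column named "hmdb,kegg" fails loudly on the client instead of being read by the server as two separate columns.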

Scope tightening from the plan draft

Review (scope-guardian, product-lens, adversarial) surfaced that the originally-proposed third entry point (map_dataset_file async collector) had no named user — the UI uses the iterator directly, and the notebook uses the sync wrapper. Dropped, along with free-function wrappers around it. dataset.py contains only map_dataset_file_sync. Async callers wanting a list write [r async for r in c.map_dataset_file_iter(...)] — a one-liner.
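
The one-liner can be tried without the library by substituting a stub async generator. In this sketch fake_result_stream is a made-up stand-in for c.map_dataset_file_iter(...), and the two rows are invented sample data.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_result_stream() -> AsyncIterator[dict[str, object]]:
    """Stand-in for c.map_dataset_file_iter(...); yields two made-up rows."""
    for row_index, name in enumerate(["glucose", "lactate"]):
        await asyncio.sleep(0)  # simulate waiting on the network
        yield {"row_index": row_index, "name": name}


async def collect() -> list[dict[str, object]]:
    # Same shape as [r async for r in c.map_dataset_file_iter(...)]
    return [r async for r in fake_result_stream()]


results = asyncio.run(collect())
```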

Live-API spot check findings (affected the implementation)

The plan had three Deferred-to-Implementation questions; I ran a 5-row upload against the live API to resolve them:

| Question | Finding |
|---|---|
| provided_id_columns wire format | Comma-separated accepted as assumed |
| Per-NDJSON-line shape | Slimmer than batch. Fields are {row_index, name, chosen_kg_id, curies, kg_ids}, with no assigned_ids block. Our permissive RawApiResult handles the extra row_index fine (silently ignored); the missing assigned_ids means confidence_score is always None for dataset results. Docstring updated to warn callers. |
| Terminal summary line | None emitted. DatasetMappingResult.stats stays {} (documented) |
| Content-Type | Server sends application/x-ndjson (OpenAPI schema declared application/json); we read aiter_lines() regardless so this doesn't affect behavior |
| Heartbeat envelopes | None observed at 5 rows. Pre-authorized fallback remains for larger datasets if they appear |

Exception-handling contract

The sync wrapper uses a streaming_started boolean flag (not exception type) to discriminate:

  • Initial-request errors (before any row yielded) — propagate unwrapped: BioMapperAuthError, BioMapperRateLimitError, BioMapperServerError, BioMapperTimeoutError, httpx.HTTPStatusError
  • Mid-stream transport errors — captured into DatasetMappingResult.error; partial results preserved in .results
  • Parse errors on individual lines — yielded as per-row MappingResult.error values; stream continues
  • Callback (on_result) exceptions — propagate unwrapped, replace return value (partial results NOT returned)
  • asyncio.CancelledError — always propagates unwrapped

Flag-based discriminator is exception-type-agnostic — future refactors that wrap mid-stream timeouts as typed errors won't silently flip semantics.

Quality

  • 138 tests pass (57 new, 9 preexisting skips)
  • mypy --strict clean across all 10 source files
  • Coverage: 87.55% overall — 94% dataset.py, 91% client.py, 99% models.py
  • Ruff clean on changed files (3 preexisting warnings in client.py from 0.3.0 — not addressed here)
  • Tutorial notebook executes end-to-end against live API
  • Live-API spot check verified against biomapper.expertintheloop.io/api/v1/map/dataset/stream with a 5-row TSV

Test plan

  • Unit tests: poetry run pytest — 138 pass
  • Type check: poetry run mypy src/ddharmon/ — clean
  • Notebook execute: poetry run jupyter nbconvert --to notebook --execute — clean
  • Live-API spot check: 5-row TSV uploaded via map_dataset_file_iter, verified row order, resolution, no spurious <unknown> parse errors
  • Soak test: Entity Linker UI upgrades to 0.4.0 on its own cadence; per-release tracking issue filed (see checklist below)

Pre-merge checklist

  • Filed tracking issue in Entity Linker UI repo
  • Reviewed by: @trentleslie

Generated with Claude Code.

Mirrors httpx.Response.raise_for_status so callers can opt into
exception semantics on mid-stream failures instead of having to
remember to check .error before consuming .results.
Uploads TSV/CSV via multipart to POST /map/dataset/stream and yields
MappingResult per NDJSON line. Uses AsyncExitStack to ensure the HTTP
stream closes before the file handle, so cancellation mid-handshake
cannot leave httpx reading from a closed fh. Rejects commas in column
names and annotator lists at the boundary to prevent silent wire-format
splitting.
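
The close ordering relies on AsyncExitStack unwinding in last-in, first-out order. The sketch below shows this with two tracked dummy contexts rather than a real file handle and HTTP stream; tracked, upload, and the event strings are made up for illustration.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def tracked(name: str) -> AsyncIterator[str]:
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


async def upload() -> None:
    async with AsyncExitStack() as stack:
        # Enter the file first and the stream second, so the stream
        # is the first to close when the stack unwinds.
        await stack.enter_async_context(tracked("file"))
        await stack.enter_async_context(tracked("stream"))
        await asyncio.sleep(3600)  # pretend to wait on the server


async def main() -> None:
    task = asyncio.create_task(upload())
    await asyncio.sleep(0)  # let upload() open both contexts
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
# events is now:
# ["open file", "open stream", "close stream", "close file"]
```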
Live-API spot check confirmed the /map/dataset/stream endpoint emits a
slimmer per-row shape than /map/batch — no assigned_ids block means
no scores for downstream extraction.
Blocking sync entry point that wraps map_dataset_file_iter with tqdm
progress, on_result callback, and asyncio.run bridge.

Uses explicit __anext__ iteration so the transport-error try/except
wraps only the iterator call — on_result exceptions are positioned
outside it and propagate unwrapped. contextlib.aclosing guarantees
the generator's file/stream contexts unwind on early break or
callback-raised exception rather than waiting for GC.

Initial-request errors propagate unchanged; mid-stream failures are
captured into DatasetMappingResult.error. The discriminator is a
streaming_started flag rather than exception type, so future
timeout-wrapping refactors cannot turn a captured mid-stream failure
into a propagated one.
- README gets a 20-line Dataset upload subsection with map_dataset_file_sync
  example including raise_for_error, highlighting the required
  name_column / provided_id_columns contract.
- Notebook adds two cells: sync + tqdm + raise_for_error teaches the
  check-before-use idiom; async iterator demonstrates per-result
  streaming against the live API.
- Fixture lives at tests/fixtures/metabolites_sample.tsv (10 rows)
  so both test suite and notebook share a single canonical sample.
@greptile-apps
Contributor

greptile-apps Bot commented Apr 15, 2026

Greptile Summary

This PR introduces dataset file streaming over POST /map/dataset/stream (NDJSON), adding BioMapperClient.map_dataset_file_iter (async iterator) and map_dataset_file_sync (sync wrapper) as the two public entry points. The implementation is well-structured, with a clear exception-handling contract (flag-based discriminator for initial vs. mid-stream errors), proper resource cleanup via AsyncExitStack + aclosing, and solid test coverage at 87.55%.

  • The timeout: float annotation on BioMapperClient.__init__ conflicts with the map_dataset_file_iter docstring that explicitly recommends BioMapperClient(timeout=httpx.Timeout(read=None, connect=30.0)) — users following this guidance with mypy strict-mode enabled will hit a type error.

Confidence Score: 5/5

Safe to merge; the only finding is a minor type annotation mismatch that does not affect runtime behavior.

All findings are P2. The exception-handling contract, resource cleanup, and comma-validation guard are correctly implemented and well-tested. The timeout annotation issue is a cosmetic/mypy concern with no runtime impact.

src/ddharmon/client.py: the timeout annotation on __init__ is the only outstanding item.

Important Files Changed

| Filename | Overview |
|---|---|
| src/ddharmon/client.py | New map_dataset_file_iter and helpers added; timeout: float annotation on __init__ is narrower than the runtime behavior and documented usage (httpx.Timeout is also accepted). |
| src/ddharmon/dataset.py | New module implementing map_dataset_file_sync; exception-handling contract (streaming_started flag, aclosing, on_result propagation) is correct and well-tested. |
| src/ddharmon/models.py | New DatasetMappingResult with raise_for_error() mirrors httpx.Response.raise_for_status; implementation is clean. |
| tests/test_dataset.py | Thorough coverage of happy-path, progress, callback semantics, error cases, and kwarg forwarding, including the important callback-vs-transport exception disambiguation. |
| tests/test_client.py | New TestMapDatasetFileIter class added alongside existing batch tests; respx mocking, comma-validation, and multipart content-type coverage all present. |
| tests/conftest.py | Shared helpers (make_ndjson_body, TruncatingByteStream, make_truncating_stream) cleanly support new streaming tests. |
| .github/workflows/ci.yml | New CI workflow covering Python 3.11 and 3.12 with ruff, mypy --strict, and pytest; no issues. |

Sequence Diagram

sequenceDiagram
    participant Caller
    participant map_dataset_file_sync
    participant BioMapperClient
    participant map_dataset_file_iter
    participant BioMapper API

    Caller->>map_dataset_file_sync: path, name_column, ...
    map_dataset_file_sync->>BioMapperClient: async with (asyncio.run)
    BioMapperClient->>map_dataset_file_iter: aclosing(gen)
    map_dataset_file_iter->>BioMapper API: POST /map/dataset/stream (multipart)
    BioMapper API-->>map_dataset_file_iter: 200 OK + NDJSON stream

    loop per NDJSON line
        BioMapper API-->>map_dataset_file_iter: line
        map_dataset_file_iter-->>map_dataset_file_sync: yield MappingResult
        map_dataset_file_sync->>map_dataset_file_sync: append + streaming_started=True
        opt on_result
            map_dataset_file_sync->>Caller: on_result(r) callback
        end
    end

    alt clean finish (StopAsyncIteration)
        map_dataset_file_sync-->>Caller: DatasetMappingResult(results, error=None)
    else mid-stream transport error (streaming_started=True)
        map_dataset_file_sync-->>Caller: DatasetMappingResult(results, error=str(exc))
    else initial-request error (streaming_started=False)
        map_dataset_file_sync-->>Caller: raises unwrapped (BioMapperAuthError, etc.)
    end

Reviews (2): Last reviewed commit: "ci: add GitHub Actions workflow + clean ..."

Comment thread src/ddharmon/dataset.py Outdated
trentleslie and others added 5 commits April 15, 2026 07:50
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Retain comments while modifying iterator handling.
The previous 'Fix comment formatting' commit de-indented the while loop
by one level too many, placing it outside the async with block. Python
refused to parse the file (IndentationError: expected an indented block
after 'with' statement on line 172). Re-indent the loop and its comments
to sit inside the async with, restoring import and test parity.
Workflow runs ruff, mypy --strict, and pytest on every push and pull
request across all branches, with a matrix over Python 3.11 and 3.12
(matching pyproject classifiers). Concurrency group cancels superseded
runs on the same ref so pushing again doesn't double-queue.

Also cleans up 13 preexisting ruff warnings so CI lands green on the
first run:
- Auto-fixed 9 (unused math import in test_export.py; 8 UP037
  quoted-type-annotations in test_metabolon.py now that
  from __future__ import annotations is effective).
- Refactored try/except ValueError: pass in _raise_for_status to
  contextlib.suppress(ValueError) for clarity.
- Kept three idiomatic uses of typing.Any with short noqa justifications:
  **httpx_kwargs forwarded verbatim to httpx.AsyncClient; *args on
  __aexit__ (the standard (exc_type, exc, tb) tuple); and
  results_to_dataframe return type (pandas is an optional extra).
@trentleslie trentleslie merged commit 2c5ec6a into main Apr 15, 2026
5 checks passed
@trentleslie trentleslie deleted the feat/0.4.0-dataset-upload-streaming branch April 15, 2026 15:56