harness: stabilize embed OOM tuning and defaults#1823
jioffe502 wants to merge 2 commits into NVIDIA:main
Conversation
Port embed OOM hardening and decoupled inference microbatch controls into the graph-era harness path so bo767 runs can be tuned for stability without conflating Ray scheduling batch size. Capture OOM outlier metadata for attribution, restore graph Ray init parity knobs, and document the refactor/validation context for lead review. Signed-off-by: jioffe502 <jioffe@nvidia.com>
Greptile Summary

This PR decouples two embedding control knobs — `embed_batch_size` (Ray Data transport/scheduling) and `embed_inference_batch_size` (model forward-pass microbatch). The harness path is functionally correct:
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/model/local/llama_nemotron_embed_1b_v2_embedder.py | Adds adaptive OOM retry with halving/streak recovery; logic is sound. warnings.warn deduplication may hide repeated OOM events from operators. Previously-flagged bare-except and stale-state issues remain open. |
| nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py | New --embed-inference-batch-size CLI arg correctly feeds into EmbedParams.inference_batch_size (line 446). Line 447 also sets embed_inference_batch_size in the dict, but this field is silently discarded by embed_text_main_text_embed() via **_: Any. |
| nemo_retriever/src/nemo_retriever/harness/config.py | embed_inference_batch_size cleanly added to HarnessConfig, TUNING_FIELDS, validation loop, and env-override map. No issues. |
| nemo_retriever/src/nemo_retriever/harness/run.py | Correctly forwards --embed-inference-batch-size only in the non-heuristics path (consistent with how all other explicit tuning params are handled). |
| nemo_retriever/src/nemo_retriever/utils/ray_resource_hueristics.py | EMBED_BATCH_SIZE comment on line 25 says 'Ray batch size AND EMBEDDING inference batch size' — now stale after this PR decouples the two knobs. |
| nemo_retriever/src/nemo_retriever/graph_ingestor.py | Adds _resolve_object_store_memory_bytes() with proper proportion/bytes env-var handling and /dev/shm capping. No issues. |
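For intuition on the env-var resolution summarized in the `graph_ingestor.py` row above, here is a minimal sketch. Only the env-var names come from this PR; the function body is a hypothetical reconstruction of the described behavior, not the PR's actual code:

```python
import os
import shutil
from typing import Optional


def resolve_object_store_memory_bytes(total_memory_bytes: int) -> Optional[int]:
    """Resolve Ray object store size from env vars, capped by /dev/shm capacity."""
    raw_bytes = os.environ.get("RAY_OBJECT_STORE_MEMORY_BYTES")
    raw_proportion = os.environ.get("RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION")
    if raw_bytes is not None:
        resolved = int(raw_bytes)  # explicit byte count wins over the proportion
    elif raw_proportion is not None:
        resolved = int(total_memory_bytes * float(raw_proportion))
    else:
        return None  # neither set: defer to Ray's own default sizing
    try:
        # Cap at /dev/shm so the plasma store cannot outgrow shared memory.
        resolved = min(resolved, shutil.disk_usage("/dev/shm").total)
    except OSError:
        pass  # /dev/shm unavailable (e.g. non-Linux): skip the cap
    return resolved
```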
Sequence Diagram
```mermaid
sequenceDiagram
    participant H as Harness run.py
    participant GP as graph_pipeline.py
    participant EP as EmbedParams
    participant BA as BatchEmbedGPUActor
    participant RT as embed_text_main_text_embed
    participant EM as LlamaNemotronEmbedder
    H->>GP: --embed-batch-size 256 --embed-inference-batch-size 32
    GP->>EP: inference_batch_size=32 via line 446
    GP->>EP: embed_inference_batch_size=32 via line 447 (dead field)
    EP->>BA: model_dump sends both fields
    BA->>RT: inference_batch_size=32 matched, embed_inference_batch_size dropped by kwargs
    RT->>EM: model.embed(batch, batch_size=32)
    Note over EM: OOM triggers halve and retry
    Note over EM: 3 successes triggers batch size growth
    EM-->>RT: CPU tensor shape N x D
    RT-->>BA: DataFrame with embeddings
    BA-->>GP: result batch
```
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
Line: 446-447
Comment:
**`embed_inference_batch_size` in `EmbedParams` is silently discarded**
`embed_text_main_text_embed()` absorbs unknown keyword arguments via `**_: Any`, so line 447's `"embed_inference_batch_size": embed_inference_batch_size or None` is set on `EmbedParams` but never read by the runtime. The operative control knob is `inference_batch_size` on line 446 — that is what flows through to `model.embed(batch, batch_size=...)`. Line 447 is dead code and creates a misleading API: a direct Python API caller who does `EmbedParams(embed_inference_batch_size=64)` will silently see no change in forward-pass batching.
Consider either removing line 447 (the value is already fully captured by line 446), or wiring `embed_inference_batch_size` as a named parameter in `embed_text_main_text_embed()` if it is meant to carry independent semantics in the future.
```suggestion
"inference_batch_size": embed_inference_batch_size or embed_batch_size or None,
```
How can I resolve this? If you propose a fix, please make it concise.
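To make the failure mode concrete, here is a minimal standalone reproduction of the `**_: Any` absorption pattern. The simplified `embed_text_main` below is a stand-in for the real `embed_text_main_text_embed`, not its actual signature:

```python
from typing import Any, List


def embed_text_main(texts: List[str], inference_batch_size: int = 32, **_: Any) -> int:
    """Stand-in runtime entrypoint: only named parameters are honored."""
    return inference_batch_size  # the effective forward-pass microbatch


# Mirrors EmbedParams.model_dump() feeding kwargs into the entrypoint:
params = {"inference_batch_size": 32, "embed_inference_batch_size": 64}
effective = embed_text_main([], **params)
assert effective == 32  # the 64 vanished into **_ without error or warning
```

A direct API caller setting only `embed_inference_batch_size` would hit exactly this silent no-op.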
---
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/utils/ray_resource_hueristics.py
Line: 25
Comment:
**Stale comment after decoupling**
The comment `# Ray batch size AND EMBEDDING inference batch size` now contradicts the core intent of this PR. After decoupling, `EMBED_BATCH_SIZE` is the Ray Data transport/scheduling batch size only; the inference microbatch is controlled by `embed_inference_batch_size` (default 32 in `HarnessConfig`).
```suggestion
EMBED_BATCH_SIZE = 256 # Ray Data transport/scheduling batch size (not the model forward-pass microbatch)
```
---
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/model/local/llama_nemotron_embed_1b_v2_embedder.py
Line: 258-263
Comment:
**`warnings.warn` deduplicates — repeated OOM events go silent**
Python's warnings machinery shows the same `(message, category, module, lineno)` tuple only once per interpreter session by default. In a long-running embedding job that hits OOM many times across different batches, only the first OOM retry warning will surface; subsequent ones are silently swallowed. This makes it impossible for operators to track OOM frequency from logs.
Using `logger.warning(...)` instead ensures every event is emitted and respects the application's log-level configuration. The same applies to the identical `warnings.warn` call in the `RuntimeError` handler below (line ~279).
```suggestion
logger.warning(
"CUDA OOM during embedding; retrying with batch_size=%d "
"(requested=%d, %s)",
current_bs,
target_bs,
diag,
)
```
```

Reviews (3). Last reviewed commit: "refactor(embed): remove debug-only OOM a..."
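The deduplication behavior the comment describes can be demonstrated in isolation. This is a standalone sketch, not code from this PR: with default filters, `warnings.warn` surfaces a given `(message, category, module, lineno)` only once, while a `logging` call emits every event.

```python
import logging
import warnings

# Count how many OOM-style warnings actually surface over 5 simulated retries.
shown = []
orig_show = warnings.showwarning
warnings.showwarning = lambda msg, cat, fn, lineno, file=None, line=None: shown.append(str(msg))
try:
    for _ in range(5):
        warnings.warn("CUDA OOM during embedding; retrying", RuntimeWarning)
finally:
    warnings.showwarning = orig_show
# Default filters suppress repeats from the same call site: only 1 of 5 surfaces.


class CountingHandler(logging.Handler):
    """Counts emitted records, standing in for an operator's log pipeline."""

    def __init__(self) -> None:
        super().__init__()
        self.count = 0

    def emit(self, record: logging.LogRecord) -> None:
        self.count += 1


logger = logging.getLogger("embed_oom_demo")
logger.setLevel(logging.WARNING)
handler = CountingHandler()
logger.addHandler(handler)
for _ in range(5):
    logger.warning("CUDA OOM during embedding; retrying with batch_size=%d", 16)
# handler.count == 5: every OOM retry stays visible to operators.
```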
```python
capture_path = Path(capture_path_raw).expanduser()
capture_is_dir = capture_path_raw.endswith("/") or (capture_path.exists() and capture_path.is_dir())
if capture_is_dir:
    capture_path.mkdir(parents=True, exist_ok=True)
    capture_file = capture_path / f"embed_oom_outliers_pid{os.getpid()}.jsonl"
else:
    capture_path.parent.mkdir(parents=True, exist_ok=True)
    capture_file = capture_path

line = json.dumps(event, ensure_ascii=True) + "\n"
with capture_file.open("a", encoding="utf-8") as f:
```
**Unguarded I/O in `_capture_oom_outlier_event` can abort OOM retry**
_capture_oom_outlier_event is called from inside except torch.cuda.OutOfMemoryError: and except RuntimeError: blocks where the intent is to halve the batch size and retry. If capture_path.mkdir() or capture_file.open(...) raises (disk full, permission denied, race on directory creation) the new I/O exception propagates out of the except-handler, replacing the OOM and killing the embedding job instead of retrying. The whole function must be wrapped in a top-level try/except so that any failure is silently dropped rather than breaking recovery.
```python
try:
    capture_path = Path(capture_path_raw).expanduser()
    capture_is_dir = capture_path_raw.endswith("/") or (capture_path.exists() and capture_path.is_dir())
    if capture_is_dir:
        capture_path.mkdir(parents=True, exist_ok=True)
        capture_file = capture_path / f"embed_oom_outliers_pid{os.getpid()}.jsonl"
    else:
        capture_path.parent.mkdir(parents=True, exist_ok=True)
        capture_file = capture_path
    line = json.dumps(event, ensure_ascii=True) + "\n"
    with capture_file.open("a", encoding="utf-8") as f:
        try:
            import fcntl
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            f.write(line)
            f.flush()
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception:
            f.write(line)
            f.flush()
    self._oom_capture_event_count += 1
except Exception as capture_exc:
    logger.debug("OOM capture write failed (non-fatal): %s", capture_exc)
```
---

```python
try:
    tokenized = tokenizer(
        list(chunk),
        padding=False,
        truncation=True,
        max_length=max(1, int(max_length)),
        return_length=True,
    )
    token_lengths = tokenized.get("length")
except Exception:
    token_lengths = None
```
**Silent exception swallow in tokenization path**
The bare except Exception: on the tokenizer call in _batch_length_summary swallows tokenization errors without logging. Since this helper is called during OOM recovery to build diagnostic information, a silent failure means the warning message emitted to the user will have tok_max=None, tok_p95=None with no indication that tokenization itself failed. Consider adding a logger.debug(...) at minimum so OOM diagnostic information gaps are traceable.
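A sketch of the suggested fix, assuming an HF-style tokenizer callable as in the diff above (the helper name `batch_token_lengths` is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)


def batch_token_lengths(tokenizer, chunk, max_length: int):
    """Tokenize for OOM diagnostics; log failures instead of swallowing them."""
    try:
        tokenized = tokenizer(
            list(chunk),
            padding=False,
            truncation=True,
            max_length=max(1, int(max_length)),
            return_length=True,
        )
        return tokenized.get("length")
    except Exception as exc:
        # Diagnostic gap is now traceable: tok_max=None means tokenization failed.
        logger.debug("Tokenization failed while building OOM diagnostics: %s", exc)
        return None
```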
---

Follow-up commit: Drop per-item metadata and JSONL OOM outlier capture paths from the local embed runtime while retaining adaptive OOM retry and decoupled inference batch controls. This keeps the PR focused on core stability behavior and reduces instrumentation overhead. Signed-off-by: jioffe502 <jioffe@nvidia.com>
```python
outs: List[torch.Tensor] = []
target_bs = max(1, int(batch_size))
current_bs = min(target_bs, self._adaptive_batch_size) if self._adaptive_batch_size is not None else target_bs
```
**Adaptive batch size reads stale state across calls**
self._adaptive_batch_size is actor-level state that persists between embed() calls. On a fresh call with a new (larger) batch_size argument, line 224 initialises current_bs = min(target_bs, self._adaptive_batch_size), which silently caps the first batch at whatever the worst historical OOM level was, even if VRAM pressure has since been relieved. The streak-based growth inside the loop never propagates back across the call boundary. Consider resetting when target_bs exceeds the stored value, or document the cross-call persistence explicitly.
```python
# Reset the historical cap when the caller requests a larger batch again,
# so one bad OOM episode does not permanently throttle future calls.
if self._adaptive_batch_size is not None and target_bs > self._adaptive_batch_size:
    self._adaptive_batch_size = None
current_bs = min(target_bs, self._adaptive_batch_size) if self._adaptive_batch_size is not None else target_bs
```
---

```python
if not outs:
    return torch.empty((0, 0), dtype=torch.float32)

sorted_embeddings = torch.cat(outs, dim=0)
reordered_embeddings: List[torch.Tensor | None] = [None] * len(sorted_to_original)
for sorted_idx, original_idx in enumerate(sorted_to_original):
    reordered_embeddings[original_idx] = sorted_embeddings[sorted_idx]
if any(emb is None for emb in reordered_embeddings):
    raise RuntimeError("Failed to reconstruct embedding order after length sorting.")
return torch.stack([emb for emb in reordered_embeddings if emb is not None], dim=0)
```
**Length mismatch between `sorted_embeddings` and `sorted_to_original` when OOM leaves partial results**
If OOM is raised when current_bs <= 1 (re-raised, not retried), execution exits the while loop with outs containing embeddings for only the texts processed before the fatal OOM. torch.cat(outs) produces a tensor shorter than len(sorted_to_original), and the loop for sorted_idx, original_idx in enumerate(sorted_to_original) will raise IndexError when sorted_idx exceeds that length. The if any(emb is None …) guard never executes because the IndexError fires first. Currently unreachable because the fatal re-raise exits before this code, but it is a correctness landmine for future refactors. Add an explicit length pre-check before the indexing loop.
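A pure-Python sketch of the suggested length pre-check (the real code uses torch tensors; `reorder_to_original` is a hypothetical helper name):

```python
from typing import List, Optional, Sequence


def reorder_to_original(
    sorted_embeddings: Sequence[Sequence[float]],
    sorted_to_original: Sequence[int],
) -> List[Sequence[float]]:
    """Undo length-sorted order; fail loudly on partial results, not IndexError."""
    if len(sorted_embeddings) != len(sorted_to_original):
        raise RuntimeError(
            f"Partial embedding results: got {len(sorted_embeddings)} rows "
            f"for {len(sorted_to_original)} inputs."
        )
    reordered: List[Optional[Sequence[float]]] = [None] * len(sorted_to_original)
    for sorted_idx, original_idx in enumerate(sorted_to_original):
        reordered[original_idx] = sorted_embeddings[sorted_idx]
    if any(emb is None for emb in reordered):
        raise RuntimeError("Failed to reconstruct embedding order after length sorting.")
    return [emb for emb in reordered if emb is not None]
```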
---
DGX bo767 sweep evidence

Ran the dedicated 6-run
Observed results:
Recommendation:
bo767 repeat sweep follow-up at
| `embed_inference_batch_size` | Passes | Mean PPS | PPS stdev | PPS range | Mean recall@5 | recall@5 stdev | OOM evidence |
|---|---|---|---|---|---|---|---|
| 1 | 10/10 | 152.82 | 1.38 | 151.46-156.59 | 0.8355 | 0.0006 | none |
| 32 | 10/10 | 159.34 | 1.40 | 156.94-162.11 | 0.8354 | 0.0014 | none |
| 64 | 10/10 | 158.64 | 1.67 | 156.68-163.34 | 0.8309 | 0.0108 | none |
| 256 | 9/10 | 158.51* | 1.59* | 156.12-161.49* | 0.8124* | 0.0246* | 1 failed run, 8 OOM retries in the failed run |
* computed over the 9 successful mb=256 runs only.
Takeaways:
- `32` is the best default candidate in this repeat sweep: highest mean PPS, no OOM retries, and stable recall.
- `64` is within observed run-to-run noise on ingest (`-0.70` PPS, `-0.44%` vs `32`) and shows worse recall stability, so this does not support moving the default from `32` to `64`.
- `1` is clearly slower (`-6.52` PPS, `-4.09%` vs `32`) with no stability upside.
- `256` is not a viable default candidate: one run failed, the failed run logged 8 `CUDA OOM during embedding; retrying` events, and the successful runs still showed materially worse recall stability.
- This is directionally consistent with the current HF/local embedding path not realizing reliable end-to-end gains from larger microbatches in this workload.
- The VDB tail is already de-bottlenecked (`VDBUploadOperator batch_size=64`), so pushing the embed microbatch higher is not buying a downstream write-path win here; in practice it mainly adds memory pressure and instability.
Recommendation / next step:
- If we want one default from the current evidence, this repeat sweep supports
embed_inference_batch_size=32atgpu_embed=0.25. - From here, I think there are two reasonable paths:
- Land this change with
32as the default and treat broader SKU validation as follow-up work. - Keep testing in this PR, but narrow it to cross-SKU validation focused on
32vs64rather than continuing to probe larger batch sizes.
- Land this change with
TLDR
This PR hardens embedding stability by separating two different control knobs:
- `embed_batch_size` for Ray Data transport/scheduling, and
- `embed_inference_batch_size` for local model forward-pass VRAM pressure.

It keeps the change focused on core runtime behavior: decoupled tuning controls plus adaptive OOM retry.
Problem
During bo767 validation, we observed long-sequence tail batches driving CUDA OOM-like failures and throughput volatility. The key discovery is that one batch knob is doing two jobs:
- Ray Data transport/scheduling (`embed_batch_size`), and
- model forward-pass microbatching (`inference_batch_size`).

That coupling makes stability tuning hard, especially when sequence lengths are heterogeneous.
What Changed
1) Decoupled controls in harness/CLI
- Added `embed_inference_batch_size` to harness config/defaults/validation.
- Plumbed `--embed-inference-batch-size` through to runtime.
- Runtime prefers `embed_inference_batch_size` over `embed_batch_size` for model forward-pass batching.

2) Adaptive OOM stabilization in local embed runtime
3) Runtime parity + operational controls
- Restored env-driven Ray object store sizing: `RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION` / `RAY_OBJECT_STORE_MEMORY_BYTES`.

Quantitative Evidence (bo767)
All runs below use `embed_inference_batch_size=32`.

- `nemo_retriever/artifacts/bo767_20260406_193158_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260406_202408_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260406_204929_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260408_154723_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260408_160637_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260408_172412_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260408_161941_UTC/results.json`
- `nemo_retriever/artifacts/bo767_20260408_173706_UTC/results.json`

Key Takeaways
- The default (`embed_inference_batch_size=32`) is stable and repeatable across runs.
- `gpu_embed` policy in these validations:
  - `gpu_embed=1.0` cohort: ~146 PPS
  - `gpu_embed=0.25` cohort: >150 PPS

Recommendation
- Default `embed_inference_batch_size=32` in harness presets.
- Keep `gpu_embed` as an explicit policy knob for follow-up performance decisioning (isolation vs throughput).

Why This Is Safe
- The chosen default (`32`) aligns with observed stability.

When To Override
- Raise `embed_inference_batch_size` only with sustained VRAM headroom and zero OOM retry pressure.
- Adjust `gpu_embed` according to target objective: `1.0` (isolation) vs `0.25` (throughput) with recall guardrails.

Rollout
- Land `embed_inference_batch_size=32` defaults.
- Settle the `gpu_embed` performance policy in a dedicated follow-up decision.

Rollback
- Override `embed_inference_batch_size` and/or `gpu_embed` via harness CLI.

Open Questions For Lead Decision
- Which `gpu_embed` policy should define productized performance baselines?

Test Plan
- `uv run pytest tests/test_harness_config.py tests/test_harness_run.py tests/test_resource_heuristics.py`
- `uv run pytest tests/test_ingest_interface.py tests/test_graph_pipeline_registry.py`
- `uv run pytest tests/test_create_local_embedder.py tests/test_multimodal_embed.py tests/test_operator_flags_and_cpu_actors.py`
- Repeated bo767 runs with `embed_inference_batch_size=32` (artifacts listed above)