30 changes: 16 additions & 14 deletions .github/workflows/e2e_test-on-change.yml
@@ -13,23 +13,25 @@ on:
jobs:
e2e-tests:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.13']
steps:
- name: Checkout Code
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Set up PDM
uses: pdm-project/setup-pdm@v4

- name: Install Nix
uses: cachix/install-nix-action@v31
with:
python-version: ${{ matrix.python-version }}
github_access_token: ${{ secrets.GITHUB_TOKEN }}

- name: Install dependencies
run: |
pdm sync -d
- name: Run e2e tests
nix develop -c pdm sync -d

- name: Run end-to-end tests
run: |
pdm run test:e2e
nix develop -c pdm run test:e2e -o log_cli_level=DEBUG |& tee test_e2e.out

- name: Upload test_e2e.out
uses: actions/upload-artifact@v4
with:
name: test_e2e
path: test_e2e.out
4 changes: 3 additions & 1 deletion .gitignore
@@ -9,6 +9,7 @@ __pycache__/
# Distribution / packaging
.Python
build/
result
develop-eggs/
dist/
downloads/
@@ -25,6 +26,7 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
/result

# PyInstaller
# Usually these files are written by a python script from a template
@@ -174,4 +176,4 @@ cython_debug/
.pypirc

# Test Reports (directories)
reports-*/
reports-*/
20 changes: 20 additions & 0 deletions Dockerfile.e2e-test
@@ -0,0 +1,20 @@
FROM docker.nix-community.org/nixpkgs/nix-flakes AS builder

# Don't stay in the root directory for this.
# https://github.com/NixOS/nix/issues/11217
WORKDIR /workspace

# Copy just enough for the Nix/PDM dependency install without bringing in the entire project.
# This keeps the layer cacheable, so nix develop and the dependency sync don't re-run on every code change.
COPY flake.nix flake.lock pyproject.toml pdm.lock .

# Build Nix shell and install all dev PDM dependencies.
RUN nix develop path:///workspace -c pdm sync -d

# Copy the actual module and E2E tests, then install self.
# These files change most often during development, so we copy them last to keep earlier layers cached.
COPY inference_perf ./inference_perf
COPY e2e ./e2e

ENTRYPOINT ["nix", "develop", "path:///workspace", "-c"]
CMD ["pdm", "run", "test:e2e"]
4 changes: 4 additions & 0 deletions e2e/testdata/models/.gitignore
@@ -0,0 +1,4 @@
*
!.gitignore
!*.tar.gz
!*.tar.zst
Binary file added e2e/testdata/models/google_gemma-3-270m.tar.gz
127 changes: 127 additions & 0 deletions e2e/tests/test_llm_d_inference_sim.py
@@ -0,0 +1,127 @@
"""
End-to-end integration testing of inference-perf using llm-d-inference-sim[1].

In order for these tests to run, you must have `llm-d-inference-sim` in your
PATH. The GitHub Actions runner will have this, but you may also install it
locally by following llm-d-inference-sim's README or by entering the Nix shell
of this repository (i.e. `nix develop`).

If your local environment is missing `llm-d-inference-sim`, tests here will
automatically be skipped.

[1]: https://github.com/llm-d/llm-d-inference-sim
"""

import pytest

from utils.llm_d_inference_sim import LLMDInferenceSimRunner
from utils.benchmark import run_benchmark_minimal
from utils.testdata import extract_tarball


TEST_MODEL_NAME = "google/gemma-3-270m"
TEST_MODEL_TARBALL = "e2e/testdata/models/google_gemma-3-270m.tar.gz"


@pytest.mark.asyncio
@pytest.mark.skipif(not LLMDInferenceSimRunner.is_available(), reason="local environment missing llm-d-inference-sim")
@pytest.mark.parametrize(
"data",
[
pytest.param(
{
"type": "mock",
},
id="data_mock",
),
pytest.param(
{
"type": "shared_prefix",
"shared_prefix": {
"num_groups": 256,
"num_prompts_per_group": 16,
"system_prompt_len": 512,
"question_len": 256,
"output_len": 256,
},
},
id="data_shared_prefix",
),
],
)
@pytest.mark.parametrize(
"load",
[
pytest.param(
{
"type": "constant",
"stages": [{"rate": 1, "duration": 5}],
"num_workers": 2,
},
id="load_constant_slow",
),
pytest.param(
{
"type": "constant",
"interval": 2,
"stages": [{"rate": 1, "duration": 5}, {"rate": 2, "duration": 5}],
"num_workers": 2,
},
id="load_constant_slow_two_stages",
),
pytest.param(
{
"type": "constant",
"stages": [{"rate": 100, "duration": 5}],
"num_workers": 2,
},
id="load_constant_fast",
),
],
)
async def test_completion_successful_run(data: dict, load: dict):
"""
Very simple inference-perf integration test that ensures a wide range of
vLLM benchmarking configurations can run successfully.
"""
model_name = TEST_MODEL_NAME
model_path = extract_tarball(TEST_MODEL_TARBALL)

async with LLMDInferenceSimRunner(model_name, port=18000) as sim:
result = await run_benchmark_minimal(
{
"data": data,
"load": load,
"api": {
"type": "completion",
"streaming": True,
},
"server": {
"type": "vllm",
"model_name": model_name,
"base_url": f"http://{sim.host}:{sim.port}",
"ignore_eos": True,
},
"tokenizer": {
"pretrained_model_name_or_path": str(model_path),
},
"report": {
"request_lifecycle": {
"summary": True,
"per_stage": True,
"per_request": True,
},
},
}
)

assert result.success, "Benchmark failed"
assert result.reports, "No reports generated from benchmark"

requests_report = result.reports["per_request_lifecycle_metrics.json"]
assert requests_report, "Missing requests report"
assert len(requests_report) > 1

summary_report = result.reports["summary_lifecycle_metrics.json"]
assert summary_report, "Missing summary report"
assert summary_report["successes"]["count"] > 1
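
Note: the skipif guard above depends on LLMDInferenceSimRunner.is_available(), whose implementation is not included in this diff. A minimal sketch of such a check, assuming it only needs to confirm the binary is reachable on PATH (illustrative only, not the runner's actual code):

import shutil


def sim_binary_available(binary: str = "llm-d-inference-sim") -> bool:
    # Hypothetical helper: report whether the simulator binary is on PATH.
    return shutil.which(binary) is not None


# Example skip guard built on such a helper:
# @pytest.mark.skipif(not sim_binary_available(), reason="llm-d-inference-sim not on PATH")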
5 changes: 3 additions & 2 deletions e2e/tests/test_mock_client.py
@@ -3,8 +3,9 @@
from utils.benchmark import run_benchmark_minimal


def test_simple_mock_client_benchmark():
result = run_benchmark_minimal("e2e/configs/e2e_simple_mock_client.yaml", timeout_sec=None)
@pytest.mark.asyncio
async def test_simple_mock_client_benchmark():
result = await run_benchmark_minimal("e2e/configs/e2e_simple_mock_client.yaml", timeout_sec=None)
assert result.success, "Benchmark failed"
assert result.reports, "No reports generated from benchmark"
assert result.reports["per_request_lifecycle_metrics.json"], "Missing requests report"
91 changes: 60 additions & 31 deletions e2e/utils/benchmark.py
@@ -1,10 +1,13 @@
import json
import os
import shlex
import subprocess
import asyncio
import aiofiles
import aiofiles.os
import tempfile
import yaml
import signal
import logging
import textwrap
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, List, Union
@@ -18,23 +21,36 @@ class BenchmarkResult:

success: bool # True if process exit code == 0 and not timed out
timed_out: bool # True if we hit timeout and killed the process
returncode: int # Raw process return code (or -9/-15 on kill)
return_code: int # Raw process return code (or -9/-15 on kill)
stdout: str # Combined stdout/stderr text
work_dir: Path # Working directory used for the run
reports: Optional[Dict[str, Any]] # Parsed json for reports if present


def _process_yaml_config(config: Union[str, Path, Dict[str, Any]], out_dir: Path) -> Path:
async def _process_yaml_config(config: Union[str, Path, Dict[str, Any]], out_dir: Path) -> Path:
out_dir.mkdir(parents=True, exist_ok=True)
cfg_path = out_dir / "config_input.yaml"

if isinstance(config, (str, Path)):
src = Path(config)
if not src.exists():
raise FileNotFoundError(f"Config file not found: {src}")
config = yaml.safe_load(src.read_text(encoding="utf-8"))

# Overwrite output path to temporaty folder
# if config is a string pointing to an existing path, then convert it to
# Path.
if isinstance(config, str):
try:
await aiofiles.os.stat(config)
config = Path(config)
except Exception:
pass

# if config is a Path, then open it as a file.
if isinstance(config, Path):
async with aiofiles.open(config, mode="r") as file:
config = await file.read()

# if config is (still) a string, then directly parse it as YAML.
if isinstance(config, str):
config = yaml.safe_load(config)
assert isinstance(config, dict)

# Overwrite output path to temporary folder
config["storage"] = {"local_storage": {"path": out_dir.as_posix()}}

cfg_path.write_text(
@@ -52,7 +68,7 @@ def _find_report_files(path: Path) -> Optional[List[Path]]:
return candidates


def run_benchmark_minimal(
async def run_benchmark_minimal(
config: Union[str, Path, Dict[str, Any]],
*,
work_dir: Optional[Union[str, Path]] = None,
@@ -70,46 +86,59 @@ def run_benchmark_minimal(
- marks `timed_out=True`, returns collected stdout up to kill.
"""
wd = Path(work_dir) if work_dir else Path(tempfile.mkdtemp(prefix="inference-perf-e2e-"))
cfg_path = _process_yaml_config(config, wd)
cfg_path = await _process_yaml_config(config, wd)

env = os.environ.copy()
if extra_env:
env.update({k: str(v) for k, v in extra_env.items()})

cmd = f"{shlex.quote(executable)} --config_file {shlex.quote(str(cfg_path))} --log-level DEBUG"
args = [executable, "--config_file", str(cfg_path), "--log-level", "DEBUG"]
logger.debug(f"starting inference-perf, {args=}")

proc = await asyncio.create_subprocess_exec(
*args,
cwd=str(wd),
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
preexec_fn=os.setpgrp, # use process groups
)
logger.debug("inference-perf started!")

stdout = ""
timed_out = False
return_code = -1
try:
proc = subprocess.run(
cmd,
cwd=str(wd),
env=env,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=timeout_sec,
)
stdout = proc.stdout
stdout_bytes, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout_sec)
stdout = stdout_bytes.decode()
logger.info(f"benchmark status {proc.returncode}, output:\n{textwrap.indent(stdout, ' | ')}")
assert proc.returncode is not None
return_code = proc.returncode
except subprocess.TimeoutExpired as e:
except asyncio.exceptions.TimeoutError:
timed_out = True
stdout = e.stdout
return_code = -9
finally:
try:
# kill whole process group to ensure that forked workers are also
# terminated.
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signal.SIGTERM)
# wait for process to finish cleaning up.
await proc.wait()
except ProcessLookupError:
pass

success = (return_code == 0) and (not timed_out)

logger.info("Benchmark output:\n%s", stdout)

# Attempt to read report.json (optional)
report_path = _find_report_files(wd)
reports = {report.name: json.loads(report.read_text(encoding="utf-8")) for report in report_path} if report_path else None

return BenchmarkResult(
success=success,
timed_out=timed_out,
returncode=return_code,
stdout=stdout or "",
return_code=return_code,
stdout=stdout,
work_dir=wd,
reports=reports,
)
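
Note: the cleanup added to run_benchmark_minimal relies on Unix process groups: the benchmark is launched into its own group (preexec_fn=os.setpgrp) so that a single os.killpg later reaches any forked workers as well. A minimal standalone sketch of that pattern, assuming a Unix host; the sleep command and one-second timeout are placeholders, not anything from this PR:

import asyncio
import os
import signal


async def run_in_own_process_group(cmd: list[str], timeout: float) -> int:
    # Start the command detached into a new process group so that the whole
    # group (including any children it forks) can be signalled at once.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
        preexec_fn=os.setpgrp,
    )
    try:
        await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        pass  # fall through to group termination below
    finally:
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)  # signal the group
            await proc.wait()  # reap the group leader after cleanup
        except ProcessLookupError:
            pass  # process (and its group) already gone
    return proc.returncode if proc.returncode is not None else -signal.SIGTERM


if __name__ == "__main__":
    # Demo: "sleep 30" outlives the 1 s timeout and is killed via its group.
    print(asyncio.run(run_in_own_process_group(["sleep", "30"], timeout=1.0)))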