Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions configs/experimental/dynamo/hendrycks_math_qwen4b_aime25.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Experimental Dynamo-backend RL run: trains Qwen/Qwen3-4B-Instruct-2507 on
# Hendrycks-Math and evaluates on AIME 2025.
max_steps = 500
seq_len = 2048
max_async_level = 1

[wandb]
project = "hendrycks-math-qwen4b"
group = "dynamo-vllm-qwen4b-aime25-500-bs64"
name = "dynamo-qwen4b-aime25-500-bs64"

[model]
name = "Qwen/Qwen3-4B-Instruct-2507"

[orchestrator]
batch_size = 64
max_inflight_rollouts = 64
rollouts_per_example = 4
num_train_workers = 8
max_off_policy_steps = 8
# Must stay off with inference.backend = "dynamo": explicitly enabling it is
# rejected by the RL config validator, because Dynamo does not expose the
# vLLM-only /v1/chat/completions/tokens endpoint.
use_token_client = false

[orchestrator.train.sampling]
temperature = 1.0
repetition_penalty = 1.0
max_completion_tokens = 1024
min_tokens = 0
extra_body = { top_k = -1, min_p = 0.0 }

[[orchestrator.train.env]]
id = "math-env"
name = "hendrycks-math"
args = { dataset_name = "PrimeIntellect/Hendrycks-Math", dataset_subset = "default", math_verify_max_workers = 128, math_verify_timeout = 60 }

[orchestrator.buffer]
easy_threshold = 1.0
hard_threshold = 0.0

[orchestrator.eval]
interval = 100
eval_base_model = false
skip_eval_on_resume = true

[orchestrator.eval.sampling]
max_completion_tokens = 1024

[[orchestrator.eval.env]]
id = "aime2025"
name = "aime2025"
num_examples = 30
rollouts_per_example = 4
interval = 100

# Intentionally empty: no trainer overrides are set in this config.
[trainer]

[inference]
# The dynamo backend is validated to run only with single_node deployment and
# without kv_cache_offload.
backend = "dynamo"
# Entries are copied as-is onto the Dynamo vLLM worker argument namespace.
vllm_extra = { max_num_seqs = 64 }

# The values below equal the DynamoConfig defaults; they are spelled out here
# for visibility.
[inference.dynamo]
deploy_router = true
router_mode = "round-robin"
discovery_backend = "file"
request_plane = "tcp"
# Also used to build the orchestrator's default admin_base_url
# (http://localhost:<system_port>) when none is configured.
system_port = 8081
216 changes: 216 additions & 0 deletions packages/prime-rl-configs/src/prime_rl/configs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@

# TODO: Set thinking/ solution budget

InferenceBackend: TypeAlias = Literal["vllm", "dynamo"]
DynamoRouterMode: TypeAlias = Literal[
"round-robin",
"random",
"kv",
"direct",
"power-of-two",
"least-loaded",
"device-aware-weighted",
]


class ServerConfig(BaseConfig):
"""Configures the inference server."""
Expand Down Expand Up @@ -120,6 +131,78 @@ class WeightBroadcastConfig(BaseConfig):
)


class DynamoConfig(BaseConfig):
    """Settings for the experimental Dynamo inference backend.

    These options are used when ``InferenceConfig.backend`` is ``"dynamo"``.
    They are translated into CLI namespaces for the Dynamo frontend/router
    and for the Dynamo vLLM worker.
    """

    # --- Frontend / router deployment -------------------------------------
    deploy_router: Annotated[
        bool,
        Field(
            description=(
                "Launch Dynamo's frontend/router from the inference entrypoint. Disable only when an "
                "external Dynamo frontend/router is already deployed and orchestrator.client.base_url points at it."
            ),
        ),
    ] = True

    # --- Discovery and transport shared by frontend and worker ------------
    namespace: Annotated[
        str,
        Field(description="Dynamo namespace used by the frontend and worker for service discovery."),
    ] = "dynamo"

    discovery_backend: Annotated[
        Literal["file", "etcd", "kubernetes", "mem"],
        Field(description="Dynamo discovery backend. Use 'file' for local single-node runs without etcd/NATS."),
    ] = "file"

    request_plane: Annotated[
        Literal["tcp", "nats", "http"],
        Field(description="Dynamo request plane used between the frontend and worker."),
    ] = "tcp"

    # --- Frontend-only options --------------------------------------------
    router_mode: Annotated[
        DynamoRouterMode,
        Field(description="Dynamo frontend router mode used for OpenAI chat-completions traffic."),
    ] = "round-robin"

    min_initial_workers: Annotated[
        int | None,
        Field(
            ge=0,
            description="Optional Dynamo frontend minimum worker count before the router starts serving requests.",
        ),
    ] = None

    event_plane: Annotated[
        Literal["nats", "zmq"] | None,
        Field(description="Dynamo event plane. If None, Dynamo derives it from discovery_backend."),
    ] = None

    # --- Worker options ---------------------------------------------------
    system_port: Annotated[
        int,
        Field(ge=1, le=65535, description="Dynamo worker system-server port for /health and /engine routes."),
    ] = 8081

    use_vllm_tokenizer: Annotated[
        bool,
        Field(
            description=(
                "Use Dynamo's text-in/text-out vLLM worker path. prime-rl defaults this off so the "
                "Dynamo frontend tokenizes OpenAI requests before dispatching them to the worker."
            ),
        ),
    ] = False

    # --- Free-form passthrough, applied after the mapped options ----------
    frontend_extra: Annotated[
        dict[str, Any],
        Field(description="Extra CLI arguments for python -m dynamo.frontend."),
    ] = {}

    worker_extra: Annotated[
        dict[str, Any],
        Field(description="Extra CLI arguments for the Dynamo vLLM worker."),
    ] = {}


class KVCacheOffloadConfig(BaseModel):
"""CPU KV cache offloading for vLLM inference workers."""

Expand Down Expand Up @@ -252,6 +335,16 @@ class InferenceExperimentalConfig(BaseConfig):
class InferenceConfig(BaseConfig):
"""Configures inference."""

backend: Annotated[
InferenceBackend,
Field(
description=(
"Inference server backend to launch. vLLM is the default; Dynamo is an experimental "
"backend for OpenAI-compatible rollouts with prime-rl weight updates."
)
),
] = "vllm"

# The server configuration
server: ServerConfig = ServerConfig()

Expand Down Expand Up @@ -384,6 +477,11 @@ class InferenceConfig(BaseConfig):
WeightBroadcastConfig()
)

dynamo: Annotated[
DynamoConfig,
Field(description="Experimental Dynamo-specific settings used when backend='dynamo'."),
] = DynamoConfig()

kv_cache_offload: Annotated[
KVCacheOffloadConfig | None,
Field(
Expand Down Expand Up @@ -447,6 +545,17 @@ def validate_multi_node_requires_slurm(self):
raise ValueError("Must use SLURM for multi-node deployment.")
return self

@model_validator(mode="after")
def validate_backend_support(self):
    """Reject option combinations the Dynamo backend cannot serve yet.

    Configs using any other backend pass through untouched.

    Raises:
        ValueError: If backend is 'dynamo' and the deployment is not
            single_node, or kv_cache_offload is configured.
    """
    if self.backend == "dynamo":
        single_node = self.deployment.type == "single_node"
        if not single_node:
            raise ValueError("inference.backend='dynamo' currently supports only single_node deployment.")

        offload_requested = self.kv_cache_offload is not None
        if offload_requested:
            raise ValueError("inference.backend='dynamo' does not support prime-rl kv_cache_offload plumbing yet.")

    return self

@model_validator(mode="after")
def auto_setup_kv_cache_offload(self):
if self.kv_cache_offload is not None:
Expand Down Expand Up @@ -603,3 +712,110 @@ def to_vllm(self) -> Namespace:
delattr(namespace, "rope_scaling")

return namespace

def to_dynamo_frontend(self) -> Namespace:
    """Build the argument namespace for the Dynamo frontend/router.

    Mapped config values are written first, then ``dynamo.frontend_extra``
    entries are applied on top (so they win on conflicts). Finally,
    optional arguments whose value is None are removed from the namespace.

    Returns:
        Namespace holding the frontend arguments.
    """
    frontend_args = Namespace()

    # (attribute path on this config, destination name on the frontend namespace)
    field_map = (
        ("server.host", "http_host"),
        ("server.port", "http_port"),
        ("dynamo.namespace", "namespace"),
        ("dynamo.discovery_backend", "discovery_backend"),
        ("dynamo.request_plane", "request_plane"),
        ("dynamo.router_mode", "router_mode"),
        ("dynamo.min_initial_workers", "min_initial_workers"),
        ("dynamo.event_plane", "event_plane"),
        ("model.name", "model_name"),
    )
    for source_path, dest_name in field_map:
        resolved = rgetattr(self, source_path.replace("-", "_"))
        rsetattr(frontend_args, dest_name, resolved)

    frontend_args.dyn_chat_processor = "vllm"

    # User-supplied extras override anything set above.
    for extra_name, extra_value in self.dynamo.frontend_extra.items():
        setattr(frontend_args, extra_name, extra_value)

    # Drop unset optional arguments.
    droppable = ("http_host", "event_plane", "model_name", "min_initial_workers")
    for arg_name in droppable:
        if getattr(frontend_args, arg_name, None) is None:
            delattr(frontend_args, arg_name)

    return frontend_args

def to_dynamo_vllm(self) -> Namespace:
    """Convert InferenceConfig to the Dynamo vLLM worker CLI namespace.

    Order of application:
      1. Mapped config values (model, parallelism, LoRA, ...).
      2. Fixed worker settings (served model name, parsers, logprobs mode).
      3. ``vllm_extra`` and then ``dynamo.worker_extra`` overrides.
      4. ``fp32_lm_head`` merged into ``additional_config`` when
         ``enable_fp32_lm_head`` is set.
      5. Optional arguments whose value is None are removed.

    Returns:
        Namespace holding the worker arguments.
    """
    namespace = Namespace()
    to_dynamo_vllm = {
        "dynamo.namespace": "namespace",
        "dynamo.discovery_backend": "discovery_backend",
        "dynamo.request_plane": "request_plane",
        "dynamo.event_plane": "event_plane",
        "dynamo.use_vllm_tokenizer": "use_vllm_tokenizer",
        "model.name": "model",
        "model.dtype": "dtype",
        "model.max_model_len": "max_model_len",
        "model.enforce_eager": "enforce_eager",
        "model.trust_remote_code": "trust_remote_code",
        "model.rope_scaling": "rope_scaling",
        "parallel.tp": "tensor_parallel_size",
        "parallel.dp": "data_parallel_size",
        "data_parallel_size_local": "data_parallel_size_local",
        "data_parallel_rpc_port": "data_parallel_rpc_port",
        "enable_lora": "enable_lora",
        "enable_prefix_caching": "enable_prefix_caching",
        "max_loras": "max_loras",
        "max_cpu_loras": "max_cpu_loras",
        "max_lora_rank": "max_lora_rank",
        "lora_target_modules": "lora_target_modules",
        "gpu_memory_utilization": "gpu_memory_utilization",
        "enable_return_routed_experts": "enable_return_routed_experts",
        "enable_expert_parallel": "enable_expert_parallel",
        "all2all_backend": "all2all_backend",
        "enable_eplb": "enable_eplb",
        "enable_dbo": "enable_dbo",
        "seed": "seed",
    }

    for config_key, dynamo_key in to_dynamo_vllm.items():
        value = rgetattr(self, config_key.replace("-", "_"))
        rsetattr(namespace, dynamo_key, value)

    namespace.served_model_name = self.model.name

    if self.model.tool_call_parser not in (None, "auto"):
        namespace.dyn_tool_call_parser = self.model.tool_call_parser
    if self.model.reasoning_parser is not None:
        namespace.dyn_reasoning_parser = self.model.reasoning_parser

    namespace.logprobs_mode = "processed_logprobs"

    # User-supplied extras override anything set above; worker_extra wins
    # over vllm_extra.
    for key, value in self.vllm_extra.items():
        setattr(namespace, key, value)
    for key, value in self.dynamo.worker_extra.items():
        setattr(namespace, key, value)

    if self.enable_fp32_lm_head:
        # Merge AFTER the extras are applied. Doing it earlier meant an
        # `additional_config` supplied via vllm_extra/worker_extra replaced
        # the whole dict and silently dropped fp32_lm_head.
        user_config = getattr(namespace, "additional_config", None)
        if user_config is None:
            namespace.additional_config = {"fp32_lm_head": True}
        elif isinstance(user_config, dict):
            # Build a new dict so the user's config object is not mutated;
            # keys given explicitly in the extras keep precedence.
            namespace.additional_config = {"fp32_lm_head": True, **user_config}
        # A non-dict additional_config is opaque here and is left as provided.

    # Drop unset optional arguments.
    for optional_key in (
        "event_plane",
        "rope_scaling",
        "data_parallel_size_local",
        "data_parallel_rpc_port",
        "enable_prefix_caching",
        "max_lora_rank",
        "lora_target_modules",
        "additional_config",
    ):
        if not hasattr(namespace, optional_key):
            continue

        if getattr(namespace, optional_key) is None:
            delattr(namespace, optional_key)

    return namespace
Original file line number Diff line number Diff line change
Expand Up @@ -1339,5 +1339,6 @@ def resolve_env_config(self):
if is_vllm:
env.sampling.extra_body.setdefault("top_k", -1)
env.sampling.extra_body.setdefault("min_p", 0.0)
env.sampling.extra_body.setdefault("return_token_ids", True)
if self.use_token_client:
env.sampling.extra_body.setdefault("return_token_ids", True)
return self
29 changes: 29 additions & 0 deletions packages/prime-rl-configs/src/prime_rl/configs/rl.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,35 @@ def auto_setup_weight_broadcast(self):

return self

@model_validator(mode="after")
def auto_setup_dynamo_backend(self):
    """Adapt orchestrator settings when inference runs on the Dynamo backend.

    Does nothing unless an inference config is present and its backend is
    'dynamo'. Otherwise it points the orchestrator's admin client at the
    Dynamo worker, turns off the token client when it was not explicitly
    requested, and strips ``return_token_ids`` from the train envs'
    sampling ``extra_body``.

    Raises:
        ValueError: If ``orchestrator.use_renderer`` is enabled, or if
            ``orchestrator.use_token_client`` was explicitly enabled.
    """
    inference = self.inference
    if inference is None or inference.backend != "dynamo":
        return self

    orchestrator = self.orchestrator
    client = orchestrator.client

    client.admin_backend = "dynamo"
    if client.admin_base_url is None:
        # Default to the local Dynamo worker system server.
        client.admin_base_url = [f"http://localhost:{inference.dynamo.system_port}"]

    if orchestrator.use_renderer:
        raise ValueError(
            "inference.backend='dynamo' does not support orchestrator.use_renderer because prime-rl's "
            "renderer client targets the vLLM-only /v1/generate endpoint."
        )

    if orchestrator.use_token_client:
        # Only an explicit opt-in is an error; an unset field that is truthy
        # is switched off instead.
        explicitly_enabled = "use_token_client" in orchestrator.model_fields_set
        if explicitly_enabled:
            raise ValueError(
                "inference.backend='dynamo' does not support orchestrator.use_token_client because Dynamo "
                "does not expose prime-rl's vLLM-only /v1/chat/completions/tokens endpoint."
            )
        orchestrator.use_token_client = False

    for train_env in orchestrator.train.env:
        train_env.sampling.extra_body.pop("return_token_ids", None)

    return self

@model_validator(mode="after")
def validate_eplb_requires_quantized_weight_transfer(self):
if self.inference is None or not self.inference.enable_eplb:
Expand Down
6 changes: 6 additions & 0 deletions packages/prime-rl-configs/src/prime_rl/configs/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def resolve_project_dir(self):


ServerType = Literal["vllm", "openai"]
# Admin API flavor selected by ClientConfig.admin_backend, i.e. the backend API
# used for weight updates and health checks.
AdminBackend = Literal["vllm", "dynamo"]


class VLMConfig(BaseConfig):
Expand Down Expand Up @@ -310,6 +311,11 @@ class ClientConfig(BaseConfig):
),
] = None

admin_backend: Annotated[
AdminBackend,
Field(description="Backend API exposed by admin_base_url/base_url for weight updates and health checks."),
] = "vllm"

elastic: Annotated[
ElasticConfig | None,
Field(
Expand Down
Loading
Loading