6 changes: 6 additions & 0 deletions docs/docs/reference/environment-variables.md
@@ -132,6 +132,12 @@ For more details on the options below, refer to the [server deployment](../guide
- `DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS } – Maximum age of instance health checks.
- `DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS } – Minimum time interval between consecutive health checks of the same instance.

<!--
TODO: uncomment after dropping DSTACK_FF_EVENTS

- `DSTACK_SERVER_EVENTS_TTL_SECONDS`{ #DSTACK_SERVER_EVENTS_TTL_SECONDS } – Maximum age of event records. Set to `0` to disable event storage. Defaults to 30 days.
-->
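
For context, a minimal sketch of how a TTL setting like this is typically read on the server side. The parsing below is an assumption for illustration, not the actual code in `dstack._internal.server.settings`:

import os

# Assumed parsing, for illustration only; the real settings module may differ.
# The 30-day default mirrors the documented behavior above.
SERVER_EVENTS_TTL_SECONDS = int(
    os.getenv("DSTACK_SERVER_EVENTS_TTL_SECONDS", str(30 * 24 * 3600))
)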

??? info "Internal environment variables"
The following environment variables are intended for development purposes:

68 changes: 68 additions & 0 deletions src/dstack/_internal/core/models/events.py
@@ -0,0 +1,68 @@
import uuid
from datetime import datetime
from enum import Enum
from typing import Annotated, Optional

from pydantic import Field

from dstack._internal.core.models.common import CoreModel
from dstack._internal.utils.common import list_enum_values_for_annotation


class EventTargetType(str, Enum):
PROJECT = "project"
USER = "user"
FLEET = "fleet"
INSTANCE = "instance"
RUN = "run"
JOB = "job"


class EventTarget(CoreModel):
type: Annotated[
str, # not using EventTargetType to allow adding new types without breaking compatibility
Field(
description=(
f"Type of the target entity."
f" One of: {list_enum_values_for_annotation(EventTargetType)}"
)
),
]
project_id: Annotated[
Optional[uuid.UUID],
Field(
description=(
"ID of the project the target entity belongs to,"
" or `null` for target types not bound to a project (e.g., users)"
)
),
]
id: Annotated[uuid.UUID, Field(description="ID of the target entity")]
name: Annotated[str, Field(description="Name of the target entity")]


class Event(CoreModel):
id: uuid.UUID
message: str
recorded_at: datetime
actor_user_id: Annotated[
Optional[uuid.UUID],
Field(
description=(
"ID of the user who performed the action that triggered the event,"
" or `null` if the action was performed by the system"
)
),
]
actor_user: Annotated[
Optional[str],
Field(
description=(
"Name of the user who performed the action that triggered the event,"
" or `null` if the action was performed by the system"
)
),
]
targets: Annotated[
list[EventTarget], Field(description="List of entities affected by the event")
]
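
For illustration, a hedged sketch of constructing these models as the events API might return them; every field value below is invented:

import uuid
from datetime import datetime, timezone

from dstack._internal.core.models.events import Event, EventTarget, EventTargetType

event = Event(
    id=uuid.uuid4(),
    message="Run submitted",  # invented message
    recorded_at=datetime.now(timezone.utc),
    actor_user_id=None,  # null actor: the action was performed by the system
    actor_user=None,
    targets=[
        EventTarget(
            type=EventTargetType.RUN.value,
            project_id=uuid.uuid4(),  # invented project ID
            id=uuid.uuid4(),  # invented run ID
            name="my-run",  # invented run name
        )
    ],
)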
4 changes: 3 additions & 1 deletion src/dstack/_internal/server/app.py
Expand Up @@ -26,6 +26,7 @@
from dstack._internal.server.db import get_db, get_session_ctx, migrate
from dstack._internal.server.routers import (
backends,
events,
files,
fleets,
gateways,
@@ -65,7 +66,7 @@
get_client_version,
get_server_client_error_details,
)
from dstack._internal.settings import DSTACK_VERSION
from dstack._internal.settings import DSTACK_VERSION, FeatureFlags
from dstack._internal.utils.logging import get_logger
from dstack._internal.utils.ssh import check_required_ssh_version

@@ -228,6 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True):
app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"])
app.include_router(prometheus.router)
app.include_router(files.router)
app.include_router(events.root_router, include_in_schema=FeatureFlags.EVENTS)

@app.exception_handler(ForbiddenError)
async def forbidden_error_handler(request: Request, exc: ForbiddenError):
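Both this router registration and the scheduler change below are gated on `FeatureFlags.EVENTS`, whose definition is not part of this diff. A minimal sketch of what such a flag might look like, assuming it is driven by the `DSTACK_FF_EVENTS` environment variable referenced in the docs TODO above:

import os


class FeatureFlags:
    # Assumed definition, for illustration only; the real class lives in
    # dstack._internal.settings and may parse the variable differently.
    EVENTS: bool = os.getenv("DSTACK_FF_EVENTS", "").lower() in ("1", "true", "yes")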
4 changes: 4 additions & 0 deletions src/dstack/_internal/server/background/__init__.py
@@ -3,6 +3,7 @@

from dstack._internal.server import settings
from dstack._internal.server.background.tasks.process_compute_groups import process_compute_groups
from dstack._internal.server.background.tasks.process_events import delete_events
from dstack._internal.server.background.tasks.process_fleets import process_fleets
from dstack._internal.server.background.tasks.process_gateways import (
process_gateways,
@@ -32,6 +33,7 @@
process_terminating_jobs,
)
from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes
from dstack._internal.settings import FeatureFlags

_scheduler = AsyncIOScheduler()

@@ -69,6 +71,8 @@ def start_background_tasks() -> AsyncIOScheduler:
_scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1))
_scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1)
_scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1)
if FeatureFlags.EVENTS:
_scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1)
if settings.ENABLE_PROMETHEUS_METRICS:
_scheduler.add_job(
collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1
17 changes: 17 additions & 0 deletions src/dstack/_internal/server/background/tasks/process_events.py
@@ -0,0 +1,17 @@
from datetime import timedelta

from sqlalchemy import delete

from dstack._internal.server import settings
from dstack._internal.server.db import get_session_ctx
from dstack._internal.server.models import EventModel
from dstack._internal.server.utils import sentry_utils
from dstack._internal.utils.common import get_current_datetime


@sentry_utils.instrument_background_task
async def delete_events():
cutoff = get_current_datetime() - timedelta(seconds=settings.SERVER_EVENTS_TTL_SECONDS)
stmt = delete(EventModel).where(EventModel.recorded_at < cutoff)
async with get_session_ctx() as session:
await session.execute(stmt)
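
This task runs every 7 minutes (see `background/__init__.py` above) and issues a single bulk DELETE. As a worked example (not code from the PR), under the documented 30-day default TTL the cutoff is:

from datetime import datetime, timedelta, timezone

DEFAULT_TTL_SECONDS = 30 * 24 * 3600  # 2,592,000 seconds = 30 days
cutoff = datetime.now(timezone.utc) - timedelta(seconds=DEFAULT_TTL_SECONDS)
# Events with recorded_at earlier than this cutoff are removed on each pass.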
(new file: Alembic migration "Add events and event_targets", path not shown in this view)
@@ -0,0 +1,99 @@
"""Add events and event_targets

Revision ID: 22d74df9897e
Revises: 5fd659afca82
Create Date: 2025-12-04 20:56:08.003504

"""

import sqlalchemy as sa
import sqlalchemy_utils
from alembic import op

import dstack._internal.server.models

# revision identifiers, used by Alembic.
revision = "22d74df9897e"
down_revision = "5fd659afca82"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"events",
sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
sa.Column("message", sa.Text(), nullable=False),
sa.Column("recorded_at", dstack._internal.server.models.NaiveDateTime(), nullable=False),
sa.Column(
"actor_user_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True
),
sa.ForeignKeyConstraint(
["actor_user_id"],
["users.id"],
name=op.f("fk_events_actor_user_id_users"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_events")),
)
with op.batch_alter_table("events", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_events_actor_user_id"), ["actor_user_id"], unique=False
)
batch_op.create_index(batch_op.f("ix_events_recorded_at"), ["recorded_at"], unique=False)

op.create_table(
"event_targets",
sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
sa.Column("event_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
sa.Column(
"entity_project_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True
),
sa.Column("entity_type", sa.String(length=100), nullable=False),
sa.Column("entity_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
sa.Column("entity_name", sa.String(length=200), nullable=False),
sa.ForeignKeyConstraint(
["entity_project_id"],
["projects.id"],
name=op.f("fk_event_targets_entity_project_id_projects"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["event_id"],
["events.id"],
name=op.f("fk_event_targets_event_id_events"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_event_targets")),
)
with op.batch_alter_table("event_targets", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_event_targets_entity_id"), ["entity_id"], unique=False
)
batch_op.create_index(
batch_op.f("ix_event_targets_entity_project_id"), ["entity_project_id"], unique=False
)
batch_op.create_index(
batch_op.f("ix_event_targets_entity_type"), ["entity_type"], unique=False
)
batch_op.create_index(batch_op.f("ix_event_targets_event_id"), ["event_id"], unique=False)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("event_targets", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_event_targets_event_id"))
batch_op.drop_index(batch_op.f("ix_event_targets_entity_type"))
batch_op.drop_index(batch_op.f("ix_event_targets_entity_project_id"))
batch_op.drop_index(batch_op.f("ix_event_targets_entity_id"))

op.drop_table("event_targets")
with op.batch_alter_table("events", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_events_recorded_at"))
batch_op.drop_index(batch_op.f("ix_events_actor_user_id"))

op.drop_table("events")
# ### end Alembic commands ###
(new file: Alembic migration "Add ix_instances_fleet_id", path not shown in this view)
@@ -0,0 +1,31 @@
"""Add ix_instances_fleet_id

Revision ID: 5fd659afca82
Revises: d4d9dc26cf58
Create Date: 2025-12-04 20:52:07.015334

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "5fd659afca82"
down_revision = "d4d9dc26cf58"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("instances", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_instances_fleet_id"), ["fleet_id"], unique=False)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("instances", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_instances_fleet_id"))

# ### end Alembic commands ###
(new file: Alembic migration "Add ix_jobs_run_id", path not shown in this view)
@@ -0,0 +1,31 @@
"""Add ix_jobs_run_id

Revision ID: d4d9dc26cf58
Revises: 006512f572b4
Create Date: 2025-12-04 20:48:10.543248

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "d4d9dc26cf58"
down_revision = "006512f572b4"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("jobs", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_jobs_run_id"), ["run_id"], unique=False)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("jobs", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_jobs_run_id"))

# ### end Alembic commands ###
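
These two migrations back the `index=True` additions to `JobModel.run_id` and `InstanceModel.fleet_id` in `models.py` below. A hedged sketch of the kind of lookups they serve; the helper functions are illustrative, not from this PR:

import uuid

from sqlalchemy import select

from dstack._internal.server.models import InstanceModel, JobModel


def jobs_of_run(run_id: uuid.UUID):
    # Served by the new ix_jobs_run_id index
    return select(JobModel).where(JobModel.run_id == run_id)


def instances_of_fleet(fleet_id: uuid.UUID):
    # Served by the new ix_instances_fleet_id index
    return select(InstanceModel).where(InstanceModel.fleet_id == fleet_id)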
46 changes: 44 additions & 2 deletions src/dstack/_internal/server/models.py
@@ -26,6 +26,7 @@
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import CoreConfig, generate_dual_core_model
from dstack._internal.core.models.compute_groups import ComputeGroupStatus
from dstack._internal.core.models.events import EventTargetType
from dstack._internal.core.models.fleets import FleetStatus
from dstack._internal.core.models.gateways import GatewayStatus
from dstack._internal.core.models.health import HealthStatus
@@ -407,7 +408,9 @@ class JobModel(BaseModel):
project_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
project: Mapped["ProjectModel"] = relationship()

run_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("runs.id", ondelete="CASCADE"))
run_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("runs.id", ondelete="CASCADE"), index=True
)
run: Mapped["RunModel"] = relationship()

# Jobs need to reference fleets because we may choose an optimal fleet for a master job
@@ -601,7 +604,7 @@ class InstanceModel(BaseModel):
)
pool: Mapped[Optional["PoolModel"]] = relationship(back_populates="instances")

fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id"))
fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id"), index=True)
fleet: Mapped[Optional["FleetModel"]] = relationship(back_populates="instances")

compute_group_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("compute_groups.id"))
@@ -852,3 +855,42 @@ class SecretModel(BaseModel):

name: Mapped[str] = mapped_column(String(200))
value: Mapped[DecryptedString] = mapped_column(EncryptedString())


class EventModel(BaseModel):
__tablename__ = "events"

id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), primary_key=True)
message: Mapped[str] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(NaiveDateTime, index=True)

actor_user_id: Mapped[Optional[uuid.UUID]] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True
)
actor_user: Mapped[Optional["UserModel"]] = relationship()

targets: Mapped[List["EventTargetModel"]] = relationship(back_populates="event")


class EventTargetModel(BaseModel):
__tablename__ = "event_targets"

id: Mapped[uuid.UUID] = mapped_column(
UUIDType(binary=False), primary_key=True, default=uuid.uuid4
)

event_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("events.id", ondelete="CASCADE"), index=True
)
    event: Mapped["EventModel"] = relationship(back_populates="targets")

entity_project_id: Mapped[Optional[uuid.UUID]] = mapped_column(
ForeignKey("projects.id", ondelete="CASCADE"), nullable=True, index=True
)
entity_project: Mapped[Optional["ProjectModel"]] = relationship()

entity_type: Mapped[EventTargetType] = mapped_column(
EnumAsString(EventTargetType, 100), index=True
)
entity_id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), index=True)
entity_name: Mapped[str] = mapped_column(String(200))
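
A hedged sketch of how the new models and indexes might be queried together, for example to list a project's events newest-first; the function is illustrative and not part of this diff:

import uuid

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from dstack._internal.server.models import EventModel, EventTargetModel


async def list_project_events(session: AsyncSession, project_id: uuid.UUID) -> list[EventModel]:
    stmt = (
        select(EventModel)
        .join(EventTargetModel, EventTargetModel.event_id == EventModel.id)
        .where(EventTargetModel.entity_project_id == project_id)  # ix_event_targets_entity_project_id
        .options(selectinload(EventModel.targets))
        .order_by(EventModel.recorded_at.desc())  # ix_events_recorded_at
        .distinct()  # one event may have several targets in the same project
    )
    res = await session.execute(stmt)
    return list(res.scalars().all())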