Skip to content

Commit b780ad3

Browse files
authored
Events emission framework and API (#3325)
- Add the framework for event emission - Add the events API - Add a few sample events about project / user / fleet / instance / run / job creation Hidden behind the `DSTACK_FF_EVENTS` feature flag.
1 parent 5f04b6d commit b780ad3

File tree

22 files changed

+2365
-10
lines changed

22 files changed

+2365
-10
lines changed

docs/docs/reference/environment-variables.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ For more details on the options below, refer to the [server deployment](../guide
132132
- `DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS } – Maximum age of instance health checks.
133133
- `DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS } – Minimum time interval between consecutive health checks of the same instance.
134134

135+
<!--
136+
TODO: uncomment after dropping DSTACK_FF_EVENTS
137+
138+
- `DSTACK_SERVER_EVENTS_TTL_SECONDS`{ #DSTACK_SERVER_EVENTS_TTL_SECONDS } – Maximum age of event records. Set to `0` to disable event storage. Defaults to 30 days.
139+
-->
140+
135141
??? info "Internal environment variables"
136142
The following environment variables are intended for development purposes:
137143

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import uuid
2+
from datetime import datetime
3+
from enum import Enum
4+
from typing import Annotated, Optional
5+
6+
from pydantic import Field
7+
8+
from dstack._internal.core.models.common import CoreModel
9+
from dstack._internal.utils.common import list_enum_values_for_annotation
10+
11+
12+
class EventTargetType(str, Enum):
    """Known types of entities that an event can target."""

    PROJECT = "project"
    USER = "user"
    FLEET = "fleet"
    INSTANCE = "instance"
    RUN = "run"
    JOB = "job"
19+
20+
21+
class EventTarget(CoreModel):
    """API representation of a single entity affected by an event."""

    type: Annotated[
        str,  # not using EventTargetType to allow adding new types without breaking compatibility
        Field(
            description=(
                f"Type of the target entity."
                f" One of: {list_enum_values_for_annotation(EventTargetType)}"
            )
        ),
    ]
    # `None` for target types that are not project-scoped (e.g., users).
    project_id: Annotated[
        Optional[uuid.UUID],
        Field(
            description=(
                "ID of the project the target entity belongs to,"
                " or `null` for target types not bound to a project (e.g., users)"
            )
        ),
    ]
    id: Annotated[uuid.UUID, Field(description="ID of the target entity")]
    name: Annotated[str, Field(description="Name of the target entity")]
42+
43+
44+
class Event(CoreModel):
    """A server event returned by the events API."""

    # All fields carry Field descriptions so the generated API schema is
    # fully documented (previously id/message/recorded_at were undocumented
    # while the remaining fields were).
    id: Annotated[uuid.UUID, Field(description="ID of the event")]
    message: Annotated[str, Field(description="Message describing the event")]
    recorded_at: Annotated[datetime, Field(description="Time when the event was recorded")]
    actor_user_id: Annotated[
        Optional[uuid.UUID],
        Field(
            description=(
                "ID of the user who performed the action that triggered the event,"
                " or `null` if the action was performed by the system"
            )
        ),
    ]
    actor_user: Annotated[
        Optional[str],
        Field(
            description=(
                "Name of the user who performed the action that triggered the event,"
                " or `null` if the action was performed by the system"
            )
        ),
    ]
    targets: Annotated[
        list[EventTarget], Field(description="List of entities affected by the event")
    ]

src/dstack/_internal/server/app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from dstack._internal.server.db import get_db, get_session_ctx, migrate
2727
from dstack._internal.server.routers import (
2828
backends,
29+
events,
2930
files,
3031
fleets,
3132
gateways,
@@ -65,7 +66,7 @@
6566
get_client_version,
6667
get_server_client_error_details,
6768
)
68-
from dstack._internal.settings import DSTACK_VERSION
69+
from dstack._internal.settings import DSTACK_VERSION, FeatureFlags
6970
from dstack._internal.utils.logging import get_logger
7071
from dstack._internal.utils.ssh import check_required_ssh_version
7172

@@ -228,6 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True):
228229
app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"])
229230
app.include_router(prometheus.router)
230231
app.include_router(files.router)
232+
app.include_router(events.root_router, include_in_schema=FeatureFlags.EVENTS)
231233

232234
@app.exception_handler(ForbiddenError)
233235
async def forbidden_error_handler(request: Request, exc: ForbiddenError):

src/dstack/_internal/server/background/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from dstack._internal.server import settings
55
from dstack._internal.server.background.tasks.process_compute_groups import process_compute_groups
6+
from dstack._internal.server.background.tasks.process_events import delete_events
67
from dstack._internal.server.background.tasks.process_fleets import process_fleets
78
from dstack._internal.server.background.tasks.process_gateways import (
89
process_gateways,
@@ -32,6 +33,7 @@
3233
process_terminating_jobs,
3334
)
3435
from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes
36+
from dstack._internal.settings import FeatureFlags
3537

3638
_scheduler = AsyncIOScheduler()
3739

@@ -69,6 +71,8 @@ def start_background_tasks() -> AsyncIOScheduler:
6971
_scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1))
7072
_scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1)
7173
_scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1)
74+
if FeatureFlags.EVENTS:
75+
_scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1)
7276
if settings.ENABLE_PROMETHEUS_METRICS:
7377
_scheduler.add_job(
7478
collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from datetime import timedelta
2+
3+
from sqlalchemy import delete
4+
5+
from dstack._internal.server import settings
6+
from dstack._internal.server.db import get_session_ctx
7+
from dstack._internal.server.models import EventModel
8+
from dstack._internal.server.utils import sentry_utils
9+
from dstack._internal.utils.common import get_current_datetime
10+
11+
12+
@sentry_utils.instrument_background_task
async def delete_events():
    """Prune event records whose ``recorded_at`` is older than the configured TTL.

    Runs periodically as a background task; the retention window comes from
    ``settings.SERVER_EVENTS_TTL_SECONDS``.
    """
    retention = timedelta(seconds=settings.SERVER_EVENTS_TTL_SECONDS)
    threshold = get_current_datetime() - retention
    async with get_session_ctx() as session:
        # NOTE(review): no explicit commit here — presumably get_session_ctx
        # commits on exit; confirm against its implementation.
        await session.execute(delete(EventModel).where(EventModel.recorded_at < threshold))
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""Add events and event_targets
2+
3+
Revision ID: 22d74df9897e
4+
Revises: 5fd659afca82
5+
Create Date: 2025-12-04 20:56:08.003504
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
import sqlalchemy_utils
11+
from alembic import op
12+
13+
import dstack._internal.server.models
14+
15+
# revision identifiers, used by Alembic.
16+
revision = "22d74df9897e"
17+
down_revision = "5fd659afca82"
18+
branch_labels = None
19+
depends_on = None
20+
21+
22+
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Order matters: "events" must exist before "event_targets", which holds
    # a foreign key on events.id.
    op.create_table(
        "events",
        sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
        sa.Column("message", sa.Text(), nullable=False),
        sa.Column("recorded_at", dstack._internal.server.models.NaiveDateTime(), nullable=False),
        # Nullable: events triggered by the system have no actor user.
        sa.Column(
            "actor_user_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True
        ),
        # Deleting a user cascades to their events.
        sa.ForeignKeyConstraint(
            ["actor_user_id"],
            ["users.id"],
            name=op.f("fk_events_actor_user_id_users"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_events")),
    )
    # recorded_at index supports the TTL-based cleanup task.
    with op.batch_alter_table("events", schema=None) as batch_op:
        batch_op.create_index(
            batch_op.f("ix_events_actor_user_id"), ["actor_user_id"], unique=False
        )
        batch_op.create_index(batch_op.f("ix_events_recorded_at"), ["recorded_at"], unique=False)

    op.create_table(
        "event_targets",
        sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
        sa.Column("event_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
        # Nullable: some target types (e.g., users) are not project-scoped.
        sa.Column(
            "entity_project_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True
        ),
        sa.Column("entity_type", sa.String(length=100), nullable=False),
        sa.Column("entity_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False),
        sa.Column("entity_name", sa.String(length=200), nullable=False),
        # Deleting a project or an event cascades to its target rows.
        sa.ForeignKeyConstraint(
            ["entity_project_id"],
            ["projects.id"],
            name=op.f("fk_event_targets_entity_project_id_projects"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["event_id"],
            ["events.id"],
            name=op.f("fk_event_targets_event_id_events"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_event_targets")),
    )
    with op.batch_alter_table("event_targets", schema=None) as batch_op:
        batch_op.create_index(
            batch_op.f("ix_event_targets_entity_id"), ["entity_id"], unique=False
        )
        batch_op.create_index(
            batch_op.f("ix_event_targets_entity_project_id"), ["entity_project_id"], unique=False
        )
        batch_op.create_index(
            batch_op.f("ix_event_targets_entity_type"), ["entity_type"], unique=False
        )
        batch_op.create_index(batch_op.f("ix_event_targets_event_id"), ["event_id"], unique=False)

    # ### end Alembic commands ###
83+
84+
85+
def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Reverse order of upgrade(): drop "event_targets" first since it holds
    # a foreign key on "events".
    with op.batch_alter_table("event_targets", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_event_targets_event_id"))
        batch_op.drop_index(batch_op.f("ix_event_targets_entity_type"))
        batch_op.drop_index(batch_op.f("ix_event_targets_entity_project_id"))
        batch_op.drop_index(batch_op.f("ix_event_targets_entity_id"))

    op.drop_table("event_targets")
    with op.batch_alter_table("events", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_events_recorded_at"))
        batch_op.drop_index(batch_op.f("ix_events_actor_user_id"))

    op.drop_table("events")
    # ### end Alembic commands ###
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""Add ix_instances_fleet_id
2+
3+
Revision ID: 5fd659afca82
4+
Revises: d4d9dc26cf58
5+
Create Date: 2025-12-04 20:52:07.015334
6+
7+
"""
8+
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "5fd659afca82"
13+
down_revision = "d4d9dc26cf58"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Add a non-unique index on instances.fleet_id.
    with op.batch_alter_table("instances", schema=None) as batch_op:
        batch_op.create_index(batch_op.f("ix_instances_fleet_id"), ["fleet_id"], unique=False)

    # ### end Alembic commands ###
24+
25+
26+
def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop the index added in upgrade().
    with op.batch_alter_table("instances", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_instances_fleet_id"))

    # ### end Alembic commands ###
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""Add ix_jobs_run_id
2+
3+
Revision ID: d4d9dc26cf58
4+
Revises: 006512f572b4
5+
Create Date: 2025-12-04 20:48:10.543248
6+
7+
"""
8+
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "d4d9dc26cf58"
13+
down_revision = "006512f572b4"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Add a non-unique index on jobs.run_id.
    with op.batch_alter_table("jobs", schema=None) as batch_op:
        batch_op.create_index(batch_op.f("ix_jobs_run_id"), ["run_id"], unique=False)

    # ### end Alembic commands ###
24+
25+
26+
def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop the index added in upgrade().
    with op.batch_alter_table("jobs", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_jobs_run_id"))

    # ### end Alembic commands ###

src/dstack/_internal/server/models.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from dstack._internal.core.models.backends.base import BackendType
2727
from dstack._internal.core.models.common import CoreConfig, generate_dual_core_model
2828
from dstack._internal.core.models.compute_groups import ComputeGroupStatus
29+
from dstack._internal.core.models.events import EventTargetType
2930
from dstack._internal.core.models.fleets import FleetStatus
3031
from dstack._internal.core.models.gateways import GatewayStatus
3132
from dstack._internal.core.models.health import HealthStatus
@@ -407,7 +408,9 @@ class JobModel(BaseModel):
407408
project_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
408409
project: Mapped["ProjectModel"] = relationship()
409410

410-
run_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("runs.id", ondelete="CASCADE"))
411+
run_id: Mapped[uuid.UUID] = mapped_column(
412+
ForeignKey("runs.id", ondelete="CASCADE"), index=True
413+
)
411414
run: Mapped["RunModel"] = relationship()
412415

413416
# Jobs need to reference fleets because we may choose an optimal fleet for a master job
@@ -601,7 +604,7 @@ class InstanceModel(BaseModel):
601604
)
602605
pool: Mapped[Optional["PoolModel"]] = relationship(back_populates="instances")
603606

604-
fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id"))
607+
fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id"), index=True)
605608
fleet: Mapped[Optional["FleetModel"]] = relationship(back_populates="instances")
606609

607610
compute_group_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("compute_groups.id"))
@@ -852,3 +855,42 @@ class SecretModel(BaseModel):
852855

853856
name: Mapped[str] = mapped_column(String(200))
854857
value: Mapped[DecryptedString] = mapped_column(EncryptedString())
858+
859+
860+
class EventModel(BaseModel):
    """DB model for a recorded server event."""

    __tablename__ = "events"

    # No default generator here (unlike EventTargetModel.id) — presumably the
    # emission code supplies the event ID; TODO confirm against the emitter.
    id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), primary_key=True)
    message: Mapped[str] = mapped_column(Text)
    # Indexed: the TTL cleanup task filters on recorded_at.
    recorded_at: Mapped[datetime] = mapped_column(NaiveDateTime, index=True)

    # User who performed the action, or NULL for system-initiated events.
    # ondelete="CASCADE": deleting a user also deletes their events.
    actor_user_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True
    )
    actor_user: Mapped[Optional["UserModel"]] = relationship()

    # Entities affected by this event (one row per target).
    targets: Mapped[List["EventTargetModel"]] = relationship(back_populates="event")
873+
874+
875+
class EventTargetModel(BaseModel):
    """DB model for a single entity affected by an event.

    An event may have several targets; rows are removed together with the
    parent event or project via ON DELETE CASCADE.
    """

    __tablename__ = "event_targets"

    id: Mapped[uuid.UUID] = mapped_column(
        UUIDType(binary=False), primary_key=True, default=uuid.uuid4
    )

    event_id: Mapped[uuid.UUID] = mapped_column(
        ForeignKey("events.id", ondelete="CASCADE"), index=True
    )
    # back_populates is declared on both sides (EventModel.targets names
    # "event") so SQLAlchemy keeps the pair synchronized in Python; the
    # original one-sided declaration left the reverse side unlinked.
    event: Mapped["EventModel"] = relationship(back_populates="targets")

    # Project the target entity belongs to; NULL for target types that are
    # not project-scoped (e.g., users).
    entity_project_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        ForeignKey("projects.id", ondelete="CASCADE"), nullable=True, index=True
    )
    entity_project: Mapped[Optional["ProjectModel"]] = relationship()

    # Stored as a plain string column via EnumAsString (max length 100).
    entity_type: Mapped[EventTargetType] = mapped_column(
        EnumAsString(EventTargetType, 100), index=True
    )
    entity_id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), index=True)
    entity_name: Mapped[str] = mapped_column(String(200))

0 commit comments

Comments
 (0)