diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index 31a94e903..04ee67d93 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -132,6 +132,12 @@ For more details on the options below, refer to the [server deployment](../guide - `DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS } – Maximum age of instance health checks. - `DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS } – Minimum time interval between consecutive health checks of the same instance. + + ??? info "Internal environment variables" The following environment variables are intended for development purposes: diff --git a/src/dstack/_internal/core/models/events.py b/src/dstack/_internal/core/models/events.py new file mode 100644 index 000000000..3fb556101 --- /dev/null +++ b/src/dstack/_internal/core/models/events.py @@ -0,0 +1,68 @@ +import uuid +from datetime import datetime +from enum import Enum +from typing import Annotated, Optional + +from pydantic import Field + +from dstack._internal.core.models.common import CoreModel +from dstack._internal.utils.common import list_enum_values_for_annotation + + +class EventTargetType(str, Enum): + PROJECT = "project" + USER = "user" + FLEET = "fleet" + INSTANCE = "instance" + RUN = "run" + JOB = "job" + + +class EventTarget(CoreModel): + type: Annotated[ + str, # not using EventTargetType to allow adding new types without breaking compatibility + Field( + description=( + f"Type of the target entity." 
+ f" One of: {list_enum_values_for_annotation(EventTargetType)}" + ) + ), + ] + project_id: Annotated[ + Optional[uuid.UUID], + Field( + description=( + "ID of the project the target entity belongs to," + " or `null` for target types not bound to a project (e.g., users)" + ) + ), + ] + id: Annotated[uuid.UUID, Field(description="ID of the target entity")] + name: Annotated[str, Field(description="Name of the target entity")] + + +class Event(CoreModel): + id: uuid.UUID + message: str + recorded_at: datetime + actor_user_id: Annotated[ + Optional[uuid.UUID], + Field( + description=( + "ID of the user who performed the action that triggered the event," + " or `null` if the action was performed by the system" + ) + ), + ] + actor_user: Annotated[ + Optional[str], + Field( + description=( + "Name of the user who performed the action that triggered the event," + " or `null` if the action was performed by the system" + ) + ), + ] + targets: Annotated[ + list[EventTarget], Field(description="List of entities affected by the event") + ] diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index d9b906bc6..5382e8f11 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -26,6 +26,7 @@ from dstack._internal.server.db import get_db, get_session_ctx, migrate from dstack._internal.server.routers import ( backends, + events, files, fleets, gateways, @@ -65,7 +66,7 @@ get_client_version, get_server_client_error_details, ) -from dstack._internal.settings import DSTACK_VERSION +from dstack._internal.settings import DSTACK_VERSION, FeatureFlags from dstack._internal.utils.logging import get_logger from dstack._internal.utils.ssh import check_required_ssh_version @@ -228,6 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True): app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"]) app.include_router(prometheus.router) app.include_router(files.router) + 
app.include_router(events.root_router, include_in_schema=FeatureFlags.EVENTS) @app.exception_handler(ForbiddenError) async def forbidden_error_handler(request: Request, exc: ForbiddenError): diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index df7d41b9d..2c1a42e85 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -3,6 +3,7 @@ from dstack._internal.server import settings from dstack._internal.server.background.tasks.process_compute_groups import process_compute_groups +from dstack._internal.server.background.tasks.process_events import delete_events from dstack._internal.server.background.tasks.process_fleets import process_fleets from dstack._internal.server.background.tasks.process_gateways import ( process_gateways, @@ -32,6 +33,7 @@ process_terminating_jobs, ) from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes +from dstack._internal.settings import FeatureFlags _scheduler = AsyncIOScheduler() @@ -69,6 +71,8 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1)) _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) + if FeatureFlags.EVENTS: + _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: _scheduler.add_job( collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 diff --git a/src/dstack/_internal/server/background/tasks/process_events.py b/src/dstack/_internal/server/background/tasks/process_events.py new file mode 100644 index 000000000..22df5bcf3 --- /dev/null +++ b/src/dstack/_internal/server/background/tasks/process_events.py @@ -0,0 +1,17 @@ +from datetime import timedelta + +from sqlalchemy import delete + 
+from dstack._internal.server import settings +from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.models import EventModel +from dstack._internal.server.utils import sentry_utils +from dstack._internal.utils.common import get_current_datetime + + +@sentry_utils.instrument_background_task +async def delete_events(): + cutoff = get_current_datetime() - timedelta(seconds=settings.SERVER_EVENTS_TTL_SECONDS) + stmt = delete(EventModel).where(EventModel.recorded_at < cutoff) + async with get_session_ctx() as session: + await session.execute(stmt) diff --git a/src/dstack/_internal/server/migrations/versions/22d74df9897e_add_events_and_event_targets.py b/src/dstack/_internal/server/migrations/versions/22d74df9897e_add_events_and_event_targets.py new file mode 100644 index 000000000..87a48deba --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/22d74df9897e_add_events_and_event_targets.py @@ -0,0 +1,99 @@ +"""Add events and event_targets + +Revision ID: 22d74df9897e +Revises: 5fd659afca82 +Create Date: 2025-12-04 20:56:08.003504 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +from alembic import op + +import dstack._internal.server.models + +# revision identifiers, used by Alembic. +revision = "22d74df9897e" +down_revision = "5fd659afca82" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "events", + sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("message", sa.Text(), nullable=False), + sa.Column("recorded_at", dstack._internal.server.models.NaiveDateTime(), nullable=False), + sa.Column( + "actor_user_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True + ), + sa.ForeignKeyConstraint( + ["actor_user_id"], + ["users.id"], + name=op.f("fk_events_actor_user_id_users"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_events")), + ) + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_events_actor_user_id"), ["actor_user_id"], unique=False + ) + batch_op.create_index(batch_op.f("ix_events_recorded_at"), ["recorded_at"], unique=False) + + op.create_table( + "event_targets", + sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("event_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column( + "entity_project_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True + ), + sa.Column("entity_type", sa.String(length=100), nullable=False), + sa.Column("entity_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("entity_name", sa.String(length=200), nullable=False), + sa.ForeignKeyConstraint( + ["entity_project_id"], + ["projects.id"], + name=op.f("fk_event_targets_entity_project_id_projects"), + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["event_id"], + ["events.id"], + name=op.f("fk_event_targets_event_id_events"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_event_targets")), + ) + with op.batch_alter_table("event_targets", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_event_targets_entity_id"), ["entity_id"], unique=False + ) + batch_op.create_index( + batch_op.f("ix_event_targets_entity_project_id"), 
["entity_project_id"], unique=False + ) + batch_op.create_index( + batch_op.f("ix_event_targets_entity_type"), ["entity_type"], unique=False + ) + batch_op.create_index(batch_op.f("ix_event_targets_event_id"), ["event_id"], unique=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("event_targets", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_event_targets_event_id")) + batch_op.drop_index(batch_op.f("ix_event_targets_entity_type")) + batch_op.drop_index(batch_op.f("ix_event_targets_entity_project_id")) + batch_op.drop_index(batch_op.f("ix_event_targets_entity_id")) + + op.drop_table("event_targets") + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_events_recorded_at")) + batch_op.drop_index(batch_op.f("ix_events_actor_user_id")) + + op.drop_table("events") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py b/src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py new file mode 100644 index 000000000..4e9467a7c --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py @@ -0,0 +1,31 @@ +"""Add ix_instances_fleet_id + +Revision ID: 5fd659afca82 +Revises: d4d9dc26cf58 +Create Date: 2025-12-04 20:52:07.015334 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "5fd659afca82" +down_revision = "d4d9dc26cf58" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_instances_fleet_id"), ["fleet_id"], unique=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_instances_fleet_id")) + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py b/src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py new file mode 100644 index 000000000..b3d485a07 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py @@ -0,0 +1,31 @@ +"""Add ix_jobs_run_id + +Revision ID: d4d9dc26cf58 +Revises: 006512f572b4 +Create Date: 2025-12-04 20:48:10.543248 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "d4d9dc26cf58" +down_revision = "006512f572b4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_jobs_run_id"), ["run_id"], unique=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_jobs_run_id")) + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 04185762a..9e6adc1c9 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -26,6 +26,7 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import CoreConfig, generate_dual_core_model from dstack._internal.core.models.compute_groups import ComputeGroupStatus +from dstack._internal.core.models.events import EventTargetType from dstack._internal.core.models.fleets import FleetStatus from dstack._internal.core.models.gateways import GatewayStatus from dstack._internal.core.models.health import HealthStatus @@ -407,7 +408,9 @@ class JobModel(BaseModel): project_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE")) project: Mapped["ProjectModel"] = relationship() - run_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("runs.id", ondelete="CASCADE")) + run_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("runs.id", ondelete="CASCADE"), index=True + ) run: Mapped["RunModel"] = relationship() # Jobs need to reference fleets because we may choose an optimal fleet for a master job @@ -601,7 +604,7 @@ class InstanceModel(BaseModel): ) pool: Mapped[Optional["PoolModel"]] = relationship(back_populates="instances") - fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id")) + fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id"), index=True) fleet: Mapped[Optional["FleetModel"]] = relationship(back_populates="instances") compute_group_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("compute_groups.id")) @@ -852,3 +855,42 @@ class SecretModel(BaseModel): name: Mapped[str] = mapped_column(String(200)) value: Mapped[DecryptedString] = 
mapped_column(EncryptedString()) + + +class EventModel(BaseModel): + __tablename__ = "events" + + id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), primary_key=True) + message: Mapped[str] = mapped_column(Text) + recorded_at: Mapped[datetime] = mapped_column(NaiveDateTime, index=True) + + actor_user_id: Mapped[Optional[uuid.UUID]] = mapped_column( + ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True + ) + actor_user: Mapped[Optional["UserModel"]] = relationship() + + targets: Mapped[List["EventTargetModel"]] = relationship(back_populates="event") + + +class EventTargetModel(BaseModel): + __tablename__ = "event_targets" + + id: Mapped[uuid.UUID] = mapped_column( + UUIDType(binary=False), primary_key=True, default=uuid.uuid4 + ) + + event_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("events.id", ondelete="CASCADE"), index=True + ) + event: Mapped["EventModel"] = relationship() + + entity_project_id: Mapped[Optional[uuid.UUID]] = mapped_column( + ForeignKey("projects.id", ondelete="CASCADE"), nullable=True, index=True + ) + entity_project: Mapped[Optional["ProjectModel"]] = relationship() + + entity_type: Mapped[EventTargetType] = mapped_column( + EnumAsString(EventTargetType, 100), index=True + ) + entity_id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), index=True) + entity_name: Mapped[str] = mapped_column(String(200)) diff --git a/src/dstack/_internal/server/routers/events.py b/src/dstack/_internal/server/routers/events.py new file mode 100644 index 000000000..b5472aa0d --- /dev/null +++ b/src/dstack/_internal/server/routers/events.py @@ -0,0 +1,61 @@ +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +import dstack._internal.server.services.events as events_services +from dstack._internal.core.errors import ServerClientError +from dstack._internal.core.models.events import Event +from dstack._internal.server.db import get_session +from dstack._internal.server.models import 
UserModel +from dstack._internal.server.schemas.events import ListEventsRequest +from dstack._internal.server.security.permissions import Authenticated +from dstack._internal.server.utils.routers import ( + CustomORJSONResponse, + get_base_api_additional_responses, +) +from dstack._internal.settings import FeatureFlags + +root_router = APIRouter( + prefix="/api/events", + tags=["events"], + responses=get_base_api_additional_responses(), +) + + +@root_router.post("/list", response_model=list[Event]) +async def list_events( + body: ListEventsRequest, + session: AsyncSession = Depends(get_session), + user: UserModel = Depends(Authenticated()), +): + """ + Returns events visible to the current user. + + Regular users can see events related to themselves and to projects they are members of. + Global admins can see all events. + + The results are paginated. To get the next page, pass `recorded_at` and `id` of + the last event from the previous page as `prev_recorded_at` and `prev_id`. + """ + if not FeatureFlags.EVENTS: + raise ServerClientError("Events are disabled on this server") + return CustomORJSONResponse( + await events_services.list_events( + session=session, + user=user, + target_projects=body.target_projects, + target_users=body.target_users, + target_fleets=body.target_fleets, + target_instances=body.target_instances, + target_runs=body.target_runs, + target_jobs=body.target_jobs, + within_projects=body.within_projects, + within_fleets=body.within_fleets, + within_runs=body.within_runs, + include_target_types=body.include_target_types, + actors=body.actors, + prev_recorded_at=body.prev_recorded_at, + prev_id=body.prev_id, + limit=body.limit, + ascending=body.ascending, + ) + ) diff --git a/src/dstack/_internal/server/routers/users.py b/src/dstack/_internal/server/routers/users.py index d58472275..be04f8392 100644 --- a/src/dstack/_internal/server/routers/users.py +++ b/src/dstack/_internal/server/routers/users.py @@ -76,6 +76,7 @@ async def create_user( 
global_role=body.global_role, email=body.email, active=body.active, + creator=user, ) return CustomORJSONResponse(users.user_model_to_user(res)) diff --git a/src/dstack/_internal/server/schemas/events.py b/src/dstack/_internal/server/schemas/events.py new file mode 100644 index 000000000..05ed5b323 --- /dev/null +++ b/src/dstack/_internal/server/schemas/events.py @@ -0,0 +1,177 @@ +import uuid +from datetime import datetime +from typing import Annotated, Optional +from uuid import UUID + +from pydantic import Field, root_validator + +from dstack._internal.core.models.common import CoreModel +from dstack._internal.core.models.events import EventTargetType + +MIN_FILTER_ITEMS = 1 +MAX_FILTER_ITEMS = 16 # Conservative limit to prevent overly complex db queries + + +class ListEventsRequest(CoreModel): + target_projects: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of project IDs." + " The response will only include events that target the specified projects" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_users: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of user IDs." + " The response will only include events that target the specified users" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_fleets: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of fleet IDs." + " The response will only include events that target the specified fleets" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_instances: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of instance IDs." + " The response will only include events that target the specified instances" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_runs: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of run IDs." 
+ " The response will only include events that target the specified runs" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_jobs: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of job IDs." + " The response will only include events that target the specified jobs" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + within_projects: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of project IDs." + " The response will only include events that target the specified projects" + " or any entities within those projects" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + within_fleets: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of fleet IDs." + " The response will only include events that target the specified fleets" + " or instances within those fleets" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + within_runs: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of run IDs." + " The response will only include events that target the specified runs" + " or jobs within those runs" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + include_target_types: Annotated[ + Optional[list[EventTargetType]], + Field( + description=( + "List of target types." + " The response will only include events that have a target" + " of one of the specified types" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + actors: Annotated[ + Optional[list[Optional[uuid.UUID]]], + Field( + description=( + "List of user IDs or `null` values." 
+ " The response will only include events about actions" + " performed by the specified users," + " or performed by the system if `null` is specified" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + prev_recorded_at: Optional[datetime] = None + prev_id: Optional[UUID] = None + limit: int = Field(100, ge=1, le=100) + ascending: bool = False + + @root_validator + def _validate_target_filters(cls, values): + """ + Raise an error if more than one target_* filter is set. Setting multiple + target_* filters would always result in an empty response, which might confuse users. + """ + + target_filters = [name for name in cls.__fields__ if name.startswith("target_")] + set_filters = [f for f in target_filters if values.get(f) is not None] + if len(set_filters) > 1: + raise ValueError( + f"At most one target_* filter can be set at a time. Got {', '.join(set_filters)}" + ) + return values + + @root_validator + def _validate_within_filters(cls, values): + """ + Raise an error if more than one within_* filter is set. Setting multiple + within_* filters is either redundant or incorrect. Each within_* filter + may also lead to additional db queries, causing unnecessary load. + """ + + within_filters = [name for name in cls.__fields__ if name.startswith("within_")] + set_filters = [f for f in within_filters if values.get(f) is not None] + if len(set_filters) > 1: + raise ValueError( + f"At most one within_* filter can be set at a time. 
Got {', '.join(set_filters)}" + ) + return values diff --git a/src/dstack/_internal/server/services/events.py b/src/dstack/_internal/server/services/events.py new file mode 100644 index 000000000..34e7b7237 --- /dev/null +++ b/src/dstack/_internal/server/services/events.py @@ -0,0 +1,411 @@ +import uuid +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Union + +from sqlalchemy import and_, exists, or_, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from dstack._internal.core.models.events import Event, EventTarget, EventTargetType +from dstack._internal.core.models.users import GlobalRole +from dstack._internal.server import settings +from dstack._internal.server.models import ( + EventModel, + EventTargetModel, + FleetModel, + InstanceModel, + JobModel, + MemberModel, + ProjectModel, + RunModel, + UserModel, +) +from dstack._internal.server.services.logging import fmt_entity +from dstack._internal.settings import FeatureFlags +from dstack._internal.utils.common import get_current_datetime +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +class SystemActor: + """Represents the system as the actor of an event""" + + def fmt(self) -> str: + return "system" + + +@dataclass +class UserActor: + """ + Represents a user as the actor of an event. + + **NOTE**: Prefer using `UserActor.from_user` to create `UserActor` instances, + unless you don't have a complete `UserModel` available. + """ + + user_id: uuid.UUID + user_name: str + + @staticmethod + def from_user(user: UserModel) -> "UserActor": + return UserActor(user_id=user.id, user_name=user.name) + + def fmt(self) -> str: + return fmt_entity("user", self.user_id, self.user_name) + + +AnyActor = Union[SystemActor, UserActor] + + +@dataclass( + frozen=True, # to enforce the __post_init__ invariant +) +class Target: + """ + Target specification for event emission. 
+ + **NOTE**: Prefer using `Target.from_model` to create `Target` instances, + unless you don't have a complete model available. + """ + + type: EventTargetType + project_id: Optional[uuid.UUID] + id: uuid.UUID + name: str + + def __post_init__(self): + if self.type == EventTargetType.USER and self.project_id is not None: + raise ValueError("User target cannot have project_id") + if self.type != EventTargetType.USER and self.project_id is None: + raise ValueError(f"{self.type} target must have project_id") + if self.type == EventTargetType.PROJECT and self.id != self.project_id: + raise ValueError("Project target id must be equal to project_id") + + @staticmethod + def from_model( + model: Union[ + FleetModel, + InstanceModel, + JobModel, + ProjectModel, + RunModel, + UserModel, + ], + ) -> "Target": + if isinstance(model, FleetModel): + return Target( + type=EventTargetType.FLEET, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.name, + ) + if isinstance(model, InstanceModel): + return Target( + type=EventTargetType.INSTANCE, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.name, + ) + if isinstance(model, JobModel): + return Target( + type=EventTargetType.JOB, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.job_name, + ) + if isinstance(model, ProjectModel): + return Target( + type=EventTargetType.PROJECT, + project_id=model.id, + id=model.id, + name=model.name, + ) + if isinstance(model, RunModel): + return Target( + type=EventTargetType.RUN, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.run_name, + ) + if isinstance(model, UserModel): + return Target( + type=EventTargetType.USER, + project_id=None, + id=model.id, + name=model.name, + ) + raise ValueError(f"Unsupported model type: {type(model)}") + + def fmt(self) -> str: + return fmt_entity(self.type, self.id, self.name) + + +def emit(session: AsyncSession, message: str, actor: AnyActor, 
targets: list[Target]) -> None: + """ + Emit an event - add it to the current session without committing. + + Usage guidelines: + - Message: + - Use past tense - events should describe completed actions. + Bad: "Creating project" + Good: "Project created" + - Do not duplicate target and actor names in the message. + Bad: "User John created project MyProject" + Good: "Project created" + - Actor: + - Pass `UserActor` for events about user actions, e.g., in API handlers. + - Pass `SystemActor` for system-generated events, e.g., in background jobs. + - Targets: + - Link the event to one or more entities affected by it. + E.g., for a "Job assigned to instance" event, link it to the job and the instance. + - Do not link the event to parent entities of the affected entities. + E.g., the "Instance created" event should be linked to the instance only, + not to the fleet or project. Transitive relationships with parent entities + are inferred automatically when listing events using the within_* filters. + - **Important**: If linking the event to multiple targets with different access scopes + (e.g., entities in different projects, or different users), ensure that this does not + leak sensitive information. If a user has access to at least one of the targets, + they will see the entire event with all targets. If this is not desired, + consider emitting multiple separate events instead. + """ + if not FeatureFlags.EVENTS: + return + + if not targets: + raise ValueError("At least one target must be specified") + if not message: + raise ValueError("Message cannot be empty") + if message.strip() != message: + raise ValueError("Message cannot have leading or trailing whitespace") + if "\n" in message: + raise ValueError("Message cannot contain newlines") + if message.endswith("."): + raise ValueError("Message cannot end with a period") + + logger.info( + "Emitting event: %s. Event targets: %s. 
Actor: %s", + message, + ", ".join(target.fmt() for target in targets), + actor.fmt(), + ) + + if settings.SERVER_EVENTS_TTL_SECONDS <= 0: + return + event = EventModel( + id=uuid.uuid4(), + message=message, + actor_user_id=actor.user_id if isinstance(actor, UserActor) else None, + recorded_at=get_current_datetime(), + targets=[], + ) + for target in targets: + event.targets.append( + EventTargetModel( + entity_type=target.type, + entity_project_id=target.project_id, + entity_id=target.id, + entity_name=target.name, + ) + ) + session.add(event) + + +async def list_events( + session: AsyncSession, + user: UserModel, # the user requesting the events + target_projects: Optional[list[uuid.UUID]], + target_users: Optional[list[uuid.UUID]], + target_fleets: Optional[list[uuid.UUID]], + target_instances: Optional[list[uuid.UUID]], + target_runs: Optional[list[uuid.UUID]], + target_jobs: Optional[list[uuid.UUID]], + within_projects: Optional[list[uuid.UUID]], + within_fleets: Optional[list[uuid.UUID]], + within_runs: Optional[list[uuid.UUID]], + include_target_types: Optional[list[EventTargetType]], + actors: Optional[list[Optional[uuid.UUID]]], + prev_recorded_at: Optional[datetime], + prev_id: Optional[uuid.UUID], + limit: int, + ascending: bool, +) -> list[Event]: + target_filters = [] + if user.global_role != GlobalRole.ADMIN: + query = select(MemberModel.project_id).where(MemberModel.user_id == user.id) + res = await session.execute(query) + # In Postgres, fetching project IDs separately is orders of magnitude faster + # than using a subquery. 
+ project_ids = list(res.unique().scalars().all()) + target_filters.append( + or_( + EventTargetModel.entity_project_id.in_(project_ids), + and_( + EventTargetModel.entity_project_id.is_(None), + EventTargetModel.entity_type == EventTargetType.USER, + EventTargetModel.entity_id == user.id, + ), + ) + ) + if target_projects is not None: + target_filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.PROJECT, + EventTargetModel.entity_id.in_(target_projects), + ) + ) + if target_users is not None: + target_filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.USER, + EventTargetModel.entity_id.in_(target_users), + ) + ) + if target_fleets is not None: + target_filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.FLEET, + EventTargetModel.entity_id.in_(target_fleets), + ) + ) + if target_instances is not None: + target_filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.INSTANCE, + EventTargetModel.entity_id.in_(target_instances), + ) + ) + if target_runs is not None: + target_filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.RUN, + EventTargetModel.entity_id.in_(target_runs), + ) + ) + if target_jobs is not None: + target_filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.JOB, + EventTargetModel.entity_id.in_(target_jobs), + ) + ) + if within_projects is not None: + target_filters.append(EventTargetModel.entity_project_id.in_(within_projects)) + if within_fleets is not None: + query = select(InstanceModel.id).where(InstanceModel.fleet_id.in_(within_fleets)) + res = await session.execute(query) + # In Postgres, fetching instance IDs separately is orders of magnitude faster + # than using a subquery. 
+ instance_ids = list(res.unique().scalars().all()) + target_filters.append( + or_( + and_( + EventTargetModel.entity_type == EventTargetType.FLEET, + EventTargetModel.entity_id.in_(within_fleets), + ), + and_( + EventTargetModel.entity_type == EventTargetType.INSTANCE, + EventTargetModel.entity_id.in_(instance_ids), + ), + ) + ) + if within_runs is not None: + query = select(JobModel.id).where(JobModel.run_id.in_(within_runs)) + res = await session.execute(query) + # In Postgres, fetching job IDs separately is orders of magnitude faster + # than using a subquery. + job_ids = list(res.unique().scalars().all()) + target_filters.append( + or_( + and_( + EventTargetModel.entity_type == EventTargetType.RUN, + EventTargetModel.entity_id.in_(within_runs), + ), + and_( + EventTargetModel.entity_type == EventTargetType.JOB, + EventTargetModel.entity_id.in_(job_ids), + ), + ) + ) + if include_target_types is not None: + target_filters.append(EventTargetModel.entity_type.in_(include_target_types)) + + event_filters = [] + if actors is not None: + event_filters.append( + or_( + EventModel.actor_user_id.is_(None) if None in actors else False, + EventModel.actor_user_id.in_( + [actor_id for actor_id in actors if actor_id is not None] + ), + ) + ) + if prev_recorded_at is not None: + if ascending: + if prev_id is None: + event_filters.append(EventModel.recorded_at > prev_recorded_at) + else: + event_filters.append( + or_( + EventModel.recorded_at > prev_recorded_at, + and_(EventModel.recorded_at == prev_recorded_at, EventModel.id < prev_id), + ) + ) + else: + if prev_id is None: + event_filters.append(EventModel.recorded_at < prev_recorded_at) + else: + event_filters.append( + or_( + EventModel.recorded_at < prev_recorded_at, + and_(EventModel.recorded_at == prev_recorded_at, EventModel.id > prev_id), + ) + ) + order_by = (EventModel.recorded_at.desc(), EventModel.id) + if ascending: + order_by = (EventModel.recorded_at.asc(), EventModel.id.desc()) + query = ( + 
select(EventModel) + .order_by(*order_by) + .limit(limit) + .options( + joinedload(EventModel.targets), + joinedload(EventModel.actor_user).load_only(UserModel.name), + ) + ) + if event_filters: + query = query.where(*event_filters) + if target_filters: + query = query.where( + exists().where( + and_( + EventTargetModel.event_id == EventModel.id, + *target_filters, + ) + ) + ) + res = await session.execute(query) + event_models = res.unique().scalars().all() + return list(map(event_model_to_event, event_models)) + + +def event_model_to_event(event_model: EventModel) -> Event: + targets = [ + EventTarget( + type=target.entity_type, + project_id=target.entity_project_id, + id=target.entity_id, + name=target.entity_name, + ) + for target in event_model.targets + ] + + return Event( + id=event_model.id, + message=event_model.message, + recorded_at=event_model.recorded_at, + actor_user_id=event_model.actor_user_id, + actor_user=event_model.actor_user.name if event_model.actor_user else None, + targets=targets, + ) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 3a5329f6e..b1285134e 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -53,6 +53,7 @@ ProjectModel, UserModel, ) +from dstack._internal.server.services import events from dstack._internal.server.services import instances as instances_services from dstack._internal.server.services import offers as offers_services from dstack._internal.server.services.instances import ( @@ -752,6 +753,12 @@ async def _create_fleet( instances=[], ) session.add(fleet_model) + events.emit( + session, + f"Fleet created. 
Status: {fleet_model.status.upper()}", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_model)], + ) if spec.configuration.ssh_config is not None: for i, host in enumerate(spec.configuration.ssh_config.hosts): instances_model = await create_fleet_ssh_instance_model( @@ -773,6 +780,13 @@ async def _create_fleet( instance_num=i, ) fleet_model.instances.append(instance_model) + for instance_model in fleet_model.instances: + events.emit( + session, + f"Instance created on fleet submission. Status: {instance_model.status.upper()}", + actor=events.SystemActor(), + targets=[events.Target.from_model(instance_model)], + ) await session.commit() return fleet_model_to_fleet(fleet_model) diff --git a/src/dstack/_internal/server/services/logging.py b/src/dstack/_internal/server/services/logging.py index c73853419..bf1b72f09 100644 --- a/src/dstack/_internal/server/services/logging.py +++ b/src/dstack/_internal/server/services/logging.py @@ -1,3 +1,4 @@ +import uuid from typing import Union from dstack._internal.server.models import ( @@ -12,13 +13,17 @@ def fmt(model: Union[RunModel, JobModel, InstanceModel, GatewayModel, ProbeModel]) -> str: """Consistent string representation of a model for logging.""" if isinstance(model, RunModel): - return f"run({model.id.hex[:6]}){model.run_name}" + return fmt_entity("run", model.id, model.run_name) if isinstance(model, JobModel): - return f"job({model.id.hex[:6]}){model.job_name}" + return fmt_entity("job", model.id, model.job_name) if isinstance(model, InstanceModel): - return f"instance({model.id.hex[:6]}){model.name}" + return fmt_entity("instance", model.id, model.name) if isinstance(model, GatewayModel): - return f"gateway({model.id.hex[:6]}){model.name}" + return fmt_entity("gateway", model.id, model.name) if isinstance(model, ProbeModel): - return f"probe({model.id.hex[:6]}){model.name}" + return fmt_entity("probe", model.id, model.name) return str(model) + + +def fmt_entity(entity_type: str, 
entity_id: uuid.UUID, entity_name: str) -> str: + return f"{entity_type}({entity_id.hex[:6]}){entity_name}" diff --git a/src/dstack/_internal/server/services/projects.py b/src/dstack/_internal/server/services/projects.py index cc6c37401..2004b5ccc 100644 --- a/src/dstack/_internal/server/services/projects.py +++ b/src/dstack/_internal/server/services/projects.py @@ -31,7 +31,7 @@ VolumeModel, ) from dstack._internal.server.schemas.projects import MemberSetting -from dstack._internal.server.services import users +from dstack._internal.server.services import events, users from dstack._internal.server.services.backends import ( get_backend_config_without_creds_from_backend_model, ) @@ -540,6 +540,12 @@ async def create_project_model( is_public=is_public, ) session.add(project) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(owner), + targets=[events.Target.from_model(project)], + ) await session.commit() return project diff --git a/src/dstack/_internal/server/services/runs/__init__.py b/src/dstack/_internal/server/services/runs/__init__.py index cacda8e18..18c0847b4 100644 --- a/src/dstack/_internal/server/services/runs/__init__.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -47,8 +47,8 @@ RunModel, UserModel, ) +from dstack._internal.server.services import events, services from dstack._internal.server.services import repos as repos_services -from dstack._internal.server.services import services from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, delay_job_instance_termination, @@ -484,6 +484,12 @@ async def submit_run( next_triggered_at=_get_next_triggered_at(run_spec), ) session.add(run_model) + events.emit( + session, + f"Run submitted. 
Status: {run_model.status.upper()}", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run_model)], + ) if run_spec.configuration.type == "service": await services.register_service(session, run_model, run_spec) @@ -501,6 +507,14 @@ async def submit_run( status=JobStatus.SUBMITTED, ) session.add(job_model) + events.emit( + session, + f"Job created on run submission. Status: {job_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(job_model), + ], + ) await session.commit() await session.refresh(run_model) diff --git a/src/dstack/_internal/server/services/users.py b/src/dstack/_internal/server/services/users.py index aed0a23de..a42fb64a1 100644 --- a/src/dstack/_internal/server/services/users.py +++ b/src/dstack/_internal/server/services/users.py @@ -20,6 +20,7 @@ UserWithCreds, ) from dstack._internal.server.models import DecryptedString, MemberModel, UserModel +from dstack._internal.server.services import events from dstack._internal.server.services.permissions import get_default_permissions from dstack._internal.server.utils.routers import error_forbidden from dstack._internal.utils import crypto @@ -87,6 +88,7 @@ async def create_user( active: bool = True, token: Optional[str] = None, config: Optional[UserHookConfig] = None, + creator: Optional[UserModel] = None, ) -> UserModel: validate_username(username) user_model = await get_user_model_by_name(session=session, username=username, ignore_case=True) @@ -107,6 +109,12 @@ async def create_user( ssh_public_key=public_bytes.decode(), ) session.add(user) + events.emit( + session, + "User created", + actor=events.UserActor.from_user(creator) if creator else events.UserActor.from_user(user), + targets=[events.Target.from_model(user)], + ) await session.commit() for func in _CREATE_USER_HOOKS: await func(session, user, config) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 0730db7f4..52f56f23e 
100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -106,6 +106,11 @@ os.getenv("DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS", 60) ) +SERVER_EVENTS_TTL_SECONDS = int( + # default documented in reference/environment-variables.md, keep in sync + os.getenv("DSTACK_SERVER_EVENTS_TTL_SECONDS", 30 * 24 * 3600) +) + SERVER_KEEP_SHIM_TASKS = os.getenv("DSTACK_SERVER_KEEP_SHIM_TASKS") is not None DEFAULT_PROJECT_NAME = "main" diff --git a/src/dstack/_internal/settings.py b/src/dstack/_internal/settings.py index 0462ddcdf..86ca6fe31 100644 --- a/src/dstack/_internal/settings.py +++ b/src/dstack/_internal/settings.py @@ -44,3 +44,6 @@ class FeatureFlags: # - Makes `repos[].path` required, unless the client is older than 0.19.27, # in which case `/workflow` is still used. LEGACY_REPO_DIR_DISABLED = os.getenv("DSTACK_FF_LEGACY_REPO_DIR_DISABLED") is not None + + # Server-side flag to enable event emission and Events API + EVENTS = os.getenv("DSTACK_FF_EVENTS") is not None diff --git a/src/tests/_internal/server/background/tasks/test_process_events.py b/src/tests/_internal/server/background/tasks/test_process_events.py new file mode 100644 index 000000000..1dc29167d --- /dev/null +++ b/src/tests/_internal/server/background/tasks/test_process_events.py @@ -0,0 +1,56 @@ +from datetime import datetime +from typing import Generator +from unittest.mock import patch + +import pytest +from freezegun import freeze_time +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.server import settings +from dstack._internal.server.background.tasks.process_events import delete_events +from dstack._internal.server.models import EventModel +from dstack._internal.server.services import events +from dstack._internal.server.testing.common import create_user + + +@pytest.fixture(autouse=True) +def set_feature_flag() -> Generator[None, None, None]: + with 
patch("dstack._internal.settings.FeatureFlags.EVENTS", True): + yield + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +async def test_deletes_old_events(test_db, session: AsyncSession) -> None: + user = await create_user(session=session) + for i in range(10): + with freeze_time(datetime(2026, 1, 1, i)): + events.emit( + session, + message=f"Event {i}", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(user)], + ) + await session.commit() + + res = await session.execute(select(EventModel)) + all_events = res.scalars().all() + assert len(all_events) == 10 + + with ( + patch.multiple(settings, SERVER_EVENTS_TTL_SECONDS=5 * 3600), + freeze_time(datetime(2026, 1, 1, 10)), + ): + await delete_events() + + res = await session.execute(select(EventModel).order_by(EventModel.recorded_at)) + remaining_events = res.scalars().all() + assert len(remaining_events) == 5 + assert [e.message for e in remaining_events] == [ + "Event 5", + "Event 6", + "Event 7", + "Event 8", + "Event 9", + ] diff --git a/src/tests/_internal/server/routers/test_events.py b/src/tests/_internal/server/routers/test_events.py new file mode 100644 index 000000000..149f28ee3 --- /dev/null +++ b/src/tests/_internal/server/routers/test_events.py @@ -0,0 +1,1294 @@ +import uuid +from datetime import datetime +from typing import Generator +from unittest.mock import patch + +import pytest +from freezegun import freeze_time +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.users import GlobalRole, ProjectRole +from dstack._internal.server.services import events +from dstack._internal.server.services.projects import add_project_member +from dstack._internal.server.testing.common import ( + create_fleet, + create_instance, + create_job, + create_project, + create_repo, + create_run, + create_user, + get_auth_headers, +) + +pytestmark = [ + pytest.mark.asyncio, + 
pytest.mark.usefixtures("test_db", "image_config_mock"), + pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True), +] + + +@pytest.fixture(autouse=True) +def set_feature_flag() -> Generator[None, None, None]: + with patch("dstack._internal.settings.FeatureFlags.EVENTS", True): + yield + + +class TestListEventsGeneral: + async def test_response_format(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session, name="test_user") + project = await create_project(session=session, owner=user, name="test_project") + await add_project_member( + session=session, + project=project, + user=user, + project_role=ProjectRole.ADMIN, + ) + event_ids = [uuid.uuid4() for _ in range(2)] + with patch("uuid.uuid4", side_effect=event_ids): + with freeze_time(datetime(2026, 1, 1, 12, 0, 0)): + events.emit( + session, + "User added to project", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(user), events.Target.from_model(project)], + ) + with freeze_time(datetime(2026, 1, 1, 12, 0, 1)): + events.emit( + session, + "Project updated", + actor=events.SystemActor(), + targets=[events.Target.from_model(project)], + ) + await session.commit() + + resp = await client.post("/api/events/list", headers=get_auth_headers(user.token), json={}) + resp.raise_for_status() + resp_data = resp.json() + for event in resp_data: + event["targets"].sort(key=lambda t: t["type"]) # for consistent comparison + assert resp_data == [ + { + "id": str(event_ids[1]), + "message": "Project updated", + "recorded_at": "2026-01-01T12:00:01+00:00", + "actor_user_id": None, + "actor_user": None, + "targets": [ + { + "type": "project", + "project_id": str(project.id), + "id": str(project.id), + "name": "test_project", + }, + ], + }, + { + "id": str(event_ids[0]), + "message": "User added to project", + "recorded_at": "2026-01-01T12:00:00+00:00", + "actor_user_id": str(user.id), + "actor_user": "test_user", + "targets": [ + { + 
"type": "project", + "project_id": str(project.id), + "id": str(project.id), + "name": "test_project", + }, + { + "type": "user", + "project_id": None, + "id": str(user.id), + "name": "test_user", + }, + ], + }, + ] + + async def test_empty_response_when_no_events( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session=session) + resp = await client.post("/api/events/list", headers=get_auth_headers(user.token), json={}) + resp.raise_for_status() + assert resp.json() == [] + + +class TestListEventsAccessControl: + async def test_user_sees_events_about_themselves( + self, session: AsyncSession, client: AsyncClient + ) -> None: + admin_user = await create_user( + session=session, + name="admin", + global_role=GlobalRole.ADMIN, + ) + regular_user = await create_user( + session=session, + name="regular", + global_role=GlobalRole.USER, + ) + events.emit( + session, + "User created", + actor=events.UserActor.from_user(admin_user), + targets=[events.Target.from_model(admin_user)], + ) + events.emit( + session, + "User created", + actor=events.UserActor.from_user(admin_user), + targets=[events.Target.from_model(regular_user)], + ) + await session.commit() + + # Regular user only sees the event about themselves + resp = await client.post( + "/api/events/list", headers=get_auth_headers(regular_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(regular_user.id) + + # Admin sees all events + resp = await client.post( + "/api/events/list", headers=get_auth_headers(admin_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_user_sees_events_within_their_project( + self, session: AsyncSession, client: AsyncClient + ) -> None: + admin_user = await create_user( + session=session, + name="admin", + global_role=GlobalRole.ADMIN, + ) + regular_user = await create_user( + session=session, + name="regular", + 
global_role=GlobalRole.USER, + ) + admin_project = await create_project( + session=session, + name="admin", + owner=admin_user, + ) + regular_project = await create_project( + session=session, + name="regular", + owner=regular_user, + ) + await add_project_member( + session=session, + project=admin_project, + user=admin_user, + project_role=ProjectRole.ADMIN, + ) + await add_project_member( + session=session, + project=regular_project, + user=regular_user, + project_role=ProjectRole.USER, + ) + admin_fleet = await create_fleet( + session=session, + project=admin_project, + name="admin", + ) + regular_fleet = await create_fleet( + session=session, + project=regular_project, + name="regular", + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(admin_user), + targets=[events.Target.from_model(admin_project)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(admin_user), + targets=[events.Target.from_model(regular_project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(admin_user), + targets=[events.Target.from_model(admin_fleet)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(admin_user), + targets=[events.Target.from_model(regular_fleet)], + ) + await session.commit() + + # Regular user only sees the events within their project + resp = await client.post( + "/api/events/list", headers=get_auth_headers(regular_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert {resp.json()[0]["targets"][0]["id"], resp.json()[1]["targets"][0]["id"]} == { + str(regular_project.id), + str(regular_fleet.id), + } + + # Admin sees all events + resp = await client.post( + "/api/events/list", headers=get_auth_headers(admin_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 4 + + async def test_filters_do_not_bypass_access_control( + self, session: AsyncSession, client: 
AsyncClient + ) -> None: + admin = await create_user( + session=session, + name="admin", + global_role=GlobalRole.ADMIN, + ) + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session) + fleet = await create_fleet(session=session, project=project) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(admin), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(admin), + targets=[events.Target.from_model(fleet)], + ) + await session.commit() + + # Regular user can't see events from a project they are not a member of + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + + # Admin can see the events + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(admin.token), + json={"within_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(admin.token), + json={"target_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(admin.token), + json={"target_fleets": [str(fleet.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + +class TestListEventsFilters: + async def test_target_projects(self, session: AsyncSession, 
client: AsyncClient) -> None: + user = await create_user(session=session) + project_a = await create_project(session=session, name="project_a", owner=user) + project_b = await create_project(session=session, name="project_b", owner=user) + fleet_a = await create_fleet(session=session, project=project_a) + events.emit( + session, + "User created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(user)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(project_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(project_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project_a.id), str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_users(self, session: AsyncSession, client: AsyncClient) -> None: + user_a = await create_user(session=session, name="user_a") + user_b = await create_user(session=session, name="user_b") + project_a = await create_project(session=session, name="project_a", owner=user_a) + events.emit( + session, + "User created", + 
actor=events.UserActor.from_user(user_a), + targets=[events.Target.from_model(user_a)], + ) + events.emit( + session, + "User created", + actor=events.UserActor.from_user(user_b), + targets=[events.Target.from_model(user_b)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user_a), + targets=[events.Target.from_model(project_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"target_users": [str(user_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(user_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_b.token), + json={"target_users": [str(user_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(user_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"target_users": [str(user_a.id), str(user_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_fleets(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet_a = await create_fleet( + session=session, + project=project, + name="fleet_a", + ) + fleet_b = await create_fleet( + session=session, + project=project, + name="fleet_b", + ) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet_a, + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_b)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + 
targets=[events.Target.from_model(instance_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(fleet_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(fleet_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet_a.id), str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_instances(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + instance_b = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_a)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_b)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(instance_a.id) + + resp = 
await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(instance_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_a.id), str(instance_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_runs(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run_a = await create_run( + session=session, + project=project, + run_name="run_a", + repo=repo, + user=user, + ) + run_b = await create_run( + session=session, + project=project, + run_name="run_b", + repo=repo, + user=user, + ) + job_a = await create_job( + session=session, + run=run_a, + ) + events.emit( + session, + "Run created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run_a)], + ) + events.emit( + session, + "Run created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run_b)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(job_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_runs": [str(run_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(run_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_runs": [str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(run_b.id) + + resp = await 
client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_runs": [str(run_a.id), str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_jobs(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run( + session=session, + project=project, + run_name="run", + repo=repo, + user=user, + ) + job_a = await create_job( + session=session, + run=run, + ) + job_b = await create_job( + session=session, + run=run, + ) + events.emit( + session, + "Run created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(job_a)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(job_b)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_jobs": [str(job_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(job_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_jobs": [str(job_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(job_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_jobs": [str(job_a.id), str(job_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_within_projects(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project_a = await 
create_project(session=session, name="project_a", owner=user) + project_b = await create_project(session=session, name="project_b", owner=user) + fleet_a = await create_fleet(session=session, project=project_a) + instance_a = await create_instance( + session=session, + project=project_a, + fleet=fleet_a, + ) + events.emit( + session, + "User created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(user)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 3 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project_a.id), str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 4 + + async def test_within_fleets(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet_a = await create_fleet( + session=session, + project=project, + name="fleet_a", + ) + fleet_b = await create_fleet( + 
session=session, + project=project, + name="fleet_b", + ) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet_a, + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_b)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_fleets": [str(fleet_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_fleets": [str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_fleets": [str(fleet_a.id), str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 3 + + async def test_within_runs(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run_a = await create_run( + session=session, + project=project, + run_name="run_a", + repo=repo, + user=user, + ) + run_b = await create_run( + session=session, + project=project, + run_name="run_b", + repo=repo, + user=user, + ) + job_a = await create_job( + session=session, + run=run_a, + ) + events.emit( + session, + "Project created", + 
actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Run created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run_a)], + ) + events.emit( + session, + "Run created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run_b)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(job_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_runs": [str(run_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_runs": [str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_runs": [str(run_a.id), str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 3 + + async def test_include_target_types(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + 
headers=get_auth_headers(user.token), + json={"include_target_types": ["fleet"]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["type"] == "fleet" + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"include_target_types": ["instance"]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["type"] == "instance" + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"include_target_types": ["project", "fleet"]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert {resp.json()[0]["targets"][0]["type"], resp.json()[1]["targets"][0]["type"]} == { + "project", + "fleet", + } + + async def test_within_projects_and_include_target_types( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session=session) + project_a = await create_project(session=session, name="project_a", owner=user) + project_b = await create_project(session=session, name="project_b", owner=user) + fleet_a = await create_fleet(session=session, project=project_a) + instance_a = await create_instance( + session=session, + project=project_a, + fleet=fleet_a, + ) + fleet_b = await create_fleet(session=session, project=project_b) + instance_b = await create_instance( + session=session, + project=project_b, + fleet=fleet_b, + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + 
targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_b)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_b)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={ + "within_projects": [str(project_a.id)], + "include_target_types": ["fleet"], + }, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["type"] == "fleet" + assert resp.json()[0]["targets"][0]["id"] == str(fleet_a.id) + + async def test_actors(self, session: AsyncSession, client: AsyncClient) -> None: + user_a = await create_user(session=session, name="user_a") + user_b = await create_user(session=session, name="user_b") + project_a = await create_project(session=session, owner=user_a, name="project_a") + project_b = await create_project(session=session, owner=user_b, name="project_b") + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user_a), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user_b), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Project updated", + actor=events.SystemActor(), + targets=[events.Target.from_model(project_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [str(user_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Project created" + assert resp.json()[0]["targets"][0]["id"] == str(project_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [str(user_b.id)]}, + ) + 
resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Project created" + assert resp.json()[0]["targets"][0]["id"] == str(project_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [None]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Project updated" + assert resp.json()[0]["targets"][0]["id"] == str(project_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [str(user_a.id), None]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert {resp.json()[0]["targets"][0]["id"], resp.json()[1]["targets"][0]["id"]} == { + str(project_a.id) + } + + async def test_event_included_if_at_least_one_target_is_within_filters( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + instance_b = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Fleet instances created", + actor=events.UserActor.from_user(user), + targets=[ + events.Target.from_model(instance_a), + events.Target.from_model(instance_b), + ], + ) + instance_c = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_c)], + ) + await session.commit() + + for target_instances in [[instance_a.id], [instance_b.id], [instance_a.id, instance_b.id]]: + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": list(map(str, target_instances))}, + 
) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Fleet instances created" + assert len(resp.json()[0]["targets"]) == 2 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_c.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Instance created" + assert len(resp.json()[0]["targets"]) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_a.id), str(instance_c.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + +class TestListEventsPagination: + @pytest.mark.parametrize("ascending", [True, False]) + async def test_pagination( + self, session: AsyncSession, client: AsyncClient, ascending: bool + ) -> None: + users = [] + for i in range(5): + user = await create_user(session=session, name=f"user_{i}") + users.append(user) + with freeze_time(datetime(2026, 1, 1, 12, 0, 0, i)): + events.emit( + session, + "User created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(user)], + ) + await session.commit() + + if not ascending: + users.reverse() + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert resp.json()[0]["targets"][0]["name"] == users[0].name + assert resp.json()[1]["targets"][0]["name"] == users[1].name + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert resp.json()[0]["targets"][0]["name"] == users[2].name + assert 
resp.json()[1]["targets"][0]["name"] == users[3].name + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["name"] == users[4].name + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + + async def test_limits_events_regardless_number_of_targets( + self, session: AsyncSession, client: AsyncClient + ) -> None: + users = [await create_user(session=session, name=f"user_{i}") for i in range(3)] + with freeze_time(datetime(2026, 1, 1, 12, 0, 0, 0)): + events.emit( + session, + "Users batch created", + actor=events.SystemActor(), + targets=[events.Target.from_model(users[0]), events.Target.from_model(users[1])], + ) + with freeze_time(datetime(2026, 1, 1, 12, 0, 0, 1)): + events.emit( + session, + "User created", + actor=events.SystemActor(), + targets=[events.Target.from_model(users[2])], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + json={ + "limit": 1, + "ascending": True, + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Users batch created" + assert len(resp.json()[0]["targets"]) == 2 + assert {resp.json()[0]["targets"][0]["id"], resp.json()[0]["targets"][1]["id"]} == { + str(users[0].id), + str(users[1].id), + } + + resp = await client.post( + "/api/events/list", + json={ + "limit": 1, + "ascending": True, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + 
headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "User created" + assert len(resp.json()[0]["targets"]) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(users[2].id) + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": True, + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 2