Skip to content

Commit 4bdadb5

Browse files
authored
Make autocreated fleets opt-in (#3342)
* Add DSTACK_FF_AUTOCREATED_FLEETS_ENABLED
* Drop autocreated fleets warning
* Fix tests
1 parent a49d34c commit 4bdadb5

File tree

6 files changed

+37
-105
lines changed

6 files changed

+37
-105
lines changed

src/dstack/_internal/cli/services/configurators/run.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
from dstack._internal.utils.nested_list import NestedList, NestedListItem
6565
from dstack._internal.utils.path import is_absolute_posix_path
6666
from dstack.api._public.runs import Run
67-
from dstack.api.server import APIClient
6867
from dstack.api.utils import load_profile
6968

7069
_KNOWN_AMD_GPUS = {gpu.name.lower() for gpu in gpuhunt.KNOWN_AMD_GPUS}
@@ -232,8 +231,6 @@ def apply_configuration(
232231
)
233232
)
234233

235-
_warn_fleet_autocreated(self.api.client, run)
236-
237234
console.print(
238235
f"\n[code]{run.name}[/] provisioning completed [secondary]({run.status.value})[/]"
239236
)
@@ -939,16 +936,3 @@ def render_run_spec_diff(old_spec: RunSpec, new_spec: RunSpec) -> Optional[str]:
939936
item = NestedListItem(spec_field.replace("_", " ").capitalize())
940937
nested_list.children.append(item)
941938
return nested_list.render()
942-
943-
944-
def _warn_fleet_autocreated(api: APIClient, run: Run):
945-
if run._run.fleet is None:
946-
return
947-
fleet = api.fleets.get(project_name=run._project, name=run._run.fleet.name)
948-
if not fleet.spec.autocreated:
949-
return
950-
warn(
951-
f"\nThe run is using automatically created fleet [code]{fleet.name}[/code].\n"
952-
"Future dstack versions won't create fleets automatically.\n"
953-
"Create a fleet explicitly: https://dstack.ai/docs/concepts/fleets/"
954-
)

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ async def _process_submitted_job(
341341
job_model.last_processed_at = common_utils.get_current_datetime()
342342
await session.commit()
343343
return
344-
if FeatureFlags.AUTOCREATED_FLEETS_DISABLED:
344+
if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
345345
logger.debug("%s: no fleet found", fmt(job_model))
346346
job_model.status = JobStatus.TERMINATING
347347
job_model.termination_reason = (

src/dstack/_internal/server/services/runs/plan.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,10 @@ async def get_job_plans(
106106
exclude_not_available=False,
107107
)
108108
if _should_force_non_fleet_offers(run_spec) or (
109-
not FeatureFlags.AUTOCREATED_FLEETS_DISABLED
110-
and profile.fleets is None
111-
and fleet_model is None
109+
FeatureFlags.AUTOCREATED_FLEETS_ENABLED and profile.fleets is None and fleet_model is None
112110
):
113111
# Keep the old behavior returning all offers irrespective of fleets.
114112
# Needed for supporting offers with autocreated fleets flow (and for `dstack offer`).
115-
# TODO: Consider dropping when autocreated fleets are dropped.
116113
instance_offers, backend_offers = await _get_non_fleet_offers(
117114
session=session,
118115
project=project,
@@ -248,7 +245,6 @@ async def find_optimal_fleet_with_offers(
248245
# the run without additional provisioning and choose the one with the cheapest pool offer.
249246
# Then choose a fleet with the cheapest pool offer among all fleets with pool offers.
250247
# If there are no fleets with pool offers, choose the fleet with the cheapest backend offer.
251-
# Fallback to autocreated fleet if fleets have no pool or backend offers.
252248
# TODO: Consider trying all backend offers and then choosing a fleet.
253249
candidate_fleets_with_offers: list[
254250
tuple[
@@ -325,7 +321,7 @@ async def find_optimal_fleet_with_offers(
325321
return None, [], []
326322

327323
if (
328-
not FeatureFlags.AUTOCREATED_FLEETS_DISABLED
324+
FeatureFlags.AUTOCREATED_FLEETS_ENABLED
329325
and run_spec.merged_profile.fleets is None
330326
and all(t[3] == 0 and t[4] == 0 for t in candidate_fleets_with_offers)
331327
):

src/dstack/_internal/settings.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ class FeatureFlags:
3535
development. Feature flags are environment variables of the form DSTACK_FF_*
3636
"""
3737

38-
AUTOCREATED_FLEETS_DISABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_DISABLED") is not None
38+
# DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets:
39+
# If no fleet is suitable for the run, a new fleet is created automatically instead of the run failing with an error.
40+
AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None
3941

4042
# Enabling LEGACY_REPO_DIR_DISABLED does the following:
4143
# - Changes `working_dir` default value from `/workflow` to the image's working dir, unless

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 5 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -631,9 +631,8 @@ async def test_cannot_assign_multi_node_job_to_partially_busy_shared_instance(
631631
await session.refresh(instance)
632632
res = await session.execute(select(JobModel).options(joinedload(JobModel.instance)))
633633
job = res.unique().scalar_one()
634-
assert job.status == JobStatus.SUBMITTED
635-
assert job.instance_assigned
636-
assert job.instance is None
634+
assert job.status == JobStatus.TERMINATING
635+
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
637636
assert instance.total_blocks == 4
638637
assert instance.busy_blocks == 1
639638

@@ -724,41 +723,7 @@ async def test_creates_new_instance_in_existing_non_empty_fleet(
724723

725724
@pytest.mark.asyncio
726725
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
727-
async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: AsyncSession):
728-
project = await create_project(session)
729-
user = await create_user(session)
730-
repo = await create_repo(session=session, project_id=project.id)
731-
fleet = await create_fleet(session=session, project=project)
732-
instance = await create_instance(
733-
session=session,
734-
project=project,
735-
fleet=fleet,
736-
instance_num=0,
737-
status=InstanceStatus.BUSY,
738-
)
739-
fleet.instances.append(instance)
740-
run = await create_run(
741-
session=session,
742-
project=project,
743-
repo=repo,
744-
user=user,
745-
)
746-
job = await create_job(
747-
session=session,
748-
run=run,
749-
instance_assigned=False,
750-
)
751-
await session.commit()
752-
await process_submitted_jobs()
753-
await session.refresh(job)
754-
assert job.status == JobStatus.SUBMITTED
755-
assert job.instance_assigned
756-
assert job.instance_id is None
757-
assert job.fleet_id is None
758-
759-
@pytest.mark.asyncio
760-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
761-
async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncSession):
726+
async def test_fails_if_run_cannot_fit_into_fleet(self, test_db, session: AsyncSession):
762727
project = await create_project(session)
763728
user = await create_user(session)
764729
repo = await create_repo(session=session, project_id=project.id)
@@ -800,37 +765,8 @@ async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncS
800765
await session.commit()
801766
await process_submitted_jobs()
802767
await session.refresh(job)
803-
assert job.status == JobStatus.SUBMITTED
804-
assert job.instance_assigned
805-
assert job.instance_id is None
806-
assert job.fleet_id is None
807-
808-
@pytest.mark.asyncio
809-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
810-
async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified(
811-
self, test_db, session: AsyncSession
812-
):
813-
project = await create_project(session)
814-
user = await create_user(session)
815-
repo = await create_repo(session=session, project_id=project.id)
816-
fleet_spec = get_fleet_spec()
817-
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
818-
await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet")
819-
run = await create_run(
820-
session=session,
821-
project=project,
822-
repo=repo,
823-
user=user,
824-
)
825-
job = await create_job(
826-
session=session,
827-
run=run,
828-
instance_assigned=False,
829-
)
830-
await process_submitted_jobs()
831-
await session.refresh(job)
832-
assert job.status == JobStatus.SUBMITTED
833-
assert job.instance_assigned
768+
assert job.status == JobStatus.TERMINATING
769+
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
834770
assert job.instance_id is None
835771
assert job.fleet_id is None
836772

src/tests/_internal/server/routers/test_runs.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
ServiceConfiguration,
2222
TaskConfiguration,
2323
)
24+
from dstack._internal.core.models.fleets import FleetNodesSpec
2425
from dstack._internal.core.models.gateways import GatewayStatus
2526
from dstack._internal.core.models.instances import (
2627
InstanceAvailability,
@@ -59,6 +60,7 @@
5960
create_run,
6061
create_user,
6162
get_auth_headers,
63+
get_fleet_spec,
6264
get_job_provisioning_data,
6365
get_run_spec,
6466
)
@@ -173,7 +175,7 @@ def get_dev_env_run_plan_dict(
173175
"stop_duration": None,
174176
"max_price": None,
175177
"retry": None,
176-
"spot_policy": "spot",
178+
"spot_policy": "auto",
177179
"idle_duration": None,
178180
"utilization_policy": None,
179181
"startup_order": None,
@@ -198,7 +200,7 @@ def get_dev_env_run_plan_dict(
198200
"max_price": None,
199201
"name": "string",
200202
"retry": None,
201-
"spot_policy": "spot",
203+
"spot_policy": "auto",
202204
"idle_duration": None,
203205
"utilization_policy": None,
204206
"startup_order": None,
@@ -249,7 +251,7 @@ def get_dev_env_run_plan_dict(
249251
"shm_size": None,
250252
},
251253
"max_price": None,
252-
"spot": True,
254+
"spot": None,
253255
"reservation": None,
254256
"multinode": False,
255257
},
@@ -387,7 +389,7 @@ def get_dev_env_run_dict(
387389
"stop_duration": None,
388390
"max_price": None,
389391
"retry": None,
390-
"spot_policy": "spot",
392+
"spot_policy": "auto",
391393
"idle_duration": None,
392394
"utilization_policy": None,
393395
"startup_order": None,
@@ -412,7 +414,7 @@ def get_dev_env_run_dict(
412414
"max_price": None,
413415
"name": "string",
414416
"retry": None,
415-
"spot_policy": "spot",
417+
"spot_policy": "auto",
416418
"idle_duration": None,
417419
"utilization_policy": None,
418420
"startup_order": None,
@@ -458,7 +460,7 @@ def get_dev_env_run_dict(
458460
"shm_size": None,
459461
},
460462
"max_price": None,
461-
"spot": True,
463+
"spot": None,
462464
"reservation": None,
463465
"multinode": False,
464466
},
@@ -967,12 +969,15 @@ async def test_returns_run_plan_privileged_false(
967969
await add_project_member(
968970
session=session, project=project, user=user, project_role=ProjectRole.USER
969971
)
972+
fleet_spec = get_fleet_spec()
973+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
974+
await create_fleet(session=session, project=project, spec=fleet_spec)
970975
repo = await create_repo(session=session, project_id=project.id)
971976
offer_aws = InstanceOfferWithAvailability(
972977
backend=BackendType.AWS,
973978
instance=InstanceType(
974979
name="instance",
975-
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
980+
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
976981
),
977982
region="us",
978983
price=1.0,
@@ -982,7 +987,7 @@ async def test_returns_run_plan_privileged_false(
982987
backend=BackendType.RUNPOD,
983988
instance=InstanceType(
984989
name="instance",
985-
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
990+
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
986991
),
987992
region="us",
988993
price=2.0,
@@ -1030,12 +1035,15 @@ async def test_returns_run_plan_privileged_true(
10301035
await add_project_member(
10311036
session=session, project=project, user=user, project_role=ProjectRole.USER
10321037
)
1038+
fleet_spec = get_fleet_spec()
1039+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
1040+
await create_fleet(session=session, project=project, spec=fleet_spec)
10331041
repo = await create_repo(session=session, project_id=project.id)
10341042
offer_aws = InstanceOfferWithAvailability(
10351043
backend=BackendType.AWS,
10361044
instance=InstanceType(
10371045
name="instance",
1038-
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
1046+
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
10391047
),
10401048
region="us",
10411049
price=1.0,
@@ -1045,7 +1053,7 @@ async def test_returns_run_plan_privileged_true(
10451053
backend=BackendType.RUNPOD,
10461054
instance=InstanceType(
10471055
name="instance",
1048-
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
1056+
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
10491057
),
10501058
region="us",
10511059
price=2.0,
@@ -1090,12 +1098,15 @@ async def test_returns_run_plan_docker_true(
10901098
await add_project_member(
10911099
session=session, project=project, user=user, project_role=ProjectRole.USER
10921100
)
1101+
fleet_spec = get_fleet_spec()
1102+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
1103+
await create_fleet(session=session, project=project, spec=fleet_spec)
10931104
repo = await create_repo(session=session, project_id=project.id)
10941105
offer_aws = InstanceOfferWithAvailability(
10951106
backend=BackendType.AWS,
10961107
instance=InstanceType(
10971108
name="instance",
1098-
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
1109+
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
10991110
),
11001111
region="us",
11011112
price=1.0,
@@ -1105,7 +1116,7 @@ async def test_returns_run_plan_docker_true(
11051116
backend=BackendType.RUNPOD,
11061117
instance=InstanceType(
11071118
name="instance",
1108-
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
1119+
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
11091120
),
11101121
region="us",
11111122
price=2.0,
@@ -1150,6 +1161,9 @@ async def test_returns_run_plan_instance_volumes(
11501161
await add_project_member(
11511162
session=session, project=project, user=user, project_role=ProjectRole.USER
11521163
)
1164+
fleet_spec = get_fleet_spec()
1165+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
1166+
await create_fleet(session=session, project=project, spec=fleet_spec)
11531167
repo = await create_repo(session=session, project_id=project.id)
11541168
offer_aws = InstanceOfferWithAvailability(
11551169
backend=BackendType.AWS,

0 commit comments

Comments
 (0)