Skip to content

Commit 9ab038a

Browse files
authored
Optimize Concurrent Crawl Check (#2945)
Use the DB to query the first N non-completed crawls (ordered by start time), instead of querying all k8s crawljob objects. Add an additional index on crawls optimized for this query. Follow-up to #2940 to make the concurrent crawl queue check efficient even when no limit is set.
1 parent 9979d73 commit 9ab038a

File tree

2 files changed

+31
-36
lines changed

2 files changed

+31
-36
lines changed

backend/btrixcloud/crawls.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,13 @@ async def init_index(self):
121121
await self.crawls.create_index(
122122
[("type", pymongo.HASHED), ("fileSize", pymongo.DESCENDING)]
123123
)
124-
124+
await self.crawls.create_index(
125+
[
126+
("state", pymongo.ASCENDING),
127+
("oid", pymongo.ASCENDING),
128+
("started", pymongo.ASCENDING),
129+
]
130+
)
125131
await self.crawls.create_index([("finished", pymongo.DESCENDING)])
126132
await self.crawls.create_index([("oid", pymongo.HASHED)])
127133
await self.crawls.create_index([("cid", pymongo.HASHED)])
@@ -336,6 +342,18 @@ async def list_crawls(
336342

337343
return crawls, total
338344

345+
async def get_active_crawls(self, oid: UUID, limit: int) -> list[str]:
    """Return ids of up to ``limit`` active (running or waiting) crawls
    for the given org, sorted from earliest started to latest.

    :param oid: organization id to filter crawls by
    :param limit: maximum number of crawl ids to return
    :return: list of crawl ids (``_id`` values), oldest first
    """
    # Project only _id — callers need just the crawl ids, and the
    # filter/sort fields match the (state, oid, started) index added
    # in init_index, keeping this query efficient.
    cursor = (
        self.crawls.find(
            {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}, {"_id": 1}
        )
        .sort({"started": 1})
        .limit(limit)
    )
    results = await cursor.to_list()
    return [crawl["_id"] for crawl in results]
356+
339357
async def delete_crawls(
340358
self,
341359
org: Organization,

backend/btrixcloud/operator/crawls.py

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
TYPE_NON_RUNNING_STATES,
2222
TYPE_RUNNING_STATES,
2323
TYPE_ALL_CRAWL_STATES,
24-
NON_RUNNING_STATES,
2524
RUNNING_STATES,
2625
WAITING_STATES,
2726
RUNNING_AND_STARTING_ONLY,
@@ -53,7 +52,6 @@
5352
CMAP,
5453
PVC,
5554
CJS,
56-
BTRIX_API,
5755
)
5856

5957

@@ -258,7 +256,7 @@ async def sync_crawls(self, data: MCSyncData):
258256
return self._empty_response(status)
259257

260258
if status.state in ("starting", "waiting_org_limit"):
261-
if not await self.can_start_new(crawl, data, status):
259+
if not await self.can_start_new(crawl, status):
262260
return self._empty_response(status)
263261

264262
await self.set_state(
@@ -738,20 +736,10 @@ async def set_state(
738736

739737
def get_related(self, data: MCBaseRequest):
740738
"""return objects related to crawl pods"""
741-
spec = data.parent.get("spec", {})
742-
crawl_id = spec["id"]
743-
oid = spec.get("oid")
744-
# filter by role as well (job vs qa-job)
745-
role = data.parent.get("metadata", {}).get("labels", {}).get("role")
746-
related_resources = [
747-
{
748-
"apiVersion": BTRIX_API,
749-
"resource": "crawljobs",
750-
"labelSelector": {"matchLabels": {"btrix.org": oid, "role": role}},
751-
},
752-
]
753-
739+
related_resources = []
754740
if self.k8s.enable_auto_resize:
741+
spec = data.parent.get("spec", {})
742+
crawl_id = spec["id"]
755743
related_resources.append(
756744
{
757745
"apiVersion": METRICS_API,
@@ -765,7 +753,6 @@ def get_related(self, data: MCBaseRequest):
765753
async def can_start_new(
766754
self,
767755
crawl: CrawlSpec,
768-
data: MCSyncData,
769756
status: CrawlStatus,
770757
):
771758
"""return true if crawl can start, otherwise set crawl to 'queued' state
@@ -774,27 +761,17 @@ async def can_start_new(
774761
if not max_crawls:
775762
return True
776763

764+
next_active_crawls = await self.crawl_ops.get_active_crawls(
765+
crawl.oid, max_crawls
766+
)
767+
777768
# if total crawls < concurrent, always allow, no need to check further
778-
if len(data.related[CJS]) <= max_crawls:
769+
if len(next_active_crawls) < max_crawls:
779770
return True
780771

781-
name = data.parent.get("metadata", {}).get("name")
782-
783-
# assume crawls already sorted from oldest to newest
784-
# (seems to be the case always)
785-
i = 0
786-
for crawl_sorted in data.related[CJS].values():
787-
# if crawl not running, don't count
788-
if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
789-
continue
790-
791-
# if reached current crawl, if did not reach crawl quota, allow current crawl to run
792-
if crawl_sorted.get("metadata").get("name") == name:
793-
if i < max_crawls:
794-
return True
795-
796-
break
797-
i += 1
772+
# allow crawl if within first list of active crawls
773+
if crawl.id in next_active_crawls:
774+
return True
798775

799776
await self.set_state(
800777
"waiting_org_limit", status, crawl, allowed_from=["starting"]

0 commit comments

Comments
 (0)