From 6c6ea252ff54b6f50f6e110c980cc18d410423bd Mon Sep 17 00:00:00 2001
From: emma
Date: Wed, 17 Sep 2025 16:42:24 -0400
Subject: [PATCH 1/4] wip backend changes

---
 backend/btrixcloud/crawlconfigs.py | 132 ++++++++++++++++++++++++++++-
 backend/btrixcloud/models.py       |  20 +++++
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 6087337411..48d82abcb3 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -4,7 +4,17 @@

 # pylint: disable=too-many-lines

-from typing import List, Optional, TYPE_CHECKING, cast, Dict, Tuple, Annotated, Union
+from typing import (
+    AsyncIterator,
+    List,
+    Optional,
+    TYPE_CHECKING,
+    cast,
+    Dict,
+    Tuple,
+    Annotated,
+    Union,
+)

 import asyncio
 import json
@@ -17,10 +27,13 @@

 import aiohttp
 from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi.responses import StreamingResponse
 import pymongo

 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
 from .models import (
+    BatchCrawlRunOut,
+    BatchTotalOut,
     CrawlConfigIn,
     ConfigRevision,
     CrawlConfig,
@@ -804,6 +817,61 @@ async def get_crawl_configs(

         return configs, total

+    async def run_crawls_by_filters(
+        self,
+        org: Organization,
+        user: User,
+        created_by: UUID | None = None,
+        modified_by: UUID | None = None,
+        profile_ids: list[UUID] | None = None,
+        first_seed: str | None = None,
+        name: str | None = None,
+        description: str | None = None,
+        tags: list[str] | None = None,
+        tag_match: ListFilterType | None = ListFilterType.AND,
+        schedule: bool | None = None,
+        is_crawl_running: bool | None = None,
+    ):
+        crawl_configs, total = await self.get_crawl_configs(
+            org,
+            created_by=created_by,
+            modified_by=modified_by,
+            profile_ids=profile_ids,
+            first_seed=first_seed,
+            name=name,
+            description=description,
+            tags=tags,
+            tag_match=tag_match,
+            schedule=schedule,
+            is_crawl_running=is_crawl_running,
+            page_size=999999999,  # TODO do this better later
+            page=1,
+            sort_by="lastRun",
+            sort_direction=-1,
+        )
+        yield BatchTotalOut(total=total).model_dump_json()
+
+        async def run_crawl_with_metadata(config: CrawlConfigOut):
+            """Helper that runs crawl and returns metadata with result"""
+            try:
+                crawl_id = await self.run_now(config.id, org, user)
+                return BatchCrawlRunOut(crawl_id=crawl_id, success=True)
+            except HTTPException as e:
+                return BatchCrawlRunOut(crawl_id=config.id, success=False, error=e)
+
+        async with asyncio.TaskGroup() as tg:
+            tasks = [
+                tg.create_task(run_crawl_with_metadata(config))
+                for config in crawl_configs
+            ]
+
+        completed = 0
+        for task in asyncio.as_completed(tasks):
+            result = await task
+            completed += 1
+            result.position = completed
+            yield result.model_dump_json()
+
     async def is_profile_in_use(self, profileid: UUID, org: Organization) -> bool:
         """return true/false if any active workflows exist with given profile"""
         res = await self.crawl_configs.find_one(
@@ -1674,6 +1742,68 @@ async def validate_custom_behavior(
     ):
         return await ops.validate_custom_behavior(behavior.customBehavior)

+    # GROUP ACTIONS
+
+    @router.post("/runMultiple", response_model=StartedResponse)
+    async def run_multiple(
+        org: Organization = Depends(org_crawl_dep),
+        user: User = Depends(user_dep),
+        # createdBy, kept as userid for API compatibility
+        created_by: Annotated[
+            UUID | None, Query(alias="userid", title="Created By User ID")
+        ] = None,
+        modified_by: Annotated[
+            UUID | None, Query(alias="modifiedBy", title="Modified By User ID")
+        ] = None,
+        profile_ids: Annotated[
+            list[UUID] | None, Query(alias="profileIds", title="Profile IDs")
+        ] = None,
+        first_seed: Annotated[
+            str | None, Query(alias="firstSeed", title="First Seed")
+        ] = None,
+        name: str | None = None,
+        description: str | None = None,
+        tags: Annotated[list[str] | None, Query(alias="tags", title="Tags")] = None,
+        tag_match: Annotated[
+            ListFilterType | None,
+            Query(
+                alias="tagMatch",
+                title="Tag Match Type",
+                description='Defaults to `"and"` if omitted',
+            ),
+        ] = ListFilterType.AND,
+        schedule: bool | None = None,
+        is_crawl_running: Annotated[
+            bool | None, Query(alias="isCrawlRunning", title="Is Crawl Running")
+        ] = None,
+    ):
+        if first_seed:
+            first_seed = urllib.parse.unquote(first_seed)
+
+        if name:
+            name = urllib.parse.unquote(name)
+
+        if description:
+            description = urllib.parse.unquote(description)
+
+        return StreamingResponse(
+            ops.run_crawls_by_filters(
+                org=org,
+                user=user,
+                created_by=created_by,
+                modified_by=modified_by,
+                profile_ids=profile_ids,
+                first_seed=first_seed,
+                name=name,
+                description=description,
+                tags=tags,
+                tag_match=tag_match,
+                schedule=schedule,
+                is_crawl_running=is_crawl_running,
+            ),
+            media_type="application/json",
+        )
+
 org_ops.router.include_router(router)

 return ops
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 3c511f601b..521ee8d9f1 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -12,6 +12,7 @@
 import os

 from typing import Optional, List, Dict, Union, Literal, Any, get_args
+from fastapi import HTTPException
 from typing_extensions import Annotated

 from pydantic import (
@@ -3096,3 +3097,22 @@ class ListFilterType(str, Enum):

     OR = "or"
     AND = "and"
+
+
+# BATCH ACTIONS
+# ============================================================================
+
+
+class BatchTotalOut(BaseModel):
+    """Response model for batch total"""
+
+    total: int
+
+
+class BatchCrawlRunOut(BaseModel):
+    """Response model for crawl runs"""
+
+    error: HTTPException | None = None
+    crawl_id: str | UUID
+    success: bool
+    position: int | None = None

From dbae544d64126cf5c046fcce392525d246f7ecbc Mon Sep 17 00:00:00 2001
From: emma
Date: Wed, 17 Sep 2025 19:16:19 -0400
Subject: [PATCH 2/4] implement run multiple

---
 backend/btrixcloud/crawlconfigs.py | 108 +++++++++++------------------
 backend/btrixcloud/models.py       |  37 +++++++++-
 2 files changed, 76 insertions(+), 69 deletions(-)

diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 48d82abcb3..f8686580f2 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -5,7 +5,6 @@
 # pylint: disable=too-many-lines

 from typing import (
-    AsyncIterator,
     List,
     Optional,
     TYPE_CHECKING,
@@ -13,7 +12,6 @@
     Dict,
     Tuple,
     Annotated,
-    Union,
 )

 import asyncio
@@ -33,6 +31,7 @@
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
 from .models import (
     BatchCrawlRunOut,
+    BatchFilter,
     BatchTotalOut,
     CrawlConfigIn,
     ConfigRevision,
@@ -700,8 +699,8 @@ async def update_usernames(self, userid: UUID, updated_name: str) -> None:
     async def get_crawl_configs(
         self,
         org: Organization,
-        page_size: int = DEFAULT_PAGE_SIZE,
-        page: int = 1,
+        page_size: int | None = DEFAULT_PAGE_SIZE,
+        page: int | None = 1,
         created_by: Optional[UUID] = None,
         modified_by: Optional[UUID] = None,
         profile_ids: Optional[List[UUID]] = None,
@@ -718,10 +717,12 @@ async def get_crawl_configs(
         """Get all crawl configs for an organization is a member of"""
         # pylint: disable=too-many-locals,too-many-branches,too-many-statements
         # Zero-index page for query
-        page = page - 1
-        skip = page * page_size
+        page = page - 1 if page is not None else 1
+        skip = page * page_size if page_size is not None else 0

-        match_query = {"oid": org.id, "inactive": {"$ne": True}}
+        match_query: dict[
+            str, object | str | int | list[dict[str, object | str | int]]
+        ] = {"oid": org.id, "inactive": {"$ne": True}}

         if tags:
             query_type = "$all" if tag_match == ListFilterType.AND else "$in"
@@ -752,7 +753,7 @@ async def get_crawl_configs(
             match_query["isCrawlRunning"] = is_crawl_running

         # pylint: disable=duplicate-code
-        aggregate: List[Dict[str, Union[object, str, int]]] = [
+        aggregate: list[dict[str, object | str | int]] = [
             {"$match": match_query},
             {"$unset": ["config"]},
         ]
@@ -783,14 +784,15 @@ async def get_crawl_configs(

             aggregate.extend([{"$sort": sort_query}])

+        items_stages = [{"$skip": skip}]
+        if page_size is not None:
+            items_stages.append({"$limit": page_size})
+
         aggregate.extend(
             [
                 {
                     "$facet": {
-                        "items": [
-                            {"$skip": skip},
-                            {"$limit": page_size},
-                        ],
+                        "items": items_stages,
                         "total": [{"$count": "count"}],
                     }
                 },
@@ -844,20 +846,21 @@ async def run_crawls_by_filters(
             tag_match=tag_match,
             schedule=schedule,
             is_crawl_running=is_crawl_running,
-            page_size=999999999,  # TODO do this better later
-            page=1,
-            sort_by="lastRun",
-            sort_direction=-1,
+            page_size=None,
         )
-        yield BatchTotalOut(total=total).model_dump_json()
+        yield BatchTotalOut(total=total).model_dump_json(indent=None) + "\n"
+        print(f"Got {total} crawl configs with filter params: {created_by}, {modified_by}, {profile_ids}, {first_seed}, {name}, {description}, {tags}, {tag_match}, {schedule}, {is_crawl_running}")

         async def run_crawl_with_metadata(config: CrawlConfigOut):
             """Helper that runs crawl and returns metadata with result"""
             try:
                 crawl_id = await self.run_now(config.id, org, user)
                 return BatchCrawlRunOut(crawl_id=crawl_id, success=True)
-            except HTTPException as e:
-                return BatchCrawlRunOut(crawl_id=config.id, success=False, error=e)
+            except Exception as e:
+                print(f"Error running crawl for config {config.id}: {e}")
+                return BatchCrawlRunOut(
+                    crawl_id=str(config.id), success=False, error=str(e)
+                )

         async with asyncio.TaskGroup() as tg:
             tasks = [
@@ -870,7 +873,7 @@ async def run_crawl_with_metadata(config: CrawlConfigOut):
             result = await task
             completed += 1
             result.position = completed
-            yield result.model_dump_json()
+            yield result.model_dump_json(indent=None) + "\n"

     async def is_profile_in_use(self, profileid: UUID, org: Organization) -> bool:
         """return true/false if any active workflows exist with given profile"""
         res = await self.crawl_configs.find_one(
@@ -1744,64 +1747,37 @@ async def validate_custom_behavior(

     # GROUP ACTIONS

-    @router.post("/runMultiple", response_model=StartedResponse)
+    @router.post("/batch/run")
     async def run_multiple(
+        filter: BatchFilter,
         org: Organization = Depends(org_crawl_dep),
         user: User = Depends(user_dep),
-        # createdBy, kept as userid for API compatibility
-        created_by: Annotated[
-            UUID | None, Query(alias="userid", title="Created By User ID")
-        ] = None,
-        modified_by: Annotated[
-            UUID | None, Query(alias="modifiedBy", title="Modified By User ID")
-        ] = None,
-        profile_ids: Annotated[
-            list[UUID] | None, Query(alias="profileIds", title="Profile IDs")
-        ] = None,
-        first_seed: Annotated[
-            str | None, Query(alias="firstSeed", title="First Seed")
-        ] = None,
-        name: str | None = None,
-        description: str | None = None,
-        tags: Annotated[list[str] | None, Query(alias="tags", title="Tags")] = None,
-        tag_match: Annotated[
-            ListFilterType | None,
-            Query(
-                alias="tagMatch",
-                title="Tag Match Type",
-                description='Defaults to `"and"` if omitted',
-            ),
-        ] = ListFilterType.AND,
-        schedule: bool | None = None,
-        is_crawl_running: Annotated[
-            bool | None, Query(alias="isCrawlRunning", title="Is Crawl Running")
-        ] = None,
     ):
-        if first_seed:
-            first_seed = urllib.parse.unquote(first_seed)
+        if filter.first_seed:
+            filter.first_seed = urllib.parse.unquote(filter.first_seed)

-        if name:
-            name = urllib.parse.unquote(name)
+        if filter.name:
+            filter.name = urllib.parse.unquote(filter.name)

-        if description:
-            description = urllib.parse.unquote(description)
+        if filter.description:
+            filter.description = urllib.parse.unquote(filter.description)

         return StreamingResponse(
             ops.run_crawls_by_filters(
                 org=org,
                 user=user,
-                created_by=created_by,
-                modified_by=modified_by,
-                profile_ids=profile_ids,
-                first_seed=first_seed,
-                name=name,
-                description=description,
-                tags=tags,
-                tag_match=tag_match,
-                schedule=schedule,
-                is_crawl_running=is_crawl_running,
+                created_by=filter.created_by,
+                modified_by=filter.modified_by,
+                profile_ids=filter.profile_ids,
+                first_seed=filter.first_seed,
+                name=filter.name,
+                description=filter.description,
+                tags=filter.tags,
+                tag_match=filter.tag_match,
+                schedule=filter.schedule,
+                is_crawl_running=filter.is_crawl_running,
             ),
-            media_type="application/json",
+            media_type="application/jsonl",
         )

 org_ops.router.include_router(router)
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 521ee8d9f1..4483c970e6 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -12,7 +12,6 @@
 import os

 from typing import Optional, List, Dict, Union, Literal, Any, get_args
-from fastapi import HTTPException
 from typing_extensions import Annotated

 from pydantic import (
@@ -3103,6 +3102,38 @@ class ListFilterType(str, Enum):
 # ============================================================================


+class BatchFilter(BaseModel):
+    """Base model for batch filters"""
+
+    created_by: Annotated[
+        UUID | None, Field(alias="createdBy", title="Created By User ID")
+    ] = None
+    modified_by: Annotated[
+        UUID | None, Field(alias="modifiedBy", title="Modified By User ID")
+    ] = None
+    profile_ids: Annotated[
+        list[UUID] | None, Field(alias="profileIds", title="Profile IDs")
+    ] = None
+    first_seed: Annotated[
+        str | None, Field(alias="firstSeed", title="First Seed")
+    ] = None
+    name: str | None = None
+    description: str | None = None
+    tags: list[str] | None = None
+    tag_match: Annotated[
+        ListFilterType | None,
+        Field(
+            alias="tagMatch",
+            title="Tag Match Type",
+            description='Defaults to `"and"` if omitted',
+        ),
+    ] = ListFilterType.AND
+    schedule: bool | None = None
+    is_crawl_running: Annotated[
+        bool | None, Field(alias="isCrawlRunning", title="Is Crawl Running")
+    ] = None
+
+
 class BatchTotalOut(BaseModel):
     """Response model for batch total"""

@@ -3112,7 +3143,7 @@ class BatchTotalOut(BaseModel):
 class BatchCrawlRunOut(BaseModel):
     """Response model for crawl runs"""

-    error: HTTPException | None = None
-    crawl_id: str | UUID
+    error: str | None = None
+    crawl_id: str
     success: bool
     position: int | None = None

From a7ff415e13f0940071bab823c6a3f24003e97081 Mon Sep 17 00:00:00 2001
From: emma
Date: Wed, 17 Sep 2025 19:17:37 -0400
Subject: [PATCH 3/4] format

---
 backend/btrixcloud/crawlconfigs.py | 4 +++-
 backend/btrixcloud/models.py       | 6 +++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index f8686580f2..51ad9c33d9 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -849,7 +849,9 @@ async def run_crawls_by_filters(
             page_size=None,
         )
         yield BatchTotalOut(total=total).model_dump_json(indent=None) + "\n"
-        print(f"Got {total} crawl configs with filter params: {created_by}, {modified_by}, {profile_ids}, {first_seed}, {name}, {description}, {tags}, {tag_match}, {schedule}, {is_crawl_running}")
+        print(
+            f"Got {total} crawl configs with filter params: {created_by}, {modified_by}, {profile_ids}, {first_seed}, {name}, {description}, {tags}, {tag_match}, {schedule}, {is_crawl_running}"
+        )

         async def run_crawl_with_metadata(config: CrawlConfigOut):
             """Helper that runs crawl and returns metadata with result"""
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 4483c970e6..09ed1443cd 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -3114,9 +3114,9 @@ class BatchFilter(BaseModel):
     profile_ids: Annotated[
         list[UUID] | None, Field(alias="profileIds", title="Profile IDs")
     ] = None
-    first_seed: Annotated[
-        str | None, Field(alias="firstSeed", title="First Seed")
-    ] = None
+    first_seed: Annotated[str | None, Field(alias="firstSeed", title="First Seed")] = (
+        None
+    )
     name: str | None = None
     description: str | None = None
     tags: list[str] | None = None

From 7e6ed189da2553937d51442e17666ebd5c984d79 Mon Sep 17 00:00:00 2001
From: emma
Date: Thu, 18 Sep 2025 17:16:29 -0400
Subject: [PATCH 4/4] wip

---
 frontend/src/controllers/api.ts          | 51 ++++++++++++++++++++++
 frontend/src/pages/org/workflows-list.ts | 51 ++++++++++++++++++++-
 2 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/frontend/src/controllers/api.ts b/frontend/src/controllers/api.ts
index ab50bbc6a8..a8b10d4b4b 100644
--- a/frontend/src/controllers/api.ts
+++ b/frontend/src/controllers/api.ts
@@ -20,6 +20,30 @@ export enum AbortReason {
   RequestTimeout = "request-timeout",
 }

+function splitStream(splitOn: string) {
+  let buffer = "";
+
+  return new TransformStream<string, string>({
+    transform(chunk, controller) {
+      buffer += chunk;
+      const parts = buffer.split(splitOn);
+      parts.slice(0, -1).forEach((part) => controller.enqueue(part));
+      buffer = parts[parts.length - 1];
+    },
+    flush(controller) {
+      if (buffer) controller.enqueue(buffer);
+    },
+  });
+}
+
+function parseJSON<T = unknown>() {
+  return new TransformStream<string, T>({
+    transform(chunk, controller) {
+      controller.enqueue(JSON.parse(chunk) as T);
+    },
+  });
+}
+
 /**
  * Utilities for interacting with the Browsertrix backend API
  *
@@ -261,4 +285,31 @@ export class APIController implements ReactiveController {

     this.onUploadProgress.cancel();
   }
+
+  async fetchStream<
+    T = unknown,
+    Body extends RequestInit["body"] | object = undefined,
+  >(path: string, options?: Omit<RequestInit, "body"> & { body?: Body }) {
+    const mergedOptions: RequestInit | undefined = options as
+      | RequestInit
+      | undefined;
+    if (options?.body) {
+      mergedOptions!.body = JSON.stringify(options.body);
+    }
+    const response = await fetch(path, mergedOptions);
+    if (!response.ok) {
+      throw new APIError({
+        message: response.statusText,
+        status: response.status,
+      });
+    }
+    const reader = response.body;
+    if (!reader) {
+      throw new Error("Response body is not readable");
+    }
+    return reader
+      .pipeThrough(new TextDecoderStream())
+      .pipeThrough(splitStream("\n"))
+      .pipeThrough(parseJSON<T>());
+  }
 }
diff --git a/frontend/src/pages/org/workflows-list.ts b/frontend/src/pages/org/workflows-list.ts
index 24eb56fdee..8995715df7 100644
--- a/frontend/src/pages/org/workflows-list.ts
+++ b/frontend/src/pages/org/workflows-list.ts
@@ -53,6 +53,13 @@ type Sort = {
   direction: SortDirection;
 };

+type FilterBy = {
+  [K in keyof ListWorkflow]: ListWorkflow[K] extends string ? string : boolean;
+};
+type BoolFilterKeys = {
+  [K in keyof FilterBy]: FilterBy[K] extends boolean ? K : never;
+}[keyof FilterBy];
+
 const FILTER_BY_CURRENT_USER_STORAGE_KEY =
   "btrix.filterByCurrentUser.crawlConfigs";
 const INITIAL_PAGE_SIZE = 10;
@@ -128,7 +135,7 @@ export class WorkflowsList extends BtrixElement {
   private orderBy: Sort = DEFAULT_SORT;

   @state()
-  private filterBy: Partial<{ [k in keyof ListWorkflow]: boolean }> = {};
+  private filterBy: Partial<FilterBy> = {};

   @state()
   private filterByCurrentUser = false;
@@ -236,7 +243,7 @@ export class WorkflowsList extends BtrixElement {

       // Convert string bools to filter values
       if (value === "true") {
-        filterBy[key as keyof typeof filterBy] = true;
+        filterBy[key as BoolFilterKeys] = true;
       } else if (value === "false") {
         filterBy[key as keyof typeof filterBy] = false;
       } else {
@@ -1158,4 +1165,44 @@ export class WorkflowsList extends BtrixElement {
     );
     return data;
   }
+
+  private async runBatchWorkflows() {
+    const data = await this.api.fetchStream<
+      BatchWorkflowUpdate,
+      BatchWorkflowFilter
+    >(`/orgs/${this.orgId}/crawlconfigs/batch/run`, {
+      method: "POST",
+      body: {
+        ...this.filterBy,
+        createdBy: this.filterByCurrentUser ? this.userInfo?.id : undefined,
+        tags: this.filterByTags,
+        tagMatch: this.filterByTagsType,
+        profileIds: this.filterByProfiles || undefined,
+      },
+    });
+    return data;
+  }
 }
+
+type BatchWorkflowFilter = {
+  createdBy?: string;
+  modifiedBy?: string;
+  profileIds?: string[];
+  firstSeed?: string;
+  name?: string;
+  description?: string;
+  tags?: string[];
+  tagMatch?: "and" | "or";
+  schedule?: boolean;
+  isCrawlRunning?: boolean;
+};
+type BatchWorkflowStart = {
+  total: number;
+};
+type BatchWorkflowProgress = {
+  error?: string;
+  crawl_id: string;
+  success: boolean;
+  position: number;
+};
+type BatchWorkflowUpdate = BatchWorkflowStart | BatchWorkflowProgress;
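Example client for the new POST /orgs/{oid}/crawlconfigs/batch/run endpoint:
a minimal sketch of consuming the JSONL stream from Python, assuming a
deployment reachable at BASE_URL behind an /api prefix, with a bearer TOKEN
and an ORG_ID (all three are placeholders, not values from these patches).
Per the patches above, the stream yields a {"total": N} record first, then
one record per crawl with crawl_id, success, error, and position fields.

import asyncio
import json

import aiohttp

BASE_URL = "https://btrix.example.com"  # placeholder deployment URL
ORG_ID = "<org-uuid>"  # placeholder org id
TOKEN = "<bearer-token>"  # placeholder auth token


async def run_batch(filters: dict) -> None:
    """POST the filters, then print progress as JSONL records arrive."""
    url = f"{BASE_URL}/api/orgs/{ORG_ID}/crawlconfigs/batch/run"
    headers = {"Authorization": f"Bearer {TOKEN}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.post(url, json=filters) as resp:
            resp.raise_for_status()
            # First record: {"total": N}; each following record reports
            # one crawl: {"crawl_id": ..., "success": ..., "position": ...}
            async for raw in resp.content:
                line = raw.decode().strip()
                if not line:
                    continue
                record = json.loads(line)
                if "total" in record:
                    print(f"running {record['total']} crawls")
                else:
                    status = "started" if record["success"] else record["error"]
                    print(f"[{record['position']}] {record['crawl_id']}: {status}")


if __name__ == "__main__":
    # Example filter payload, using the aliases from the BatchFilter model
    asyncio.run(run_batch({"tags": ["nightly"], "tagMatch": "and"}))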