diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index e232332bec..58d3050752 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -49,6 +49,7 @@ UserFilePreparer, MIN_UPLOAD_PART_SIZE, PublicCollOut, + ResourcesOnly, ) from .utils import ( dt_now, @@ -57,6 +58,8 @@ get_origin, ) +from .crawlmanager import CrawlManager + if TYPE_CHECKING: from .orgs import OrgOps from .storages import StorageOps @@ -81,8 +84,16 @@ class CollectionOps: event_webhook_ops: EventWebhookOps crawl_ops: CrawlOps page_ops: PageOps + crawl_manager: CrawlManager - def __init__(self, mdb, storage_ops, orgs, event_webhook_ops): + def __init__( + self, + mdb, + orgs: OrgOps, + storage_ops: StorageOps, + crawl_manager: CrawlManager, + event_webhook_ops: EventWebhookOps, + ): self.collections = mdb["collections"] self.crawls = mdb["crawls"] self.crawl_configs = mdb["crawl_configs"] @@ -91,6 +102,7 @@ def __init__(self, mdb, storage_ops, orgs, event_webhook_ops): self.orgs = orgs self.storage_ops = storage_ops + self.crawl_manager = crawl_manager self.event_webhook_ops = event_webhook_ops def set_crawl_ops(self, ops): @@ -141,11 +153,15 @@ async def add_collection(self, oid: UUID, coll_in: CollIn): access=coll_in.access, defaultThumbnailName=coll_in.defaultThumbnailName, allowPublicDownload=coll_in.allowPublicDownload, + hasDedupIndex=coll_in.hasDedupIndex, ) try: await self.collections.insert_one(coll.to_dict()) org = await self.orgs.get_org_by_id(oid) await self.clear_org_previous_slugs_matching_slug(slug, org) + # create collection index + if coll.hasDedupIndex: + await self.crawl_manager.create_coll_index(coll) if crawl_ids: await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org) @@ -194,22 +210,33 @@ async def update_collection( db_update["$push"] = {"previousSlugs": previous_slug} try: - result = await self.collections.find_one_and_update( + prev_result = await self.collections.find_one_and_update( {"_id": coll_id, "oid": org.id}, db_update, - return_document=pymongo.ReturnDocument.AFTER, + return_document=pymongo.ReturnDocument.BEFORE, ) except pymongo.errors.DuplicateKeyError as err: # pylint: disable=raise-missing-from field = get_duplicate_key_error_field(err) raise HTTPException(status_code=400, detail=f"collection_{field}_taken") - if not result: + if not prev_result: raise HTTPException(status_code=404, detail="collection_not_found") if slug_update: await self.clear_org_previous_slugs_matching_slug(slug_update, org) + # if dedup index is true, but was false + if update.hasDedupIndex and not prev_result.get("hasDedupIndex"): + # get latest coll, create index + coll = await self.get_collection(coll_id, org.id) + await self.crawl_manager.create_coll_index(coll) + + # if dedup is false, but was true + if update.hasDedupIndex is False and prev_result.get("hasDedupIndex"): + # delete index -- may need extra restrictions + await self.crawl_manager.delete_coll_index(coll_id) + return {"updated": True} async def clear_org_previous_slugs_matching_slug( @@ -221,6 +248,16 @@ async def clear_org_previous_slugs_matching_slug( {"$pull": {"previousSlugs": slug}}, ) + async def get_coll_dedup_index(self, coll_id: UUID) -> bool: + """return true/false if collection has dedup index, or raise""" + result = await self.collections.find_one( + {"_id": coll_id}, projection=["hasDedupIndex"] + ) + if not result: + raise HTTPException(status_code=404, detail="collection_not_found") + + return result["hasDedupIndex"] is True + async def add_crawls_to_collection( self, coll_id: 
UUID, @@ -229,8 +266,6 @@ async def add_crawls_to_collection( headers: Optional[dict] = None, ) -> CollOut: """Add crawls to collection""" - await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org) - modified = dt_now() result = await self.collections.find_one_and_update( {"_id": coll_id}, @@ -240,8 +275,11 @@ async def add_crawls_to_collection( if not result: raise HTTPException(status_code=404, detail="collection_not_found") + # do this after checking if collection exists + await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org) + await self.update_collection_counts_and_tags(coll_id) - await self.update_collection_dates(coll_id, org.id) + await self.update_collection_dates(coll_id, org.id, update_index=True) asyncio.create_task( self.event_webhook_ops.create_added_to_collection_notification( @@ -294,6 +332,24 @@ async def get_collection_raw( return result + async def enable_dedup_index(self, coll_id: UUID): + """enable dedup index if it doesn't exist yet""" + result = await self.collections.find_one_and_update( + {"_id": coll_id, "hasDedupIndex": {"$ne": True}}, + {"$set": {"hasDedupIndex": True}}, + return_document=pymongo.ReturnDocument.AFTER, + ) + + # not changed, nothing to do + if not result: + return False + + coll = Collection.from_dict(result) + + await self.crawl_manager.create_coll_index(coll) + + return True + async def get_collection_raw_by_slug( self, coll_slug: str, @@ -396,6 +452,16 @@ async def get_collection_out( return CollOut.from_dict(result) + async def get_internal_replay_list(self, coll_id: UUID, oid: UUID) -> ResourcesOnly: + """get list of internally resolved signed WACZ files""" + org = await self.orgs.get_org_by_id(oid) + resources, _, _ = await self.get_collection_crawl_resources(coll_id, org) + + for file_ in resources: + file_.path = self.storage_ops.resolve_internal_access_path(file_.path) + + return ResourcesOnly(resources=resources) + async def get_public_collection_out( self, coll_id: UUID, @@ -639,6 +705,9 @@ async def delete_collection(self, coll_id: UUID, org: Organization): if coll.thumbnail: await self.delete_thumbnail(coll_id, org) + if coll.hasDedupIndex: + await self.crawl_manager.delete_coll_index(coll.id) + result = await self.collections.delete_one({"_id": coll_id, "oid": org.id}) if result.deleted_count < 1: raise HTTPException(status_code=404, detail="collection_not_found") @@ -740,7 +809,9 @@ async def update_collection_counts_and_tags(self, collection_id: UUID): }, ) - async def update_collection_dates(self, coll_id: UUID, oid: UUID): + async def update_collection_dates( + self, coll_id: UUID, oid: UUID, update_index=False + ): """Update collection earliest and latest dates from page timestamps""" # pylint: disable=too-many-locals coll = await self.get_collection(coll_id, oid) @@ -749,6 +820,10 @@ async def update_collection_dates(self, coll_id: UUID, oid: UUID): earliest_ts = None latest_ts = None + # update_index is set, update dedup index if it exists + if update_index and coll.hasDedupIndex: + await self.crawl_manager.update_coll_index(coll_id) + match_query = { "oid": coll.oid, "crawl_id": {"$in": crawl_ids}, @@ -783,13 +858,16 @@ async def update_collection_dates(self, coll_id: UUID, oid: UUID): async def update_crawl_collections(self, crawl_id: str, oid: UUID): """Update counts, dates, and modified for all collections in crawl""" + # accessing directly to handle both crawls and uploads crawl = await self.crawls.find_one({"_id": crawl_id}) - crawl_coll_ids = crawl.get("collectionIds") + crawl_coll_ids = 
crawl.get("collectionIds") or [] modified = dt_now() for coll_id in crawl_coll_ids: await self.update_collection_counts_and_tags(coll_id) - await self.update_collection_dates(coll_id, oid) + await self.update_collection_dates( + coll_id, oid, crawl.get("dedupCollId") != coll_id + ) await self.collections.find_one_and_update( {"_id": coll_id}, {"$set": {"modified": modified}}, @@ -1000,12 +1078,20 @@ async def calculate_thumbnail_storage(self, oid: UUID) -> int: # ============================================================================ # pylint: disable=too-many-locals def init_collections_api( - app, mdb, orgs, storage_ops, event_webhook_ops, user_dep + app, + mdb, + orgs: OrgOps, + storage_ops: StorageOps, + crawl_manager: CrawlManager, + event_webhook_ops: EventWebhookOps, + user_dep, ) -> CollectionOps: """init collections api""" # pylint: disable=invalid-name, unused-argument, too-many-arguments - colls: CollectionOps = CollectionOps(mdb, storage_ops, orgs, event_webhook_ops) + colls: CollectionOps = CollectionOps( + mdb, orgs, storage_ops, crawl_manager, event_webhook_ops + ) org_crawl_dep = orgs.org_crawl_dep org_viewer_dep = orgs.org_viewer_dep diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py index 9a1f5119b7..8d839f1006 100644 --- a/backend/btrixcloud/crawlconfigs.py +++ b/backend/btrixcloud/crawlconfigs.py @@ -316,6 +316,14 @@ async def add_crawl_config( first_seed = seeds[0].url + # the dedup collection id must also be in auto add collections + if config_in.dedupCollId: + if config_in.autoAddCollections is None: + config_in.autoAddCollections = [] + + if config_in.dedupCollId not in config_in.autoAddCollections: + config_in.autoAddCollections.append(config_in.dedupCollId) + now = dt_now() crawlconfig = CrawlConfig( id=uuid4(), @@ -343,6 +351,7 @@ async def add_crawl_config( firstSeed=first_seed, seedCount=seed_count, shareable=config_in.shareable, + dedupCollId=config_in.dedupCollId, ) if config_in.runNow: @@ -359,6 +368,9 @@ async def add_crawl_config( storage_quota_reached = False exec_mins_quota_reached = False + if config_in.dedupCollId: + await self.coll_ops.enable_dedup_index(config_in.dedupCollId) + if config_in.runNow: try: crawl_id = await self.run_now_internal(crawlconfig, org, user) @@ -602,6 +614,19 @@ async def update_crawl_config( update.tags is not None and ",".join(orig_crawl_config.tags) != ",".join(update.tags) ) + + if isinstance(update.dedupCollId, UUID): + if update.autoAddCollections is None: + update.autoAddCollections = [] + + if update.dedupCollId not in update.autoAddCollections: + update.autoAddCollections.append(update.dedupCollId) + + metadata_changed = metadata_changed or ( + update.dedupCollId is not None + and update.dedupCollId != orig_crawl_config.dedupCollId + ) + metadata_changed = metadata_changed or ( update.autoAddCollections is not None and sorted(orig_crawl_config.autoAddCollections) @@ -629,7 +654,7 @@ async def update_crawl_config( query["modifiedByName"] = user.name query["modified"] = dt_now() - # if empty str, just clear the profile + # profile - if empty str, just clear the profile if update.profileid == "": query["profileid"] = None # else, ensure its a valid profile @@ -637,6 +662,14 @@ async def update_crawl_config( await self.profiles.get_profile(cast(UUID, update.profileid), org) query["profileid"] = update.profileid + # dedup - if empty dedupCollId, clear the coll id + if update.dedupCollId == "": + query["dedupCollId"] = None + # else, enable dedup on collection + if 
isinstance(update.dedupCollId, UUID): + query["dedupCollId"] = update.dedupCollId + await self.coll_ops.enable_dedup_index(update.dedupCollId) + if update.config is not None: query["config"] = update.config.dict() @@ -1116,6 +1149,10 @@ async def remove_collection_from_all_configs( {"$pull": {"autoAddCollections": coll_id}}, ) + await self.crawl_configs.update_many( + {"oid": org.id, "dedupCollId": coll_id}, {"$set": {"dedupCollId": None}} + ) + async def get_crawl_config_tags(self, org): """get distinct tags from all crawl configs for this org""" return await self.crawl_configs.distinct("tags", {"oid": org.id}) diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py index 1b72632af2..c992a053b5 100644 --- a/backend/btrixcloud/crawlmanager.py +++ b/backend/btrixcloud/crawlmanager.py @@ -5,13 +5,14 @@ from typing import Optional, Dict, Tuple from datetime import datetime, timedelta +from uuid import UUID from fastapi import HTTPException from .utils import dt_now, date_to_str, scale_from_browser_windows from .k8sapi import K8sAPI -from .models import StorageRef, CrawlConfig, BgJobType +from .models import StorageRef, CrawlConfig, BgJobType, Collection # ============================================================================ @@ -293,6 +294,9 @@ async def create_crawl_job( storage_filename=storage_filename, profile_filename=profile_filename, proxy_id=crawlconfig.proxyId or DEFAULT_PROXY_ID, + dedup_coll_id=( + str(crawlconfig.dedupCollId) if crawlconfig.dedupCollId else "" + ), is_single_page=is_single_page, seed_file_url=seed_file_url, ) @@ -468,6 +472,31 @@ async def delete_crawl_config_by_id(self, cid: str) -> None: """Delete all crawl configs by id""" await self._delete_crawl_configs(f"btrix.crawlconfig={cid}") + async def create_coll_index(self, collection: Collection): + """create collection index""" + params = { + "id": str(collection.id), + "oid": str(collection.oid), + "collItemsUpdatedAt": date_to_str(collection.modified or dt_now()), + } + data = self.templates.env.get_template("coll_index.yaml").render(params) + + await self.create_from_yaml(data) + + return str(collection.id) + + async def update_coll_index(self, coll_id: UUID): + """force collection index to update""" + return await self.patch_custom_object( + f"collindex-{coll_id}", + {"collItemsUpdatedAt": date_to_str(dt_now())}, + "collindexes", + ) + + async def delete_coll_index(self, coll_id: UUID): + """delete collection index""" + return await self.delete_custom_object(f"collindex-{coll_id}", "collindexes") + # ======================================================================== # Internal Methods async def _delete_crawl_configs(self, label) -> None: diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index c6c8780f06..c16cbdd01c 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -403,6 +403,7 @@ async def add_new_crawl( version=2, firstSeed=crawlconfig.firstSeed, seedCount=crawlconfig.seedCount, + dedupCollId=crawlconfig.dedupCollId, ) try: diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py index f878872c7c..a9f458ab18 100644 --- a/backend/btrixcloud/k8sapi.py +++ b/backend/btrixcloud/k8sapi.py @@ -14,6 +14,7 @@ from kubernetes_asyncio.client.exceptions import ApiException from redis import asyncio as aioredis +from redis.asyncio.client import Redis from fastapi import HTTPException from fastapi.templating import Jinja2Templates @@ -51,6 +52,7 @@ def __init__(self): # custom resource's client API 
self.add_custom_resource("CrawlJob", "crawljobs") self.add_custom_resource("ProfileJob", "profilejobs") + self.add_custom_resource("CollIndex", "collindexes") def add_custom_resource(self, name, plural): """add custom resource""" @@ -60,11 +62,9 @@ def get_custom_api(self, kind): """return custom API""" return self.custom_resources[kind] if kind in self.custom_resources else None - def get_redis_url(self, crawl_id): - """get redis url for crawl id""" - redis_url = ( - f"redis://redis-{crawl_id}.redis.{self.namespace}.svc.cluster.local/0" - ) + def get_redis_url(self, obj_id): + """get redis url for obj id""" + redis_url = f"redis://redis-{obj_id}.redis.{self.namespace}.svc.cluster.local/0" return redis_url async def get_redis_client(self, redis_url): @@ -76,6 +76,23 @@ async def get_redis_client(self, redis_url): socket_timeout=20, ) + async def get_redis_connected(self, obj_id: str) -> Optional[Redis]: + """init redis, ensure connectivity""" + redis_url = self.get_redis_url(obj_id) + redis = None + try: + redis = await self.get_redis_client(redis_url) + # test connection + await redis.ping() + return redis + + # pylint: disable=bare-except + except: + if redis: + await redis.close() + + return None + # pylint: disable=too-many-arguments, too-many-locals def new_crawl_job_yaml( self, @@ -95,6 +112,7 @@ def new_crawl_job_yaml( profile_filename: str = "", qa_source: str = "", proxy_id: str = "", + dedup_coll_id: str = "", is_single_page: bool = False, seed_file_url: str = "", ): @@ -121,6 +139,7 @@ def new_crawl_job_yaml( "profile_filename": profile_filename, "qa_source": qa_source, "proxy_id": proxy_id, + "dedup_coll_id": dedup_coll_id, "is_single_page": "1" if is_single_page else "0", "seed_file_url": seed_file_url, } @@ -146,6 +165,7 @@ async def new_crawl_job( profile_filename: str = "", qa_source: str = "", proxy_id: str = "", + dedup_coll_id: str = "", is_single_page: bool = False, seed_file_url: str = "", ) -> str: @@ -167,6 +187,7 @@ async def new_crawl_job( profile_filename=profile_filename, qa_source=qa_source, proxy_id=proxy_id, + dedup_coll_id=dedup_coll_id, is_single_page=is_single_page, seed_file_url=seed_file_url, ) @@ -228,14 +249,26 @@ async def has_storage_secret(self, storage_secret) -> bool: async def delete_crawl_job(self, crawl_id): """delete custom crawljob object""" - try: - name = f"crawljob-{crawl_id}" + name = f"crawljob-{crawl_id}" + return await self.delete_custom_object(name, "crawljobs") + + async def delete_profile_browser(self, browserid): + """delete custom crawljob object""" + name = f"profilejobs-{browserid}" + + res = await self.delete_custom_object(name, "profilejobs") + + return res.get("success") is True + + async def delete_custom_object(self, name: str, plural: str): + """delete custom object with name and plural type""" + try: await self.custom_api.delete_namespaced_custom_object( group="btrix.cloud", version="v1", namespace=self.namespace, - plural="crawljobs", + plural=plural, name=name, grace_period_seconds=0, # delete as background to allow operator to do proper cleanup @@ -246,23 +279,6 @@ async def delete_crawl_job(self, crawl_id): except ApiException as api_exc: return {"error": str(api_exc.reason)} - async def delete_profile_browser(self, browserid): - """delete custom crawljob object""" - try: - await self.custom_api.delete_namespaced_custom_object( - group="btrix.cloud", - version="v1", - namespace=self.namespace, - plural="profilejobs", - name=f"profilejob-{browserid}", - grace_period_seconds=0, - propagation_policy="Background", - ) - 
return True - - except ApiException: - return False - async def get_profile_browser(self, browserid): """get profile browser""" return await self.custom_api.get_namespaced_custom_object( @@ -273,9 +289,15 @@ async def get_profile_browser(self, browserid): name=f"profilejob-{browserid}", ) - async def _patch_job(self, crawl_id, body, pluraltype="crawljobs") -> dict: + async def _patch_job(self, obj_id, body, pluraltype="crawljobs") -> dict: + """patch crawl/profile job""" + name = f"{pluraltype[:-1]}-{obj_id}" + + return await self.patch_custom_object(name, body, pluraltype) + + async def patch_custom_object(self, name: str, body, pluraltype: str) -> dict: + """patch custom object""" try: - name = f"{pluraltype[:-1]}-{crawl_id}" await self.custom_api.patch_namespaced_custom_object( group="btrix.cloud", diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py index 785e76afac..0a5968f73a 100644 --- a/backend/btrixcloud/main.py +++ b/backend/btrixcloud/main.py @@ -238,7 +238,13 @@ def main() -> None: ) coll_ops = init_collections_api( - app, mdb, org_ops, storage_ops, event_webhook_ops, current_active_user + app, + mdb, + org_ops, + storage_ops, + crawl_manager, + event_webhook_ops, + current_active_user, ) base_crawl_init = ( diff --git a/backend/btrixcloud/migrations/migration_0010_collection_total_size.py b/backend/btrixcloud/migrations/migration_0010_collection_total_size.py index 8e6234954a..abf05e79df 100644 --- a/backend/btrixcloud/migrations/migration_0010_collection_total_size.py +++ b/backend/btrixcloud/migrations/migration_0010_collection_total_size.py @@ -2,9 +2,16 @@ Migration 0010 - Precomputing collection total size """ +from typing import cast + from btrixcloud.colls import CollectionOps from btrixcloud.migrations import BaseMigration +from btrixcloud.orgs import OrgOps +from btrixcloud.storages import StorageOps +from btrixcloud.webhooks import EventWebhookOps +from btrixcloud.crawlmanager import CrawlManager + MIGRATION_VERSION = "0010" @@ -22,7 +29,13 @@ async def migrate_up(self): Recompute collection data to include totalSize. 
""" # pylint: disable=duplicate-code - coll_ops = CollectionOps(self.mdb, None, None, None) + coll_ops = CollectionOps( + self.mdb, + cast(OrgOps, None), + cast(StorageOps, None), + cast(CrawlManager, None), + cast(EventWebhookOps, None), + ) async for coll in coll_ops.collections.find({}): coll_id = coll["_id"] diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py index fd70cc0585..185270b3b0 100644 --- a/backend/btrixcloud/models.py +++ b/backend/btrixcloud/models.py @@ -2,6 +2,8 @@ Crawl-related models and types """ +# pylint: disable=invalid-name, too-many-lines + from datetime import datetime from enum import Enum, IntEnum from uuid import UUID @@ -89,7 +91,6 @@ def _validate(cls, value: CasedEmailStr, /) -> CasedEmailStr: return validate_email(value)[1].lower() -# pylint: disable=invalid-name, too-many-lines # ============================================================================ class UserRole(IntEnum): """User role""" @@ -237,7 +238,7 @@ class UserOrgInfoOut(BaseModel): RUNNING_STATES = get_args(TYPE_RUNNING_STATES) TYPE_WAITING_STATES = Literal[ - "starting", "waiting_capacity", "waiting_org_limit", "paused" + "starting", "waiting_capacity", "waiting_org_limit", "waiting_dedup_index", "paused" ] WAITING_STATES = get_args(TYPE_WAITING_STATES) @@ -394,6 +395,8 @@ class CrawlConfigIn(BaseModel): proxyId: Optional[str] = None autoAddCollections: Optional[List[UUID]] = [] + dedupCollId: Optional[UUID] = None + tags: Optional[List[str]] = [] crawlTimeout: int = 0 @@ -460,6 +463,8 @@ class CrawlConfigCore(BaseMongoModel): firstSeed: str = "" seedCount: int = 0 + dedupCollId: Optional[UUID] = None + # ============================================================================ class CrawlConfigAdditional(BaseModel): @@ -545,6 +550,7 @@ class UpdateCrawlConfig(BaseModel): tags: Optional[List[str]] = None description: Optional[str] = None autoAddCollections: Optional[List[UUID]] = None + dedupCollId: Union[UUID, EmptyStr, None] = None runNow: bool = False updateRunning: bool = False @@ -588,6 +594,8 @@ class CrawlConfigDefaults(BaseModel): customBehaviors: List[str] = [] + dedupCollId: Optional[UUID] = None + # ============================================================================ class CrawlConfigAddedResponse(BaseModel): @@ -1498,6 +1506,13 @@ class PageUrlCount(BaseModel): snapshots: List[PageIdTimestamp] = [] +# ============================================================================ +class ResourcesOnly(BaseModel): + """Resources-only response""" + + resources: Optional[List[CrawlFileOut]] = [] + + # ============================================================================ class CrawlOutWithResources(CrawlOut): """Crawl output model including resources""" @@ -1588,6 +1603,8 @@ class Collection(BaseMongoModel): previousSlugs: List[str] = [] + hasDedupIndex: bool = False + # ============================================================================ class CollIn(BaseModel): @@ -1604,6 +1621,8 @@ class CollIn(BaseModel): defaultThumbnailName: Optional[str] = None allowPublicDownload: bool = True + hasDedupIndex: bool = False + # ============================================================================ class CollOut(BaseMongoModel): @@ -1648,6 +1667,7 @@ class CollOut(BaseMongoModel): downloadUrl: Optional[str] = None topPageHosts: List[HostCount] = [] + hasDedupIndex: bool = False # ============================================================================ @@ -1699,6 +1719,7 @@ class UpdateColl(BaseModel): defaultThumbnailName: Optional[str] = 
None allowPublicDownload: Optional[bool] = None thumbnailSource: Optional[CollectionThumbnailSource] = None + hasDedupIndex: Optional[bool] = None # ============================================================================ diff --git a/backend/btrixcloud/operator/__init__.py b/backend/btrixcloud/operator/__init__.py index ee353224a2..7b8bfff0ab 100644 --- a/backend/btrixcloud/operator/__init__.py +++ b/backend/btrixcloud/operator/__init__.py @@ -4,9 +4,16 @@ from .bgjobs import BgJobOperator from .cronjobs import CronJobOperator from .crawls import CrawlOperator +from .collindexes import CollIndexOperator from .baseoperator import K8sOpAPI -operator_classes = [ProfileOperator, BgJobOperator, CronJobOperator, CrawlOperator] +operator_classes = [ + ProfileOperator, + BgJobOperator, + CronJobOperator, + CrawlOperator, + CollIndexOperator, +] # ============================================================================ diff --git a/backend/btrixcloud/operator/collindexes.py b/backend/btrixcloud/operator/collindexes.py new file mode 100644 index 0000000000..8f2ed58659 --- /dev/null +++ b/backend/btrixcloud/operator/collindexes.py @@ -0,0 +1,176 @@ +"""Operator handler for CollIndexes""" + +from typing import Literal, get_args +import re + +from uuid import UUID +from pydantic import BaseModel + +from btrixcloud.utils import str_to_date + +from .models import MCSyncData, POD, JOB, CMAP +from .baseoperator import BaseOperator + + +TYPE_INDEX_STATES = Literal["initing", "importing", "ready"] +INDEX_STATES = get_args(TYPE_INDEX_STATES) + + +# ============================================================================ +class CollIndexStatus(BaseModel): + """CollIndex Status""" + + state: TYPE_INDEX_STATES = "initing" + + lastCollUpdated: str = "" + + +# ============================================================================ +class CollIndexSpec(BaseModel): + """CollIndex Spec""" + + id: UUID + oid: UUID + + collItemsUpdatedAt: str = "" + + +# ============================================================================ +class CollIndexOperator(BaseOperator): + """CollIndex Operation""" + + shared_params = {} + + def __init__(self, *args): + super().__init__(*args) + self.shared_params.update(self.k8s.shared_params) + self.shared_params["redis_storage"] = self.shared_params["redis_dedup_storage"] + self.shared_params["memory"] = self.shared_params["redis_dedup_memory"] + self.shared_params["cpu"] = self.shared_params["redis_cpu"] + self.shared_params["init_redis"] = True + self.shared_params["obj_type"] = "coll" + self.dedup_importer_channel = ( + self.shared_params.get("dedup_importer_channel") or "default" + ) + + def init_routes(self, app): + """init routes for this operator""" + + @app.post("/op/collindexes/sync") + async def mc_sync_index(data: MCSyncData): + return await self.sync_index(data) + + @app.post("/op/collindexes/finalize") + async def mc_finalize_index(data: MCSyncData): + return await self.sync_index(data) + + async def sync_index(self, data: MCSyncData): + """sync CollIndex object with existing state""" + spec = CollIndexSpec(**data.parent.get("spec", {})) + status = CollIndexStatus(**data.parent.get("status", {})) + + if data.finalizing: + # allow deletion + return {"status": status.dict(), "children": [], "finalized": True} + + index_id = str(spec.id) + redis_name = "redis-coll-" + index_id + new_children = self.load_redis(index_id, redis_name) + + redis = None + if redis_name in data.children[POD]: + redis = await self.k8s.get_redis_connected("coll-" + index_id) + else: 
+ status.state = "initing" + + import_ts = self.get_import_ts(spec, status) + if import_ts: + import_job_name = f"import-{index_id}-{import_ts}" + new_children.extend(await self.load_import_job(index_id, import_job_name)) + new_children.extend( + await self.load_import_configmap( + index_id, import_job_name, spec.oid, data.children + ) + ) + status.state = "importing" + + if redis: + # attempt to set the last updated from redis when done + try: + last_update_ts = await redis.get("last_update_ts") + if last_update_ts: + status.lastCollUpdated = last_update_ts + + # index is ready! + if not data.children[JOB]: + status.state = "ready" + + # pylint: disable=broad-exception-caught + except Exception as e: + print(e) + + return { + "status": status.dict(exclude_none=True), + "children": new_children, + } + + def get_import_ts(self, spec: CollIndexSpec, status: CollIndexStatus): + """returnt rue if a reimport is needed based on last import date""" + coll_update_date = str_to_date(spec.collItemsUpdatedAt) + if not coll_update_date: + return None + + last_import_date = str_to_date(status.lastCollUpdated) + # do update from 'coll_update_date' timestamp + if not last_import_date or coll_update_date >= last_import_date: + return re.sub(r"[^0-9]", "", spec.collItemsUpdatedAt) + + return None + + def load_redis(self, index_id: str, name: str): + """create redis pods from yaml template""" + params = {} + params.update(self.shared_params) + params["name"] = name + params["id"] = index_id + + return self.load_from_yaml("redis.yaml", params) + + async def load_import_job(self, index_id: str, name: str): + """create indexer pods from yaml template""" + params = {} + params.update(self.shared_params) + params["name"] = name + params["id"] = index_id + params["crawler_image"] = self.crawl_config_ops.get_channel_crawler_image( + self.dedup_importer_channel + ) + + params["redis_url"] = self.k8s.get_redis_url("coll-" + index_id) + + return self.load_from_yaml("index-import-job.yaml", params) + + async def load_import_configmap( + self, index_id: str, name: str, oid: UUID, children + ): + """create configmap for import job, lookup resources only on first init""" + configmap = children[CMAP].get(name) + # pylint: disable=duplicate-code + if configmap: + metadata = configmap["metadata"] + configmap["metadata"] = { + "name": metadata["name"], + "namespace": metadata["namespace"], + "labels": metadata["labels"], + } + return [configmap] + + replay_list = await self.coll_ops.get_internal_replay_list(UUID(index_id), oid) + + params = {} + params.update(self.shared_params) + params["name"] = name + params["id"] = index_id + params["config"] = replay_list.json() + + return self.load_from_yaml("index-import-configmap.yaml", params) diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index fe6a6941d3..2ecc999459 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -51,6 +51,7 @@ CMAP, PVC, CJS, + COLLINDEX, BTRIX_API, ) @@ -152,8 +153,6 @@ async def sync_crawls(self, data: MCSyncData): cid = spec["cid"] oid = spec["oid"] - redis_url = self.k8s.get_redis_url(crawl_id) - params = {} params.update(self.k8s.shared_params) params["id"] = crawl_id @@ -172,6 +171,7 @@ async def sync_crawls(self, data: MCSyncData): storage=StorageRef(spec["storageName"]), crawler_channel=spec.get("crawlerChannel", "default"), proxy_id=spec.get("proxyId"), + dedup_coll_id=spec.get("dedupCollId"), scale=spec.get("scale", 1), 
browser_windows=spec.get("browserWindows", 1), started=data.parent["metadata"]["creationTimestamp"], @@ -246,12 +246,28 @@ async def sync_crawls(self, data: MCSyncData): return self._empty_response(status) if status.state in ("starting", "waiting_org_limit"): - if not await self.can_start_new(crawl, data, status): + if not self.can_start_new(crawl, data): + await self.set_state( + "waiting_org_limit", status, crawl, allowed_from=["starting"] + ) return self._empty_response(status) - await self.set_state( - "starting", status, crawl, allowed_from=["waiting_org_limit"] - ) + if status.state != "starting": + await self.set_state( + "starting", status, crawl, allowed_from=["waiting_org_limit"] + ) + + if status.state in ("starting", "waiting_dedup_index"): + if self.is_waiting_for_dedup_index(crawl, data): + await self.set_state( + "waiting_dedup_index", status, crawl, allowed_from=["starting"] + ) + return self._empty_response(status) + + if status.state != "starting": + await self.set_state( + "starting", status, crawl, allowed_from=["waiting_dedup_index"] + ) status.scale = len(pods) if status.scale: @@ -262,7 +278,7 @@ async def sync_crawls(self, data: MCSyncData): self.sync_resources(status, pod_name, pod, data.children) - status = await self.sync_crawl_state(redis_url, crawl, status, pods, data) + status = await self.sync_crawl_state(crawl, status, pods, data) if self.k8s.enable_auto_resize: # auto sizing handled here @@ -339,7 +355,14 @@ async def sync_crawls(self, data: MCSyncData): params["warc_prefix"] = spec.get("warcPrefix") - params["redis_url"] = redis_url + params["redis_url"] = self.k8s.get_redis_url(crawl_id) + + if crawl.dedup_coll_id: + params["redis_dedup_url"] = self.k8s.get_redis_url( + "coll-" + crawl.dedup_coll_id + ) + else: + params["redis_dedup_url"] = "" if spec.get("restartTime") != status.restartTime: # pylint: disable=invalid-name @@ -406,6 +429,7 @@ def _load_redis(self, params, status: CrawlStatus, crawl: CrawlSpec, children): pod_info = status.podStatus[name] params["name"] = name + params["obj_type"] = "crawl" params["cpu"] = pod_info.newCpu or params.get("redis_cpu") params["memory"] = pod_info.newMemory or params.get("redis_memory") params["no_pvc"] = crawl.is_single_page @@ -729,6 +753,7 @@ def get_related(self, data: MCBaseRequest): spec = data.parent.get("spec", {}) crawl_id = spec["id"] oid = spec.get("oid") + coll_id = spec.get("dedupCollId") # filter by role as well (job vs qa-job) role = data.parent.get("metadata", {}).get("labels", {}).get("role") related_resources = [ @@ -739,6 +764,21 @@ def get_related(self, data: MCBaseRequest): }, ] + if coll_id: + related_resources.append( + { + "apiVersion": BTRIX_API, + "resource": "collindexes", + "labelSelector": { + "matchLabels": { + "oid": oid, + "role": "collindex", + "coll": coll_id, + } + }, + } + ) + if self.k8s.enable_auto_resize: related_resources.append( { @@ -750,12 +790,11 @@ def get_related(self, data: MCBaseRequest): return {"relatedResources": related_resources} - async def can_start_new( + def can_start_new( self, crawl: CrawlSpec, data: MCSyncData, - status: CrawlStatus, - ): + ) -> bool: """return true if crawl can start, otherwise set crawl to 'queued' state until more crawls for org finish""" max_crawls = crawl.org.quotas.maxConcurrentCrawls or 0 @@ -784,9 +823,25 @@ async def can_start_new( break i += 1 - await self.set_state( - "waiting_org_limit", status, crawl, allowed_from=["starting"] - ) + return False + + def is_waiting_for_dedup_index(self, crawl: CrawlSpec, data: MCSyncData) 
-> bool: + """return true if we need to wait for dedup index to be ready + before starting the crawl""" + if not crawl.dedup_coll_id: + return False + + # index object doesn't exist + if not data.related[COLLINDEX]: + return True + + for index in data.related[COLLINDEX].values(): + if index.get("status", {}).get("state") == "ready": + return False + + # only check first index, should only be one + break + return True async def cancel_crawl( @@ -913,25 +968,8 @@ async def finalize_response( "finalized": finalized, } - async def _get_redis(self, redis_url: str) -> Optional[Redis]: - """init redis, ensure connectivity""" - redis = None - try: - redis = await self.k8s.get_redis_client(redis_url) - # test connection - await redis.ping() - return redis - - # pylint: disable=bare-except - except: - if redis: - await redis.close() - - return None - async def sync_crawl_state( self, - redis_url: str, crawl: CrawlSpec, status: CrawlStatus, pods: dict[str, dict], @@ -948,7 +986,7 @@ async def sync_crawl_state( try: if redis_running: - redis = await self._get_redis(redis_url) + redis = await self.k8s.get_redis_connected(crawl.id) await self.add_used_stats(crawl.id, status.podStatus, redis, metrics) @@ -1746,8 +1784,7 @@ async def inc_crawl_complete_stats(self, crawl: CrawlSpec, finished: datetime): async def mark_for_cancelation(self, crawl_id): """mark crawl as canceled in redis""" try: - redis_url = self.k8s.get_redis_url(crawl_id) - redis = await self._get_redis(redis_url) + redis = await self.k8s.get_redis_connected(crawl_id) if not redis: return False diff --git a/backend/btrixcloud/operator/cronjobs.py b/backend/btrixcloud/operator/cronjobs.py index fdd76c446d..909021b714 100644 --- a/backend/btrixcloud/operator/cronjobs.py +++ b/backend/btrixcloud/operator/cronjobs.py @@ -140,6 +140,9 @@ async def make_new_crawljob( storage_filename=self.crawl_config_ops.default_filename_template, profile_filename=profile_filename or "", proxy_id=crawlconfig.proxyId or "", + dedup_coll_id=( + str(crawlconfig.dedupCollId) if crawlconfig.dedupCollId else "" + ), is_single_page=self.crawl_config_ops.is_single_page(crawlconfig.config), ) diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py index ea45a47ec1..5f7383bed2 100644 --- a/backend/btrixcloud/operator/models.py +++ b/backend/btrixcloud/operator/models.py @@ -14,7 +14,9 @@ CMAP = "ConfigMap.v1" PVC = "PersistentVolumeClaim.v1" POD = "Pod.v1" +JOB = "Job.batch/v1" CJS = f"CrawlJob.{BTRIX_API}" +COLLINDEX = f"CollIndex.{BTRIX_API}" StopReason = Literal[ "stopped_by_user", @@ -86,6 +88,7 @@ class CrawlSpec(BaseModel): max_crawl_size: int = 0 qa_source_crawl_id: Optional[str] = "" proxy_id: Optional[str] = None + dedup_coll_id: Optional[str] = None is_single_page: bool = False seed_file_url: Optional[str] = "" diff --git a/backend/btrixcloud/ops.py b/backend/btrixcloud/ops.py index d821a7d909..eaaa528e8b 100644 --- a/backend/btrixcloud/ops.py +++ b/backend/btrixcloud/ops.py @@ -85,7 +85,9 @@ def init_ops() -> Tuple[ storage_ops, ) - coll_ops = CollectionOps(mdb, storage_ops, org_ops, event_webhook_ops) + coll_ops = CollectionOps( + mdb, org_ops, storage_ops, crawl_manager, event_webhook_ops + ) base_crawl_init = ( mdb, diff --git a/backend/test/test_collections.py b/backend/test/test_collections.py index 2077a4df94..e3e92c0edf 100644 --- a/backend/test/test_collections.py +++ b/backend/test/test_collections.py @@ -1168,6 +1168,7 @@ def test_list_public_collections( assert r.status_code == 404 assert r.json()["detail"] ==
"public_profile_not_found" + def test_same_collection_diff_orgs_correct(non_default_org_id, admin_auth_headers): # public collection exists in default org r = requests.get( @@ -1210,6 +1211,7 @@ def test_same_collection_diff_orgs_correct(non_default_org_id, admin_auth_header assert r.status_code == 200 assert r.json()["success"] + def test_list_public_collections_no_colls(non_default_org_id, admin_auth_headers): # Test existing org that's not public - should return same 404 as # if org doesn't exist diff --git a/chart/Chart.lock b/chart/Chart.lock index fec463cfa0..3a857265ff 100644 --- a/chart/Chart.lock +++ b/chart/Chart.lock @@ -4,12 +4,12 @@ dependencies: version: 0.1.0 - name: btrix-crds repository: file://./btrix-crds - version: 0.1.1 + version: 0.2.0 - name: metacontroller-helm repository: oci://ghcr.io/metacontroller version: 4.11.11 - name: btrix-proxies repository: file://./proxies/ version: 0.1.0 -digest: sha256:2fd9472f857e9e3eacdcc616a3cffac5bb2951411cc2d34aea84253092225ecf -generated: "2024-08-15T11:19:17.884682494+02:00" +digest: sha256:ab8c36eadd12d8235d86e43f449bca776cac5a0a0432131b9c9d0f3764c55f12 +generated: "2025-09-23T21:45:05.759421-07:00" diff --git a/chart/Chart.yaml b/chart/Chart.yaml index c8c1cccb48..fdb9a5cace 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -13,7 +13,7 @@ dependencies: condition: addons.admin.logging repository: file://./admin/logging - name: btrix-crds - version: 0.1.1 + version: 0.2.0 repository: file://./btrix-crds - name: metacontroller-helm version: 4.11.11 diff --git a/chart/app-templates/coll_index.yaml b/chart/app-templates/coll_index.yaml new file mode 100644 index 0000000000..947a3ac68f --- /dev/null +++ b/chart/app-templates/coll_index.yaml @@ -0,0 +1,18 @@ +apiVersion: btrix.cloud/v1 +kind: CollIndex +metadata: + name: collindex-{{ id }} + labels: + role: "collindex" + oid: {{ oid }} + coll: {{ id }} + +spec: + selector: + matchLabels: + coll: "{{ id }}" + + id: "{{ id }}" + oid: "{{ oid }}" + + collItemsUpdatedAt: "{{ collItemsUpdatedAt }}" diff --git a/chart/app-templates/crawl_job.yaml b/chart/app-templates/crawl_job.yaml index 756d263d8c..e9832091e1 100644 --- a/chart/app-templates/crawl_job.yaml +++ b/chart/app-templates/crawl_job.yaml @@ -8,6 +8,7 @@ metadata: btrix.org: "{{ oid }}" btrix.user: "{{ userid }}" btrix.storage: "{{ storage_name }}" + dedup_coll_id: "{{ dedup_coll_id }}" spec: selector: @@ -37,6 +38,8 @@ spec: proxyId: "{{ proxy_id }}" + dedupCollId: "{{ dedup_coll_id }}" + pausedAt: "{{ pausedAt }}" isSinglePage: "{{ is_single_page }}" diff --git a/chart/app-templates/crawler.yaml b/chart/app-templates/crawler.yaml index 5ff7b5ae7f..61a21af456 100644 --- a/chart/app-templates/crawler.yaml +++ b/chart/app-templates/crawler.yaml @@ -137,7 +137,10 @@ spec: - "{{ workers }}" - --redisStoreUrl - {{ redis_url }} - {% if qa_source_crawl_id %} + {% if redis_dedup_url %} + - --redisDedupUrl + - {{ redis_dedup_url }} + {% elif qa_source_crawl_id %} - --qaSource - /tmp/qa/qa-config.json {% elif profile_filename %} diff --git a/chart/app-templates/index-import-configmap.yaml b/chart/app-templates/index-import-configmap.yaml new file mode 100644 index 0000000000..4185e4e418 --- /dev/null +++ b/chart/app-templates/index-import-configmap.yaml @@ -0,0 +1,14 @@ +# ------- +# CONFIGMAP +# ------- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ name }} + namespace: {{ namespace }} + labels: + coll: {{ id }} + role: configmap + +data: + config.json: {{ config | tojson }} diff --git 
a/chart/app-templates/index-import-job.yaml b/chart/app-templates/index-import-job.yaml new file mode 100644 index 0000000000..c817b8f900 --- /dev/null +++ b/chart/app-templates/index-import-job.yaml @@ -0,0 +1,105 @@ +# ------- +# INDEXER +# ------- +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ name }} + labels: + role: index-import-job + coll: {{ id }} + +spec: + ttlSecondsAfterFinished: 0 + backoffLimit: 3 + template: + metadata: + labels: + role: index-import-job + coll: {{ id }} + + spec: + restartPolicy: OnFailure + priorityClassName: index-import-job + volumes: + - name: config + configMap: + name: {{ name }} + + containers: + - name: indexer + image: {{ crawler_image }} + imagePullPolicy: {{ crawler_image_pull_policy }} + command: + - indexer + - --sourceUrl + - /tmp/config/config.json + - --redisDedupUrl + - {{ redis_url }} + + volumeMounts: + - name: config + mountPath: /tmp/config/ + readOnly: True + + resources: + limits: + memory: "500Mi" + + requests: + cpu: "300m" + memory: "100Mi" + + {% if crawler_liveness_port and crawler_liveness_port != '0' %} + livenessProbe: + httpGet: + path: /healthz + port: {{ crawler_liveness_port }} + + initialDelaySeconds: 15 + periodSeconds: 120 + failureThreshold: 3 + {% endif %} + + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + + affinity: + {% if crawler_node_type %} + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: nodeType + operator: In + values: + - "{{ crawler_node_type }}" + {% endif %} + + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 10 + podAffinityTerm: + topologyKey: "kubernetes.io/hostname" + labelSelector: + matchExpressions: + - key: coll + operator: In + values: + - {{ id }} + + tolerations: + - key: nodeType + operator: Equal + value: crawling + effect: NoSchedule + - key: node.kubernetes.io/not-ready + operator: Exists + tolerationSeconds: 300 + effect: NoExecute + - key: node.kubernetes.io/unreachable + operator: Exists + effect: NoExecute + tolerationSeconds: 300 diff --git a/chart/app-templates/redis.yaml b/chart/app-templates/redis.yaml index 341f861be1..4cf2c31891 100644 --- a/chart/app-templates/redis.yaml +++ b/chart/app-templates/redis.yaml @@ -9,7 +9,7 @@ metadata: name: {{ name }} namespace: {{ namespace }} labels: - crawl: {{ id }} + {{ obj_type }}: {{ id }} role: redis spec: @@ -36,7 +36,7 @@ metadata: name: {{ name }} namespace: {{ namespace }} labels: - crawl: {{ id }} + {{ obj_type }}: {{ id }} role: redis spec: @@ -83,7 +83,7 @@ spec: topologyKey: "failure-domain.beta.kubernetes.io/zone" labelSelector: matchLabels: - crawl: {{ id }} + {{ obj_type }}: {{ id }} tolerations: - key: nodeType diff --git a/chart/btrix-crds/Chart.yaml b/chart/btrix-crds/Chart.yaml index c10f90be53..ba1aa5da88 100644 --- a/chart/btrix-crds/Chart.yaml +++ b/chart/btrix-crds/Chart.yaml @@ -7,9 +7,9 @@ icon: https://webrecorder.net/assets/icon.png # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.1 +version: 0.2.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. 
-appVersion: 0.1.1 +appVersion: 0.2.0 diff --git a/chart/btrix-crds/templates/collindex.yaml b/chart/btrix-crds/templates/collindex.yaml new file mode 100644 index 0000000000..7cdb391fc5 --- /dev/null +++ b/chart/btrix-crds/templates/collindex.yaml @@ -0,0 +1,49 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: collindexes.btrix.cloud +spec: + scope: Namespaced + group: btrix.cloud + names: + kind: CollIndex + plural: collindexes + singular: collindex + shortNames: + - cixs + + versions: + - name: v1 + served: true + storage: true + subresources: + status: {} + + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + x-kubernetes-preserve-unknown-fields: true + + status: + type: object + x-kubernetes-preserve-unknown-fields: true + + additionalPrinterColumns: + - name: State + type: string + jsonPath: .status.state + description: Index State + + - name: Time Created + type: date + jsonPath: .metadata.creationTimestamp + description: "time index created" + + - name: Time Updated + type: date + jsonPath: .status.updated + description: "time index updated" diff --git a/chart/charts/btrix-admin-logging-0.1.0.tgz b/chart/charts/btrix-admin-logging-0.1.0.tgz index facc99e306..eb651195f3 100644 Binary files a/chart/charts/btrix-admin-logging-0.1.0.tgz and b/chart/charts/btrix-admin-logging-0.1.0.tgz differ diff --git a/chart/charts/btrix-crds-0.1.1.tgz b/chart/charts/btrix-crds-0.1.1.tgz deleted file mode 100644 index 8a0a3149da..0000000000 Binary files a/chart/charts/btrix-crds-0.1.1.tgz and /dev/null differ diff --git a/chart/charts/btrix-crds-0.2.0.tgz b/chart/charts/btrix-crds-0.2.0.tgz new file mode 100644 index 0000000000..798feb8e15 Binary files /dev/null and b/chart/charts/btrix-crds-0.2.0.tgz differ diff --git a/chart/charts/btrix-proxies-0.1.0.tgz b/chart/charts/btrix-proxies-0.1.0.tgz index 33dca51c7d..7de5d64d49 100644 Binary files a/chart/charts/btrix-proxies-0.1.0.tgz and b/chart/charts/btrix-proxies-0.1.0.tgz differ diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index f87c5643f6..f1245a7fc4 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -152,6 +152,11 @@ data: redis_storage: "{{ .Values.redis_storage }}" + redis_dedup_storage: "{{ .Values.redis_dedup_storage }}" + redis_dedup_memory: "{{ .Values.redis_dedup_memory }}" + + dedup_importer_channel: {{ .Values.dedup_importer_channel | default "default" }} + # crawler crawler_image_pull_policy: {{ .Values.crawler_pull_policy }} diff --git a/chart/templates/operators.yaml b/chart/templates/operators.yaml index 7547ec4241..cb58dc891b 100644 --- a/chart/templates/operators.yaml +++ b/chart/templates/operators.yaml @@ -76,6 +76,55 @@ spec: port: {{ .Values.opPort }} path: /op/profilebrowsers/sync +--- +apiVersion: metacontroller.k8s.io/v1alpha1 +kind: CompositeController +metadata: + name: collindexes-operator +spec: + generateSelector: false + resyncPeriodSeconds: 30 + parentResource: + apiVersion: btrix.cloud/v1 + resource: collindexes + childResources: + - apiVersion: v1 + resource: pods + updateStrategy: + method: InPlace + + - apiVersion: v1 + resource: persistentvolumeclaims + updateStrategy: + method: InPlace + + - apiVersion: batch/v1 + resource: jobs + updateStrategy: + method: InPlace + + - apiVersion: v1 + resource: configmaps + updateStrategy: + method: InPlace + + hooks: + sync: + webhook: + service: + namespace: {{ .Release.Namespace }} + name: {{ .Values.name }}-backend + port: {{
.Values.opPort }} + path: /op/collindexes/sync + + finalize: + webhook: + service: + namespace: {{ .Release.Namespace }} + name: {{ .Values.name }}-backend + port: {{ .Values.opPort }} + path: /op/collindexes/finalize + --- apiVersion: metacontroller.k8s.io/v1alpha1 kind: DecoratorController diff --git a/chart/templates/priorities.yaml b/chart/templates/priorities.yaml index 97d3af72fb..98b959a047 100644 --- a/chart/templates/priorities.yaml +++ b/chart/templates/priorities.yaml @@ -35,4 +35,14 @@ value: -1000 globalDefault: false description: "Priority for background jobs" +# Higher Priority for Indexing +--- +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: index-import-job +value: 5 +globalDefault: false +description: "Priority for indexing" + diff --git a/chart/templates/role.yaml b/chart/templates/role.yaml index 36138b2c22..9b7aa4cc2b 100644 --- a/chart/templates/role.yaml +++ b/chart/templates/role.yaml @@ -14,7 +14,7 @@ rules: verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"] - apiGroups: ["btrix.cloud"] - resources: ["crawljobs", "profilejobs"] + resources: ["crawljobs", "profilejobs", "collindexes"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"] - apiGroups: ["metrics.k8s.io"] diff --git a/chart/test/test.yaml b/chart/test/test.yaml index b3f62e86d7..dcd9eecfcb 100644 --- a/chart/test/test.yaml +++ b/chart/test/test.yaml @@ -53,3 +53,8 @@ log_failed_crawl_lines: 200 # disable for tests disk_utilization_threshold: 0 + + +redis_dedup_memory: "50Mi" + +redis_dedup_storage: "1Gi" diff --git a/chart/values.yaml b/chart/values.yaml index 54305ab7eb..7813909952 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -243,6 +243,14 @@ redis_memory: "200Mi" redis_storage: "3Gi" + +# Redis Dedup Index +# ========================================= +redis_dedup_memory: "1Gi" + +redis_dedup_storage: "5Gi" + + # Crawler Channels # ========================================= # Support for additional crawler release channels @@ -263,6 +271,11 @@ crawler_pull_policy: "IfNotPresent" crawler_namespace: "crawlers" + +# set this to custom crawler channel used for dedup index importing +# dedup_importer_channel: "default" + + # if set, will restrict QA to image names that are >= than this value # min_qa_crawler_image: ""