Dedup Backend Initial Implementation #2868
Draft: ikreymer wants to merge 23 commits into feature-dedup from dedup-initial.
Changes from all commits (23 commits, all by ikreymer):
- f2c3ba1 operator crds:
- 4bd5525 work
- 183c47b add crud for CollIndex object in collections
- 85c8691 add import job, minimally working
- 933020d add btrix-crds 0.2.0
- f5e609a add dedupCollId to crawler, support running crawler with dedup!
- 84c3b53 ensure collindex deleted on collection delete
- 5b68f54 add 'waiting_for_dedup_index' state to indicate crawl is awaiting ded…
- 3de00d4 make storage and memory configureable: lower settings for tests
- 96fa722 configmap: add missing settings
- ec7dfc8 make dedupCollId independent, but require dedup coll to also be in au…
- 61f5d2f fix typo/formatting
- 1118d06 index import channel: support setting custom crawler channel to use f…
- 651878d configmap: fix quotes
- e609e79 fix autoadd uploads to collections
- 1105ee3 Update backend/btrixcloud/crawlmanager.py
- 25f01cb Apply suggestion from @tw4l
- 016944e Apply suggestion from @tw4l
- 25f09a6 refactor toggle_dedup_index():
- 5507a25 chart: change index importer nodeAffinity to preferred not required
- 548193a dedup updates:
- b9c4251 lint fix move
- c993b0d more lint fixes
The diff below, from all commits, covers the backend collections module (`CollectionOps` and `init_collections_api`):
```diff
@@ -49,6 +49,7 @@
     UserFilePreparer,
     MIN_UPLOAD_PART_SIZE,
     PublicCollOut,
+    ResourcesOnly,
 )
 from .utils import (
     dt_now,
```
```diff
@@ -57,6 +58,8 @@
     get_origin,
 )
 
+from .crawlmanager import CrawlManager
+
 if TYPE_CHECKING:
     from .orgs import OrgOps
     from .storages import StorageOps
```
```diff
@@ -81,8 +84,16 @@ class CollectionOps:
     event_webhook_ops: EventWebhookOps
     crawl_ops: CrawlOps
     page_ops: PageOps
+    crawl_manager: CrawlManager
 
-    def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
+    def __init__(
+        self,
+        mdb,
+        orgs: OrgOps,
+        storage_ops: StorageOps,
+        crawl_manager: CrawlManager,
+        event_webhook_ops: EventWebhookOps,
+    ):
         self.collections = mdb["collections"]
         self.crawls = mdb["crawls"]
         self.crawl_configs = mdb["crawl_configs"]
```
```diff
@@ -91,6 +102,7 @@ def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
 
         self.orgs = orgs
         self.storage_ops = storage_ops
+        self.crawl_manager = crawl_manager
         self.event_webhook_ops = event_webhook_ops
 
     def set_crawl_ops(self, ops):
```
```diff
@@ -141,11 +153,15 @@ async def add_collection(self, oid: UUID, coll_in: CollIn):
             access=coll_in.access,
             defaultThumbnailName=coll_in.defaultThumbnailName,
             allowPublicDownload=coll_in.allowPublicDownload,
+            hasDedupIndex=coll_in.hasDedupIndex,
         )
         try:
             await self.collections.insert_one(coll.to_dict())
             org = await self.orgs.get_org_by_id(oid)
             await self.clear_org_previous_slugs_matching_slug(slug, org)
+            # create collection index
+            if coll.hasDedupIndex:
+                await self.crawl_manager.create_coll_index(coll)
 
             if crawl_ids:
                 await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
```
```diff
@@ -194,22 +210,33 @@ async def update_collection(
             db_update["$push"] = {"previousSlugs": previous_slug}
 
         try:
-            result = await self.collections.find_one_and_update(
+            prev_result = await self.collections.find_one_and_update(
                 {"_id": coll_id, "oid": org.id},
                 db_update,
-                return_document=pymongo.ReturnDocument.AFTER,
+                return_document=pymongo.ReturnDocument.BEFORE,
             )
         except pymongo.errors.DuplicateKeyError as err:
             # pylint: disable=raise-missing-from
             field = get_duplicate_key_error_field(err)
             raise HTTPException(status_code=400, detail=f"collection_{field}_taken")
 
-        if not result:
+        if not prev_result:
             raise HTTPException(status_code=404, detail="collection_not_found")
 
         if slug_update:
             await self.clear_org_previous_slugs_matching_slug(slug_update, org)
 
+        # if dedup index is true, but was false
+        if update.hasDedupIndex and not prev_result.get("hasDedupIndex"):
+            # get latest coll, create index
+            coll = await self.get_collection(coll_id, org.id)
+            await self.crawl_manager.create_coll_index(coll)
+
+        # if dedup is false, but was true
+        if update.hasDedupIndex is False and prev_result.get("hasDedupIndex"):
+            # delete index -- may need extra restrictions
+            await self.crawl_manager.delete_coll_index(coll_id)
+
         return {"updated": True}
 
     async def clear_org_previous_slugs_matching_slug(
```
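Switching `return_document` from AFTER to BEFORE is what makes the toggle work: the handler needs the value of `hasDedupIndex` as it was before the update in order to detect an actual transition. A minimal sketch of that decision, as a hypothetical standalone helper (not part of the PR):

```python
from typing import Optional

# `prev` is the document *before* the update (ReturnDocument.BEFORE), and
# `wanted` is the requested hasDedupIndex value (None = not part of update).
def dedup_index_transition(prev: dict, wanted: Optional[bool]) -> str:
    had_index = bool(prev.get("hasDedupIndex"))
    if wanted and not had_index:
        return "create"  # off -> on: create the dedup index
    if wanted is False and had_index:
        return "delete"  # on -> off: delete the dedup index
    return "none"        # no transition: leave the index alone

assert dedup_index_transition({"hasDedupIndex": False}, True) == "create"
assert dedup_index_transition({"hasDedupIndex": True}, False) == "delete"
assert dedup_index_transition({"hasDedupIndex": True}, None) == "none"
```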
```diff
@@ -221,6 +248,16 @@ async def clear_org_previous_slugs_matching_slug(
             {"$pull": {"previousSlugs": slug}},
         )
 
+    async def get_coll_dedup_index(self, coll_id: UUID) -> bool:
+        """return true/false if collection has dedup index, or raise"""
+        result = await self.collections.find_one(
+            {"_id": coll_id}, projection=["hasDedupIndex"]
+        )
+        if not result:
+            raise HTTPException(status_code=404, detail="collection_not_found")
+
+        return result["hasDedupIndex"] is True
+
     async def add_crawls_to_collection(
         self,
         coll_id: UUID,
```
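The `is True` comparison normalizes the stored value to a strict boolean, so a field explicitly set to None still reads as disabled. A hypothetical standalone equivalent, using `.get()` so a document that lacks the field entirely also maps to False:

```python
def has_dedup_index(doc: dict) -> bool:
    # strict-boolean check: only a literal True counts as enabled
    return doc.get("hasDedupIndex") is True

assert has_dedup_index({"hasDedupIndex": True}) is True
assert has_dedup_index({"hasDedupIndex": None}) is False  # explicitly unset
assert has_dedup_index({}) is False                       # field never written
```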
```diff
@@ -229,8 +266,6 @@ async def add_crawls_to_collection(
         headers: Optional[dict] = None,
     ) -> CollOut:
         """Add crawls to collection"""
-        await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
-
         modified = dt_now()
         result = await self.collections.find_one_and_update(
             {"_id": coll_id},
```
```diff
@@ -240,8 +275,11 @@ async def add_crawls_to_collection(
         if not result:
             raise HTTPException(status_code=404, detail="collection_not_found")
 
+        # do this after checking if collection exists
+        await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
+
         await self.update_collection_counts_and_tags(coll_id)
-        await self.update_collection_dates(coll_id, org.id)
+        await self.update_collection_dates(coll_id, org.id, update_index=True)
 
         asyncio.create_task(
             self.event_webhook_ops.create_added_to_collection_notification(
```
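Moving `add_to_collection` below the `find_one_and_update` means the 404 check now runs before any crawls are touched, so a request against a nonexistent collection can no longer partially mutate crawl documents. Schematically, with a hypothetical dict-backed stand-in for the real data layer:

```python
def add_crawls(collections: dict, crawls: dict, coll_id: str, crawl_ids: list):
    if coll_id not in collections:
        raise KeyError("collection_not_found")  # validate first, mutate second
    for crawl_id in crawl_ids:
        crawls[crawl_id].setdefault("collectionIds", []).append(coll_id)

colls = {"c1": {}}
crawls = {"crawl-a": {}}
add_crawls(colls, crawls, "c1", ["crawl-a"])
assert crawls["crawl-a"]["collectionIds"] == ["c1"]
```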
```diff
@@ -294,6 +332,24 @@ async def get_collection_raw(
 
         return result
 
+    async def enable_dedup_index(self, coll_id: UUID):
+        """enable dedup index if it doesn't exist yet"""
+        result = await self.collections.find_one_and_update(
+            {"_id": coll_id, "hasDedupIndex": {"$ne": True}},
+            {"$set": {"hasDedupIndex": True}},
+            return_document=pymongo.ReturnDocument.AFTER,
+        )
+
+        # not changed, nothing to do
+        if not result:
+            return False
+
+        coll = Collection.from_dict(result)
+
+        await self.crawl_manager.create_coll_index(coll)
+
+        return True
+
     async def get_collection_raw_by_slug(
         self,
         coll_slug: str,
```
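The `{"hasDedupIndex": {"$ne": True}}` filter makes the enable an atomic test-and-set: if two callers race, only one matches the document and goes on to `create_coll_index`; the other gets `None` back and returns False. A plain-dict model of that behavior (illustrative only, no real concurrency here):

```python
def enable_once(doc: dict) -> bool:
    if doc.get("hasDedupIndex") is True:
        return False  # filter matches nothing: someone already enabled it
    doc["hasDedupIndex"] = True
    return True       # this caller flipped the flag and should create the index

doc = {"_id": "coll-1"}
assert enable_once(doc) is True   # first call wins
assert enable_once(doc) is False  # later calls are no-ops
```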
```diff
@@ -396,6 +452,16 @@ async def get_collection_out(
 
         return CollOut.from_dict(result)
 
+    async def get_internal_replay_list(self, coll_id: UUID, oid: UUID) -> ResourcesOnly:
+        """get list of internally resolved signed WACZ files"""
+        org = await self.orgs.get_org_by_id(oid)
+        resources, _, _ = await self.get_collection_crawl_resources(coll_id, org)
+
+        for file_ in resources:
+            file_.path = self.storage_ops.resolve_internal_access_path(file_.path)
+
+        return ResourcesOnly(resources=resources)
+
     async def get_public_collection_out(
         self,
         coll_id: UUID,
```
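`get_internal_replay_list` looks aimed at in-cluster consumers, such as the index import job, that need internally resolved signed WACZ URLs rather than public ones. A hypothetical consumer sketch, with stand-in pydantic models that only mirror the shapes used in the diff:

```python
from typing import List
from pydantic import BaseModel

class WACZFileOut(BaseModel):  # stand-in for the real file output model
    name: str
    path: str                  # internally resolved, signed URL after rewriting

class ResourcesOnly(BaseModel):
    resources: List[WACZFileOut]

def wacz_urls(replay_list: ResourcesOnly) -> List[str]:
    """Collect the signed URLs an importer job would download."""
    return [f.path for f in replay_list.resources]

sample = ResourcesOnly(
    resources=[WACZFileOut(name="crawl-1.wacz", path="http://local-minio/...")]
)
assert wacz_urls(sample) == ["http://local-minio/..."]
```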
```diff
@@ -639,6 +705,9 @@ async def delete_collection(self, coll_id: UUID, org: Organization):
         if coll.thumbnail:
             await self.delete_thumbnail(coll_id, org)
 
+        if coll.hasDedupIndex:
+            await self.crawl_manager.delete_coll_index(coll.id)
+
         result = await self.collections.delete_one({"_id": coll_id, "oid": org.id})
         if result.deleted_count < 1:
             raise HTTPException(status_code=404, detail="collection_not_found")
```
```diff
@@ -740,7 +809,9 @@ async def update_collection_counts_and_tags(self, collection_id: UUID):
             },
         )
 
-    async def update_collection_dates(self, coll_id: UUID, oid: UUID):
+    async def update_collection_dates(
+        self, coll_id: UUID, oid: UUID, update_index=False
+    ):
         """Update collection earliest and latest dates from page timestamps"""
         # pylint: disable=too-many-locals
         coll = await self.get_collection(coll_id, oid)
```
```diff
@@ -749,6 +820,10 @@ async def update_collection_dates(self, coll_id: UUID, oid: UUID):
         earliest_ts = None
         latest_ts = None
 
+        # update_index is set, update dedup index if it exists
+        if update_index and coll.hasDedupIndex:
+            await self.crawl_manager.update_coll_index(coll_id)
+
         match_query = {
             "oid": coll.oid,
             "crawl_id": {"$in": crawl_ids},
```
```diff
@@ -783,13 +858,16 @@
 
     async def update_crawl_collections(self, crawl_id: str, oid: UUID):
         """Update counts, dates, and modified for all collections in crawl"""
+        # accessing directly to handle both crawls and uploads
         crawl = await self.crawls.find_one({"_id": crawl_id})
-        crawl_coll_ids = crawl.get("collectionIds")
+        crawl_coll_ids = crawl.get("collectionIds") or []
         modified = dt_now()
 
         for coll_id in crawl_coll_ids:
             await self.update_collection_counts_and_tags(coll_id)
-            await self.update_collection_dates(coll_id, oid)
+            await self.update_collection_dates(
+                coll_id, oid, crawl.get("dedupCollId") != coll_id
+            )
             await self.collections.find_one_and_update(
                 {"_id": coll_id},
                 {"$set": {"modified": modified}},
```

A review comment is attached to the `crawl.get("dedupCollId") != coll_id` line: "Can we standardize …"
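The third argument passed here, `crawl.get("dedupCollId") != coll_id`, means every collection a crawl belongs to gets its index refreshed except the one the crawl deduplicated against (presumably that index was already maintained while the crawl ran). Isolated for clarity as a hypothetical helper:

```python
def should_update_index(crawl: dict, coll_id: str) -> bool:
    # refresh every collection's index except the crawl's own dedup source
    return crawl.get("dedupCollId") != coll_id

crawl = {"collectionIds": ["a", "b"], "dedupCollId": "a"}
assert should_update_index(crawl, "a") is False  # dedup source: skip
assert should_update_index(crawl, "b") is True   # other collections: refresh
```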
```diff
@@ -1000,12 +1078,20 @@ async def calculate_thumbnail_storage(self, oid: UUID) -> int:
 # ============================================================================
 # pylint: disable=too-many-locals
 def init_collections_api(
-    app, mdb, orgs, storage_ops, event_webhook_ops, user_dep
+    app,
+    mdb,
+    orgs: OrgOps,
+    storage_ops: StorageOps,
+    crawl_manager: CrawlManager,
+    event_webhook_ops: EventWebhookOps,
+    user_dep,
 ) -> CollectionOps:
     """init collections api"""
     # pylint: disable=invalid-name, unused-argument, too-many-arguments
 
-    colls: CollectionOps = CollectionOps(mdb, storage_ops, orgs, event_webhook_ops)
+    colls: CollectionOps = CollectionOps(
+        mdb, orgs, storage_ops, crawl_manager, event_webhook_ops
+    )
 
     org_crawl_dep = orgs.org_crawl_dep
     org_viewer_dep = orgs.org_viewer_dep
```