Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions daras_ai_v2/vector_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,14 @@ def get_top_k_references(

def vespa_search_results_to_refs(
search_result: dict,
) -> typing.Iterable[SearchReference]:
) -> typing.Iterable[tuple[str, SearchReference]]:
for hit in search_result["root"].get("children", []):
try:
ref = EmbeddingsReference.objects.get(vespa_doc_id=hit["fields"]["id"])
ref_key = ref.url
except EmbeddingsReference.DoesNotExist:
continue
if "text/html" in ref.embedded_file.metadata.mime_type:
# logger.debug(f"Generating fragments {ref['url']} as it is a HTML file")
ref.url = generate_text_fragment_url(url=ref.url, text=ref.snippet)
yield (
ref_key,
Expand Down
66 changes: 66 additions & 0 deletions scripts/cleanup_vespa_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import typing
from datetime import timedelta

from django.utils import timezone

from daras_ai_v2 import settings
from daras_ai_v2.vector_search import get_vespa_app
from embeddings.models import EmbeddedFile

if typing.TYPE_CHECKING:
from vespa.io import VespaResponse


STALENESS_THRESHOLD_DAYS = 90
BATCH_SIZE = 1_000


def cleanup_stale_cache():
vespa = get_vespa_app()

while True:
stale_qs = (
EmbeddedFile.objects.prefetch_related("embeddings_references")
.filter(
updated_at__lt=timezone.now() - timedelta(days=STALENESS_THRESHOLD_DAYS)
)
.order_by("updated_at")[:BATCH_SIZE]
)
stale_files = list(stale_qs)
if not stale_files:
break

docs_to_delete = (
{"id": ref.vespa_doc_id}
for ef in stale_files
for ref in ef.embeddings_references.all()
)
total_deleted = 0

def vespa_callback(response: "VespaResponse", id: str):
nonlocal total_deleted
if response.is_successful():
total_deleted += 1
else:
print(f"""\
Failed to delete document {id} from Vespa.
Status code: {response.get_status_code()}
JSON: {response.get_json()}
""")

vespa.feed_iterable(
docs_to_delete,
schema=settings.VESPA_SCHEMA,
operation_type="delete",
callback=vespa_callback,
)
print(f"Deleted {total_deleted} documents from Vespa.")

deleted_per_model = EmbeddedFile.objects.filter(
id__in=[ef.id for ef in stale_files]
).delete()
print(f"Deleted EmbeddedFiles & related objects: {deleted_per_model}")


def run():
cleanup_stale_cache()