Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""add hnsw index on knowledge_base_embeddings

Revision ID: e8f2a1c3b5d9
Revises: c7a9e2f4b1d0
Create Date: 2026-04-10 10:00:00.000000

"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text


# revision identifiers, used by Alembic.
revision: str = 'e8f2a1c3b5d9'
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
down_revision: Union[str, None] = 'c7a9e2f4b1d0'
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed


def upgrade() -> None:
    """Create the vector (HNSW) and token (GIN) indexes on knowledge_base_embeddings.

    Indexes built, each via ``CREATE INDEX CONCURRENTLY IF NOT EXISTS``:

    * ``ix_kbe_embedding_vector_hnsw_cosine`` on ``embedding_vector`` cast to
      ``vector(512)`` (CLIP image / text embeddings).
    * ``ix_kbe_embedding_vector_1_hnsw_cosine`` on ``embedding_vector_1`` cast
      to ``vector(1024)`` (DINO image embeddings).
    * ``ix_kbe_token_gin`` on ``token``.

    The columns are stored without dimensions while HNSW needs a dimensioned
    vector expression, hence the inline casts in the index definitions.
    """
    # The SQL bodies are kept flush-left so the statement text sent to the
    # server does not pick up the indentation of this function.
    index_statements = (
        """
CREATE INDEX CONCURRENTLY IF NOT EXISTS
ix_kbe_embedding_vector_hnsw_cosine
ON knowledge_base_embeddings
USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
""",
        """
CREATE INDEX CONCURRENTLY IF NOT EXISTS
ix_kbe_embedding_vector_1_hnsw_cosine
ON knowledge_base_embeddings
USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
""",
        """
CREATE INDEX CONCURRENTLY IF NOT EXISTS
ix_kbe_token_gin
ON knowledge_base_embeddings
USING gin (token)
""",
    )

    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
    # SQLAlchemy 2.x autobegins a transaction on op.get_bind(), and switching
    # that connection to AUTOCOMMIT is rejected while its Transaction is
    # active, so a separate AUTOCOMMIT connection is opened from the engine.
    migration_engine = op.get_bind().engine
    # Use the wrapped `sync_engine` when the engine exposes one; otherwise the
    # engine itself is used as-is.
    base_engine = getattr(migration_engine, 'sync_engine', migration_engine)
    autocommit_engine = base_engine.execution_options(isolation_level='AUTOCOMMIT')

    with autocommit_engine.connect() as conn:
        # Raise the session's maintenance memory before the index builds.
        conn.execute(text("SET maintenance_work_mem = '2GB'"))

        for statement in index_statements:
            conn.execute(text(statement))


def downgrade() -> None:
    """Drop the HNSW and GIN indexes on knowledge_base_embeddings.

    Issues ``DROP INDEX CONCURRENTLY IF EXISTS`` for
    ``ix_kbe_embedding_vector_hnsw_cosine``,
    ``ix_kbe_embedding_vector_1_hnsw_cosine`` and ``ix_kbe_token_gin``, in
    that order, over a dedicated AUTOCOMMIT connection.
    """
    drop_statements = (
        'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine',
        'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine',
        'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin',
    )

    migration_engine = op.get_bind().engine
    # Use the wrapped `sync_engine` when the engine exposes one; otherwise the
    # engine itself is used as-is.
    base_engine = getattr(migration_engine, 'sync_engine', migration_engine)
    autocommit_engine = base_engine.execution_options(isolation_level='AUTOCOMMIT')

    # A separate AUTOCOMMIT connection is used rather than the migration's own
    # bind, so the CONCURRENTLY statements are not issued on that connection.
    with autocommit_engine.connect() as conn:
        for statement in drop_statements:
            conn.execute(text(statement))
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,37 @@ def get_combined_search_query(
)
query_params.update(filter_params)
sql_query = f"""
WITH vector_results AS (
WITH hnsw_candidates AS (
SELECT
e.id as embedding_id,
e.chunk_text,
e.chunk_index,
id,
document_id,
chunk_text,
chunk_index,
(embedding_vector::vector(512)) <=> :query_embed ::vector(512) AS distance
FROM
{KnowledgeBaseEmbeddings.__tablename__}
ORDER BY
(embedding_vector::vector(512)) <=> :query_embed ::vector(512)
LIMIT :limit * 20
),
vector_results AS (
SELECT
hc.id as embedding_id,
hc.chunk_text,
hc.chunk_index,
d.id as document_id,
d.file_path,
d.knowledge_base_id,
d.metadata_value,
1 - (e.embedding_vector <=> :query_embed ::vector) as vector_score
1 - hc.distance as vector_score
FROM
{KnowledgeBaseEmbeddings.__tablename__} e
hnsw_candidates hc
JOIN
{KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id
{KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id
WHERE
d.knowledge_base_id = :kb_id {'AND (' + metadata_filter_clause_inner + ')' if metadata_filter_clause_inner else ''}
ORDER BY
vector_score DESC
hc.distance ASC
LIMIT :limit
Comment thread
coderabbitai[bot] marked this conversation as resolved.
),
keyword_results AS (
Expand Down Expand Up @@ -169,40 +182,45 @@ def get_image_embedding(
'kb_id': kb_id,
'top_k': top_k,
}
metadata_filter_clause_final = ''
metadata_filter_clause = ''
if filter:
where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter)
if where_clause and filter_params:
metadata_filter_clause_final = self.build_metadata_clause(
metadata_filter_clause = self.build_metadata_clause(
where_clause,
filter_params,
lambda field: f"(d.metadata_value ->> '{field}')",
)
params.update(filter_params)
sql_query = f"""
WITH ranked_embeddings AS (
WITH hnsw_candidates AS (
SELECT
e.id AS embedding_id,
e.chunk_text,
e.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
e.embedding_vector <-> :query_embedding ::vector AS distance
id,
document_id,
chunk_text,
chunk_index,
(embedding_vector::vector(512)) <=> :query_embedding ::vector(512) AS distance
FROM
{KnowledgeBaseEmbeddings.__tablename__} e
JOIN
{KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id
WHERE
d.knowledge_base_id = :kb_id {'AND (' + metadata_filter_clause_final + ')' if metadata_filter_clause_final else ''}
ORDER BY distance ASC
{KnowledgeBaseEmbeddings.__tablename__}
ORDER BY
(embedding_vector::vector(512)) <=> :query_embedding ::vector(512)
LIMIT :top_k * 20
)
SELECT
*
FROM
ranked_embeddings
hc.id AS embedding_id,
hc.chunk_text,
hc.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
hc.distance
FROM hnsw_candidates hc
JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id
WHERE d.knowledge_base_id = :kb_id
{'AND (' + metadata_filter_clause + ')' if metadata_filter_clause else ''}
ORDER BY hc.distance ASC
LIMIT :top_k
Comment on lines +196 to 224

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Scope the CLIP ANN candidate set before LIMIT.

get_image_embedding() has the same truncation problem: the HNSW CTE is global, and the KB/metadata filters only run after LIMIT :top_k * 20. In a multi-KB or selective-filter dataset this can silently return too few results.

Possible fix
         WITH hnsw_candidates AS (
             SELECT
                 id,
                 document_id,
                 chunk_text,
                 chunk_index,
                 (embedding_vector::vector(512)) <=> :query_embedding ::vector(512) AS distance
             FROM
                 {KnowledgeBaseEmbeddings.__tablename__}
+            WHERE
+                document_id IN (
+                    SELECT d.id
+                    FROM {KnowledgeBaseDocuments.__tablename__} d
+                    WHERE d.knowledge_base_id = :kb_id
+                    {'AND (' + metadata_filter_clause + ')' if metadata_filter_clause else ''}
+                )
             ORDER BY
                 (embedding_vector::vector(512)) <=> :query_embedding ::vector(512)
             LIMIT :top_k * 20
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py`
around lines 196 - 224, The HNSW CTE (hnsw_candidates) in generate_query.py
builds a global ANN candidate set and applies the knowledge-base (KB) and
metadata filters only after LIMIT, which can truncate results incorrectly;
update the SQL generation so the WHERE clause (kb_id = :kb_id and the
metadata_filter_clause) is applied inside the hnsw_candidates CTE (i.e., add
"WHERE d.knowledge_base_id = :kb_id" and the metadata filter when selecting from
KnowledgeBaseEmbeddings or by joining KnowledgeBaseDocuments inside the CTE)
before the ORDER BY / LIMIT to ensure the top_k*20 candidates are scoped
correctly; also apply the same change to the analogous get_image_embedding()
query builder to fix the identical truncation bug.

"""

Expand Down Expand Up @@ -232,49 +250,52 @@ def get_image_embedding_dino(
params = {
'query_embedding': query_embeddings,
'kb_id': kb_id,
'top_k': effective_limit,
'reference_ids': processed_reference_ids,
'offset': effective_offset,
'limit': effective_limit,
}

metadata_filter_clause_final = ''
metadata_filter_clause = ''
if filter:
where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter)
if where_clause and filter_params:
metadata_filter_clause_final = self.build_metadata_clause(
metadata_filter_clause = self.build_metadata_clause(
where_clause,
filter_params,
lambda field: f"(d.metadata_value ->> '{field}')",
)
params.update(filter_params)
# Use ANY operator for PostgreSQL array matching
reference_filter = (
'AND e.document_id = ANY(:reference_ids)' if processed_reference_ids else ''
)

sql_query = f"""
WITH ranked_embeddings AS (
WITH hnsw_candidates AS (
SELECT
e.id AS embedding_id,
e.chunk_text,
e.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
(1 - (e.embedding_vector_1 <=> :query_embedding ::vector)) AS similarity
FROM {KnowledgeBaseEmbeddings.__tablename__} e
JOIN {KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id
WHERE
d.knowledge_base_id = :kb_id {reference_filter} {'AND (' + metadata_filter_clause_final + ')' if metadata_filter_clause_final else ''}
ORDER BY similarity DESC
id,
document_id,
chunk_text,
chunk_index,
(embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) AS distance
FROM
{KnowledgeBaseEmbeddings.__tablename__}
ORDER BY
(embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024)
LIMIT :limit * 20
)
SELECT
*
FROM
ranked_embeddings
hc.id AS embedding_id,
hc.chunk_text,
hc.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
1 - hc.distance AS similarity
FROM hnsw_candidates hc
JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id
WHERE d.knowledge_base_id = :kb_id
{('AND d.id = ANY(:reference_ids)' if processed_reference_ids else '')}
{'AND (' + metadata_filter_clause + ')' if metadata_filter_clause else ''}
ORDER BY similarity DESC
LIMIT :limit OFFSET :offset
Comment on lines +270 to 299

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Filter reference_ids inside the DINO candidate CTE.

image_rag_retrieve.py uses this query to rerank the CLIP-selected reference_id_list, but d.id = ANY(:reference_ids) is applied only after LIMIT :limit * 20. That means the global DINO top-N can exclude every referenced document and return empty/incomplete results even when CLIP found good matches.

Possible fix
         WITH hnsw_candidates AS (
             SELECT
                 id,
                 document_id,
                 chunk_text,
                 chunk_index,
                 (embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) AS distance
             FROM
                 {KnowledgeBaseEmbeddings.__tablename__}
+            WHERE
+                document_id IN (
+                    SELECT d.id
+                    FROM {KnowledgeBaseDocuments.__tablename__} d
+                    WHERE d.knowledge_base_id = :kb_id
+                    {('AND d.id = ANY(:reference_ids)' if processed_reference_ids else '')}
+                    {'AND (' + metadata_filter_clause + ')' if metadata_filter_clause else ''}
+                )
             ORDER BY
                 (embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024)
             LIMIT :limit * 20
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py`
around lines 270 - 299, The hnsw_candidates CTE currently applies the d.id =
ANY(:reference_ids) filter only after LIMIT, causing referenced documents to be
excluded before reranking; modify the query in generate_query.py so the
reference_ids (when processed_reference_ids is truthy) and the
kb_id/metadata_filter_clause are applied inside the hnsw_candidates CTE (either
by joining KnowledgeBaseDocuments within the CTE or by filtering
KnowledgeBaseEmbeddings.document_id against :reference_ids and :kb_id) so the
LIMIT :limit * 20 is computed after narrowing candidates; update the CTE
selection logic that uses KnowledgeBaseEmbeddings, hnsw_candidates,
:reference_ids, :kb_id, and metadata_filter_clause accordingly.

"""

Expand Down Expand Up @@ -346,4 +367,4 @@ def get_update_tokens_query() -> str:
Returns:
SQL query string
"""
return "UPDATE knowledge_base_embeddings SET token = to_tsvector('english', chunk_text)"
return "UPDATE knowledge_base_embeddings SET token = to_tsvector('english', chunk_text) WHERE token IS NULL"
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def __init__(
KnowledgeBaseEmbeddings
],
):
self.reranked_image = []
self.query_generator = QueryGenerator()
self.knowledge_base_embeddings_repository = knowledge_base_embeddings_repository

Expand Down Expand Up @@ -42,22 +41,24 @@ async def retrieve_images(
response = await client.post(internal_api_url, json=data)
embedding = response.json().get('data', {}).get('response', [])

if embedding:
self.reranked_image = await self.image_retrieve(
embedding[0]['clip'], kb_id, threshold, top_k, query_filter
clip_embedding = next((e['clip'] for e in embedding if 'clip' in e), None)
dino_embedding = next((e['dino'] for e in embedding if 'dino' in e), None)

if clip_embedding and dino_embedding:
clip_results = await self.image_retrieve(
clip_embedding, kb_id, threshold, top_k, query_filter
)
reference_id_list = [
str(data['document_id']) for data in self.reranked_image
]
self.reranked_image = await self.image_retrieve_dino(
embedding[1]['dino'],
if not clip_results:
return []
reference_id_list = [str(data['document_id']) for data in clip_results]
return await self.image_retrieve_dino(
dino_embedding,
kb_id,
reference_id_list,
query_filter,
offset,
limit,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return self.reranked_image
else:
return []

Expand Down
Loading