Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Fix per-bank vector indexes to match configured extension

Revision ID: a4b5c6d7e8f9
Revises: c2d3e4f5g6h7, c5d6e7f8a9b0
Revises: d6e7f8a9b0c1
Create Date: 2026-04-01

Migration d5e6f7a8b9c0 hardcoded HNSW when creating per-bank partial vector
Expand All @@ -21,10 +21,7 @@
from sqlalchemy import text

revision: str = "a4b5c6d7e8f9"
# Updated: the merge migration d6e7f8a9b0c1 was renamed to d6e7f8a9b0c2
# to avoid colliding with the case_insensitive_entities_trgm_index migration
# that shares the same revision ID.
down_revision: str | Sequence[str] | None = "d6e7f8a9b0c2"
down_revision: str | Sequence[str] | None = "d6e7f8a9b0c1"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

Expand Down
110 changes: 73 additions & 37 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3625,7 +3625,10 @@ async def delete_document(
}

if invalidated_obs > 0:
await self.submit_async_consolidation(bank_id=bank_id, request_context=request_context)
try:
await self.submit_async_consolidation(bank_id=bank_id, request_context=request_context)
except Exception as e:
logger.warning(f"Failed to submit consolidation after document deletion for bank {bank_id}: {e}")

return result

Expand Down Expand Up @@ -3759,7 +3762,10 @@ async def update_document(
)

if invalidated_obs > 0:
await self.submit_async_consolidation(bank_id=bank_id, request_context=request_context)
try:
await self.submit_async_consolidation(bank_id=bank_id, request_context=request_context)
except Exception as e:
logger.warning(f"Failed to submit consolidation after document update for bank {bank_id}: {e}")

return True

Expand Down Expand Up @@ -3821,7 +3827,14 @@ async def delete_memory_unit(
}

if bank_id_for_consolidation:
await self.submit_async_consolidation(bank_id=bank_id_for_consolidation, request_context=request_context)
try:
await self.submit_async_consolidation(
bank_id=bank_id_for_consolidation, request_context=request_context
)
except Exception as e:
logger.warning(
f"Failed to submit consolidation after memory deletion for bank {bank_id_for_consolidation}: {e}"
)

return result

Expand All @@ -3830,6 +3843,7 @@ async def delete_bank(
bank_id: str,
fact_type: str | None = None,
*,
delete_bank_profile: bool = True,
request_context: "RequestContext",
) -> dict[str, int]:
"""
Expand Down Expand Up @@ -3916,20 +3930,21 @@ async def delete_bank(
# Delete entities (cascades to unit_entities, entity_cooccurrences, memory_links with entity_id)
await conn.execute(f"DELETE FROM {fq_table('entities')} WHERE bank_id = $1", bank_id)

# Delete the bank profile and retrieve internal_id for HNSW index cleanup
internal_id = await conn.fetchval(
f"DELETE FROM {fq_table('banks')} WHERE bank_id = $1 RETURNING internal_id", bank_id
)
if internal_id:
bank_internal_id = str(internal_id)

result = {
"memory_units_deleted": units_count,
"entities_deleted": entities_count,
"documents_deleted": documents_count,
"bank_deleted": True,
}

if delete_bank_profile:
# Delete the bank profile and retrieve internal_id for HNSW index cleanup
internal_id = await conn.fetchval(
f"DELETE FROM {fq_table('banks')} WHERE bank_id = $1 RETURNING internal_id", bank_id
)
if internal_id:
bank_internal_id = str(internal_id)
result["bank_deleted"] = True

except Exception as e:
raise Exception(f"Failed to delete agent data: {str(e)}")

Expand All @@ -3940,7 +3955,10 @@ async def delete_bank(
await bank_utils.drop_bank_vector_indexes(conn, bank_internal_id)

if invalidated_obs > 0:
await self.submit_async_consolidation(bank_id=bank_id, request_context=request_context)
try:
await self.submit_async_consolidation(bank_id=bank_id, request_context=request_context)
except Exception as e:
logger.warning(f"Failed to submit consolidation after bank deletion for bank {bank_id}: {e}")

return result

Expand Down Expand Up @@ -4331,7 +4349,10 @@ async def get_graph_data(
]

# Get entity information — only for visible units
if unit_ids:
# Fetch entities for visible units AND their source memories
# (so observations can inherit entities from source memories)
entity_lookup_ids = unit_ids + source_memory_ids
if entity_lookup_ids:
unit_entities = await conn.fetch(
f"""
SELECT ue.unit_id, e.canonical_name
Expand All @@ -4340,7 +4361,7 @@ async def get_graph_data(
WHERE ue.unit_id = ANY($1::uuid[])
ORDER BY ue.unit_id
""",
unit_ids,
entity_lookup_ids,
)
else:
unit_entities = []
Expand Down Expand Up @@ -6340,6 +6361,7 @@ async def list_mental_models(
*,
tags: list[str] | None = None,
tags_match: str = "any",
detail: str = "full",
limit: int = 100,
offset: int = 0,
request_context: "RequestContext",
Expand All @@ -6350,6 +6372,7 @@ async def list_mental_models(
bank_id: Bank identifier
tags: Optional tags to filter by
tags_match: How to match tags - 'any', 'all', or 'exact'
detail: Detail level - 'metadata', 'content', or 'full'
limit: Maximum number of results
offset: Offset for pagination
request_context: Request context for authentication
Expand Down Expand Up @@ -6391,20 +6414,22 @@ async def list_mental_models(
*params,
)

return [self._row_to_mental_model(row) for row in rows]
return [self._row_to_mental_model(row, detail=detail) for row in rows]

async def get_mental_model(
self,
bank_id: str,
mental_model_id: str,
*,
detail: str = "full",
request_context: "RequestContext",
) -> dict[str, Any] | None:
"""Get a single pinned mental model by ID.

Args:
bank_id: Bank identifier
mental_model_id: Pinned mental model UUID
detail: Detail level - 'metadata', 'content', or 'full'
request_context: Request context for authentication

Returns:
Expand Down Expand Up @@ -6438,7 +6463,7 @@ async def get_mental_model(
mental_model_id,
)

result = self._row_to_mental_model(row) if row else None
result = self._row_to_mental_model(row, detail=detail) if row else None

# Post-operation hook (usage recording)
if result and self._operation_validator:
Expand Down Expand Up @@ -6836,34 +6861,45 @@ async def delete_mental_model(

return result == "DELETE 1"

def _row_to_mental_model(self, row) -> dict[str, Any]:
"""Convert a database row to a mental model dict."""
reflect_response = row.get("reflect_response")
# Parse JSON string to dict if needed (asyncpg may return JSONB as string)
if isinstance(reflect_response, str):
try:
reflect_response = json.loads(reflect_response)
except json.JSONDecodeError:
reflect_response = None
trigger = row.get("trigger")
if isinstance(trigger, str):
try:
trigger = json.loads(trigger)
except json.JSONDecodeError:
trigger = None
return {
def _row_to_mental_model(self, row, *, detail: str = "full") -> dict[str, Any]:
"""Convert a database row to a mental model dict.

Args:
row: Database row
detail: Detail level - 'metadata', 'content', or 'full'
"""
result: dict[str, Any] = {
"id": str(row["id"]),
"bank_id": row["bank_id"],
"name": row["name"],
"source_query": row["source_query"],
"content": row["content"],
"tags": row["tags"] or [],
"max_tokens": row.get("max_tokens"),
"trigger": trigger,
"last_refreshed_at": row["last_refreshed_at"].isoformat() if row["last_refreshed_at"] else None,
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
"reflect_response": reflect_response,
}
if detail == "metadata":
return result

trigger = row.get("trigger")
if isinstance(trigger, str):
try:
trigger = json.loads(trigger)
except json.JSONDecodeError:
trigger = None
result["source_query"] = row["source_query"]
result["content"] = row["content"]
result["max_tokens"] = row.get("max_tokens")
result["trigger"] = trigger

if detail == "full":
reflect_response = row.get("reflect_response")
if isinstance(reflect_response, str):
try:
reflect_response = json.loads(reflect_response)
except json.JSONDecodeError:
reflect_response = None
result["reflect_response"] = reflect_response

return result

# =========================================================================
# Directives - Hard rules injected into prompts
Expand Down
12 changes: 4 additions & 8 deletions hindsight-api-slim/hindsight_api/engine/task_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,16 @@ async def _execute_task(self, task_dict: dict[str, Any]):

Args:
task_dict: Task dictionary to execute

Raises:
Exception: Re-raised from executor on failure.
"""
if self._executor is None:
task_type = task_dict.get("type", "unknown")
logger.warning(f"No executor registered, skipping task {task_type}")
return

try:
await self._executor(task_dict)
except Exception as e:
task_type = task_dict.get("type", "unknown")
logger.error(f"Error executing task {task_type}: {e}")
import traceback

traceback.print_exc()
await self._executor(task_dict)


class SyncTaskBackend(TaskBackend):
Expand Down
1 change: 1 addition & 0 deletions hindsight-api-slim/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ dev = [
"pytest-asyncio>=1.3.0",
"pytest-timeout>=2.4.0",
"pytest-xdist>=3.8.0",
"pytest-rerunfailures>=15.0",
"python-dotenv>=1.2.1",
"filelock>=3.20.1", # TOCTOU race condition fix
"ruff>=0.8.0",
Expand Down
1 change: 1 addition & 0 deletions hindsight-api-slim/tests/test_horse_observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def _parse_history(hist: Any) -> list[str]:


@pytest.mark.asyncio
@pytest.mark.flaky(reruns=2, reruns_delay=5)
async def test_horse_farm_observation_history(memory: MemoryEngine, request_context: Any) -> None:
"""Retain a sequence of horse facts and inspect how observations evolve."""
bank_id = f"test-horses-{uuid.uuid4().hex[:8]}"
Expand Down
27 changes: 13 additions & 14 deletions hindsight-api-slim/tests/test_retain.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,12 @@ async def test_event_date_storage(memory, request_context):

assert len(unit_ids) > 0, "Should have created at least one memory unit"

# Recall the fact
# Recall the fact (no fact_type filter — LLM may classify as world or experience)
result = await memory.recall_async(
bank_id=bank_id,
query="When did Alice complete the product launch?",
budget=Budget.LOW,
max_tokens=500,
fact_type=["world"],
request_context=request_context,
)

Expand Down Expand Up @@ -547,13 +546,12 @@ async def test_mentioned_at_from_context_string(memory, request_context):

assert len(unit_ids) > 0, "Should create memory unit"

# Recall and verify mentioned_at is set
# Recall and verify mentioned_at is set (no fact_type filter — LLM may classify as world or experience)
result = await memory.recall_async(
bank_id=bank_id,
query="What does Alice like?",
budget=Budget.LOW,
max_tokens=500,
fact_type=["world"],
request_context=request_context,
)

Expand Down Expand Up @@ -738,13 +736,12 @@ async def test_context_preservation(memory, request_context):

assert len(unit_ids) > 0, "Should create at least one memory unit"

# Recall and verify context is returned
# Recall and verify context is returned (no fact_type filter — LLM may classify as world or experience)
result = await memory.recall_async(
bank_id=bank_id,
query="What did the team decide?",
budget=Budget.LOW,
max_tokens=500,
fact_type=["world"],
request_context=request_context,
)

Expand Down Expand Up @@ -1106,13 +1103,12 @@ async def test_document_upsert_behavior(memory, request_context):

assert len(v2_units) > 0, "Should create units for v2"

# Recall should return the updated information
# Recall should return the updated information (no fact_type filter — LLM may classify as world or experience)
result = await memory.recall_async(
bank_id=bank_id,
query="What is the project status?",
budget=Budget.MID,
max_tokens=1000,
fact_type=["world"],
request_context=request_context,
)

Expand Down Expand Up @@ -2060,20 +2056,23 @@ async def test_semantic_links_phase1_ann_cross_batch(memory, request_context):
bank_id = f"test_semantic_phase1_{datetime.now(timezone.utc).timestamp()}"

try:
# First batch: store some facts about Python
# First batch: store some world facts about a topic
# Use clearly "world" content (general knowledge, not personal experience)
# to ensure consistent fact_type classification across batches,
# since ANN search filters by fact_type.
await memory.retain_async(
bank_id=bank_id,
content="Alice is an expert Python developer who builds web applications using FastAPI.",
context="team skills",
content="Python is a high-level programming language widely used for web development with frameworks like FastAPI.",
context="programming languages",
request_context=request_context,
)

# Second batch: store similar facts — Phase 1 ANN should find the first batch's
# Second batch: store similar world facts — Phase 1 ANN should find the first batch's
# facts via HNSW index and create cross-batch semantic links
unit_ids_2 = await memory.retain_async(
bank_id=bank_id,
content="Bob specializes in Python programming and creates REST APIs with FastAPI.",
context="team skills",
content="FastAPI is a modern Python web framework known for its high performance and automatic API documentation.",
context="programming languages",
request_context=request_context,
)

Expand Down
Loading
Loading