Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 153 additions & 36 deletions hindsight-api-slim/hindsight_api/engine/retain/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ async def _insert_facts_and_links(
if unit_ids:
# Process entities
step_start = time.time()
user_entities_per_content = {
idx: content.entities for idx, content in enumerate(contents) if content.entities
}
user_entities_per_content = {idx: content.entities for idx, content in enumerate(contents) if content.entities}
entity_links = await entity_processing.process_entities_batch(
entity_resolver,
conn,
Expand Down Expand Up @@ -217,9 +215,7 @@ async def _extract_and_embed(
embeddings = await embedding_processing.generate_embeddings_batch(embeddings_model, augmented_texts)
log_buffer.append(f" Generate embeddings: {len(embeddings)} embeddings in {time.time() - step_start:.3f}s")

processed_facts = [
ProcessedFact.from_extracted_fact(ef, emb) for ef, emb in zip(extracted_facts, embeddings)
]
processed_facts = [ProcessedFact.from_extracted_fact(ef, emb) for ef, emb in zip(extracted_facts, embeddings)]

return extracted_facts, processed_facts, chunks, usage

Expand Down Expand Up @@ -268,23 +264,56 @@ async def retain_batch(
# --- Delta retain: check if we can skip unchanged chunks ---
if is_first_batch:
delta_result = await _try_delta_retain(
pool, embeddings_model, llm_config, entity_resolver, format_date_fn,
bank_id, contents_dicts, contents, config, document_id, fact_type_override,
document_tags, agent_name, log_buffer, start_time, operation_id, schema, outbox_callback,
pool,
embeddings_model,
llm_config,
entity_resolver,
format_date_fn,
bank_id,
contents_dicts,
contents,
config,
document_id,
fact_type_override,
document_tags,
agent_name,
log_buffer,
start_time,
operation_id,
schema,
outbox_callback,
)
if delta_result is not None:
return delta_result

# --- Full retain path ---
extracted_facts, processed_facts, chunks, usage = await _extract_and_embed(
contents, llm_config, agent_name, config, embeddings_model, format_date_fn,
fact_type_override, log_buffer, pool, operation_id, schema,
contents,
llm_config,
agent_name,
config,
embeddings_model,
format_date_fn,
fact_type_override,
log_buffer,
pool,
operation_id,
schema,
)

if not extracted_facts:
await _handle_zero_facts_documents(
pool, bank_id, contents_dicts, contents, config, document_id,
is_first_batch, document_tags, chunks, log_buffer, start_time,
pool,
bank_id,
contents_dicts,
contents,
config,
document_id,
is_first_batch,
document_tags,
chunks,
log_buffer,
start_time,
)
return [[] for _ in contents], usage

Expand Down Expand Up @@ -336,8 +365,13 @@ async def _run_db_work() -> None:
contents_dicts, document_tags, doc_contents=doc_contents
)
await fact_storage.handle_document_tracking(
conn, bank_id, actual_doc_id, combined_content,
is_first_batch, retain_params, merged_tags,
conn,
bank_id,
actual_doc_id,
combined_content,
is_first_batch,
retain_params,
merged_tags,
)
document_ids_added.append(actual_doc_id)

Expand Down Expand Up @@ -382,8 +416,15 @@ async def _run_db_work() -> None:

# Insert facts and create all links (shared pipeline)
result_unit_ids = await _insert_facts_and_links(
conn, entity_resolver, bank_id, contents, extracted_facts,
processed_facts, config, log_buffer, outbox_callback,
conn,
entity_resolver,
bank_id,
contents,
extracted_facts,
processed_facts,
config,
log_buffer,
outbox_callback,
)

await entity_resolver.flush_pending_stats()
Expand All @@ -406,9 +447,24 @@ async def _run_db_work() -> None:


async def _try_delta_retain(
pool, embeddings_model, llm_config, entity_resolver, format_date_fn,
bank_id, contents_dicts, contents, config, document_id, fact_type_override,
document_tags, agent_name, log_buffer, start_time, operation_id, schema, outbox_callback,
pool,
embeddings_model,
llm_config,
entity_resolver,
format_date_fn,
bank_id,
contents_dicts,
contents,
config,
document_id,
fact_type_override,
document_tags,
agent_name,
log_buffer,
start_time,
operation_id,
schema,
outbox_callback,
):
"""
Attempt delta retain for a document upsert. Returns result tuple if delta
Expand Down Expand Up @@ -472,23 +528,46 @@ async def _try_delta_retain(
# Nothing changed — just update document metadata/tags
log_buffer.append("[delta] No chunk changes detected — updating document metadata only")
return await _delta_metadata_only(
pool, bank_id, contents_dicts, contents, effective_doc_id,
document_tags, log_buffer, start_time, outbox_callback,
pool,
bank_id,
contents_dicts,
contents,
effective_doc_id,
document_tags,
log_buffer,
start_time,
outbox_callback,
)

# Build content items for only the changed/new chunks
delta_contents, delta_chunk_map = _build_delta_contents(contents, new_chunks_with_contents, chunks_to_process)

if not delta_contents:
return await _delta_metadata_only(
pool, bank_id, contents_dicts, contents, effective_doc_id,
document_tags, log_buffer, start_time, outbox_callback,
pool,
bank_id,
contents_dicts,
contents,
effective_doc_id,
document_tags,
log_buffer,
start_time,
outbox_callback,
)

# Extract facts and generate embeddings (shared pipeline)
extracted_facts, processed_facts, new_chunk_metadata, usage = await _extract_and_embed(
delta_contents, llm_config, agent_name, config, embeddings_model, format_date_fn,
fact_type_override, log_buffer, pool, operation_id, schema,
delta_contents,
llm_config,
agent_name,
config,
embeddings_model,
format_date_fn,
fact_type_override,
log_buffer,
pool,
operation_id,
schema,
)

# Database transaction
Expand All @@ -510,7 +589,12 @@ async def _run_delta_db_work() -> None:
combined_content = "\n".join([c.get("content", "") for c in contents_dicts])
retain_params, merged_tags = _build_retain_params(contents_dicts, document_tags)
await fact_storage.upsert_document_metadata(
conn, bank_id, effective_doc_id, combined_content, retain_params, merged_tags,
conn,
bank_id,
effective_doc_id,
combined_content,
retain_params,
merged_tags,
)
log_buffer.append(f" Document metadata update in {time.time() - step_start:.3f}s")

Expand Down Expand Up @@ -570,8 +654,15 @@ async def _run_delta_db_work() -> None:

# Insert facts and create all links (shared pipeline)
result_unit_ids = await _insert_facts_and_links(
conn, entity_resolver, bank_id, contents, extracted_facts,
processed_facts, config, log_buffer, outbox_callback,
conn,
entity_resolver,
bank_id,
contents,
extracted_facts,
processed_facts,
config,
log_buffer,
outbox_callback,
)

await entity_resolver.flush_pending_stats()
Expand All @@ -591,16 +682,28 @@ async def _run_delta_db_work() -> None:


async def _delta_metadata_only(
pool, bank_id, contents_dicts, contents, document_id, document_tags,
log_buffer, start_time, outbox_callback,
pool,
bank_id,
contents_dicts,
contents,
document_id,
document_tags,
log_buffer,
start_time,
outbox_callback,
):
"""Handle the case where no chunks changed — just update document metadata and tags."""
async with acquire_with_retry(pool) as conn:
async with conn.transaction():
combined_content = "\n".join([c.get("content", "") for c in contents_dicts])
retain_params, merged_tags = _build_retain_params(contents_dicts, document_tags)
await fact_storage.upsert_document_metadata(
conn, bank_id, document_id, combined_content, retain_params, merged_tags,
conn,
bank_id,
document_id,
combined_content,
retain_params,
merged_tags,
)
await fact_storage.update_memory_units_tags(conn, bank_id, document_id, merged_tags)
if outbox_callback:
Expand Down Expand Up @@ -645,8 +748,17 @@ def _build_contents(contents_dicts: list[RetainContentDict], document_tags: list


async def _handle_zero_facts_documents(
pool, bank_id, contents_dicts, contents, config, document_id,
is_first_batch, document_tags, chunks, log_buffer, start_time,
pool,
bank_id,
contents_dicts,
contents,
config,
document_id,
is_first_batch,
document_tags,
chunks,
log_buffer,
start_time,
):
"""Handle document tracking when zero facts were extracted."""
docs_tracked = 0
Expand Down Expand Up @@ -677,8 +789,13 @@ async def _handle_zero_facts_documents(
contents_dicts, document_tags, doc_contents=doc_contents
)
await fact_storage.handle_document_tracking(
conn, bank_id, actual_doc_id, combined_content,
is_first_batch, retain_params, merged_tags,
conn,
bank_id,
actual_doc_id,
combined_content,
is_first_batch,
retain_params,
merged_tags,
)
docs_tracked += 1

Expand Down
32 changes: 25 additions & 7 deletions hindsight-clients/python/hindsight_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
"""
Hindsight Client - Clean, pythonic wrapper for the Hindsight API.

This package provides a high-level interface for common Hindsight operations.
For advanced use cases, use the auto-generated API client directly.
This package provides a high-level ``Hindsight`` class with simplified methods
for the most common operations (retain, recall, reflect, banks, mental models,
directives).

For operations not available as convenience methods — such as documents,
entities, async operations, webhooks, and monitoring — use the low-level API
clients exposed as properties on the ``Hindsight`` instance (e.g.
``client.documents``, ``client.entities``, ``client.operations``).
All low-level methods are async.

Quick start::

Example:
```python
from hindsight_client import Hindsight

client = Hindsight(base_url="http://localhost:8888")

# Store a memory
result = client.retain(bank_id="alice", content="Alice loves AI")
print(result.success)
client.retain(bank_id="alice", content="Alice loves AI")

# Search memories
response = client.recall(bank_id="alice", query="What does Alice like?")
Expand All @@ -22,7 +28,19 @@
# Generate contextual answer
answer = client.reflect(bank_id="alice", query="What are my interests?")
print(answer.text)
```

Low-level API access::

import asyncio

# List documents
docs = asyncio.run(client.documents.list_documents("alice"))

# Check operation status
status = asyncio.run(client.operations.get_operation_status("alice", "op-id"))

# List entities
entities = asyncio.run(client.entities.list_entities("alice"))
"""

from hindsight_client_api.models.bank_profile_response import BankProfileResponse
Expand Down
Loading
Loading