Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 57 additions & 46 deletions openviking/storage/stats_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from openviking.retrieve.memory_lifecycle import hotness_score
from openviking.server.identity import RequestContext
from openviking.storage.expr import Eq
from openviking.storage.expr import And, Eq
from openviking_cli.utils import get_logger

logger = get_logger(__name__)
Expand All @@ -33,6 +33,21 @@
HOT_THRESHOLD = 0.6


def _category_from_uri(uri: str) -> Optional[str]:
    """Map a memory's Viking URI to its category name, or ``None``.

    The ``profile`` category is a single ``profile.md`` file at the memories
    root; every other category owns a directory whose name appears as a path
    segment (``/<category>/``) in the URI.
    """
    for candidate in MEMORY_CATEGORIES:
        matched = (
            uri.endswith("/profile.md")
            if candidate == "profile"
            else f"/{candidate}/" in uri
        )
        if matched:
            return candidate
    return None


class StatsAggregator:
"""Aggregates memory health statistics from VikingDB.

Expand Down Expand Up @@ -63,57 +78,53 @@ async def get_memory_stats(
# Build category list to query
categories = [category] if category else MEMORY_CATEGORIES

by_category: Dict[str, int] = {}
by_category: Dict[str, int] = {cat: 0 for cat in categories}
hotness_dist = {"cold": 0, "warm": 0, "hot": 0}
staleness = {
"not_accessed_7d": 0,
"not_accessed_30d": 0,
"oldest_memory_age_days": 0,
}

# Fetch all memories once and group by category in Python
all_records = await self._query_all_memories(ctx)
grouped: Dict[str, List[Dict[str, Any]]] = {cat: [] for cat in categories}
for record in all_records:
uri = record.get("uri", "")
for cat in categories:
if f"/{cat}/" in uri:
grouped[cat].append(record)
break

for cat in categories:
records = grouped[cat]
by_category[cat] = len(records)

for record in records:
active_count = record.get("active_count", 0)
updated_at_raw = record.get("updated_at")
updated_at = _parse_datetime(updated_at_raw)
created_at_raw = record.get("created_at")
created_at = _parse_datetime(created_at_raw)

# Hotness distribution
score = hotness_score(active_count, updated_at, now=now)
if score < COLD_THRESHOLD:
hotness_dist["cold"] += 1
elif score > HOT_THRESHOLD:
hotness_dist["hot"] += 1
else:
hotness_dist["warm"] += 1

# Staleness: use updated_at for access tracking
if updated_at:
age_days = (now - updated_at).total_seconds() / 86400.0
if age_days > 7:
staleness["not_accessed_7d"] += 1
if age_days > 30:
staleness["not_accessed_30d"] += 1

# Track oldest memory by created_at
if created_at:
age = (now - created_at).total_seconds() / 86400.0
if age > staleness["oldest_memory_age_days"]:
staleness["oldest_memory_age_days"] = round(age, 1)
record_cat = _category_from_uri(uri)

# Skip records not in the requested categories
if not (record_cat and record_cat in by_category):
continue

by_category[record_cat] += 1

active_count = record.get("active_count", 0)
updated_at_raw = record.get("updated_at")
updated_at = _parse_datetime(updated_at_raw)
created_at_raw = record.get("created_at")
created_at = _parse_datetime(created_at_raw)

# Hotness distribution
score = hotness_score(active_count, updated_at, now=now)
if score < COLD_THRESHOLD:
hotness_dist["cold"] += 1
elif score > HOT_THRESHOLD:
hotness_dist["hot"] += 1
else:
hotness_dist["warm"] += 1

# Staleness: use updated_at for access tracking
if updated_at:
age_days = (now - updated_at).total_seconds() / 86400.0
if age_days > 7:
staleness["not_accessed_7d"] += 1
if age_days > 30:
staleness["not_accessed_30d"] += 1

# Track oldest memory by created_at
if created_at:
age = (now - created_at).total_seconds() / 86400.0
if age > staleness["oldest_memory_age_days"]:
staleness["oldest_memory_age_days"] = round(age, 1)

total_memories = sum(by_category.values())

Expand Down Expand Up @@ -156,14 +167,14 @@ async def _query_all_memories(
self,
ctx: RequestContext,
) -> List[Dict[str, Any]]:
"""Query all memory records in a single DB round-trip.
"""Query all memory records from the vector index.

Uses the context_type="memory" filter. Callers group by category
in Python to avoid N+1 queries.
Filters to ``level=2`` so only actual memory records are returned,
excluding directory-level abstract/overview metadata.
"""
try:
return await self._vikingdb.query(
filter=Eq("context_type", "memory"),
filter=And([Eq("context_type", "memory"), Eq("level", 2)]),
limit=10000,
output_fields=[
"uri",
Expand Down
92 changes: 82 additions & 10 deletions tests/unit/stats/test_stats_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ def mock_vikingdb():
@pytest.fixture
def mock_ctx():
    """Create a mock request context carrying a default user identity.

    Diff residue had left the deleted ``return MagicMock()`` line ahead of
    the new body, making the identity setup unreachable; only the updated
    fixture body is kept.
    """
    ctx = MagicMock()
    # The aggregator builds user-scoped URIs from this id; tests' helper
    # records use the matching "default" user segment.
    ctx.user.user_id = "default"
    return ctx


@pytest.fixture
Expand All @@ -33,10 +35,14 @@ def _make_memory_record(
updated_at: datetime = None,
created_at: datetime = None,
):
"""Helper to build a mock memory record."""
"""Helper to build a mock memory record with a realistic URI."""
now = datetime.now(timezone.utc)
if category == "profile":
uri = "viking://user/default/memories/profile.md"
else:
uri = f"viking://user/default/memories/{category}/test-item"
return {
"uri": f"viking://memories/{category}/test-item",
"uri": uri,
"context_type": "memory",
"active_count": active_count,
"updated_at": (updated_at or now).isoformat(),
Expand All @@ -58,13 +64,14 @@ async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx):

@pytest.mark.asyncio
async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx):
"""Records should be bucketed into the correct category."""
"""Records should be bucketed into the correct category from their URI."""
now = datetime.now(timezone.utc)
records = [
_make_memory_record("cases", active_count=5, updated_at=now),
_make_memory_record("cases", active_count=3, updated_at=now),
_make_memory_record("tools", active_count=1, updated_at=now),
]

mock_vikingdb.query = AsyncMock(return_value=records)

result = await aggregator.get_memory_stats(mock_ctx)
Expand All @@ -75,28 +82,63 @@ async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx):

@pytest.mark.asyncio
async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx):
    """Passing a category filter should only count that category."""
    # NOTE: diff residue had stacked the old and new docstrings as two
    # string literals; only the updated one is kept.
    now = datetime.now(timezone.utc)
    records = [
        _make_memory_record("patterns", active_count=2, updated_at=now),
    ]

    mock_vikingdb.query = AsyncMock(return_value=records)

    result = await aggregator.get_memory_stats(mock_ctx, category="patterns")

    assert "patterns" in result["by_category"]
    assert len(result["by_category"]) == 1
    assert result["total_memories"] == 1

@pytest.mark.asyncio
async def test_profile_counted(self, aggregator, mock_vikingdb, mock_ctx):
    """profile.md should be counted as 1 when present in query results."""
    mock_vikingdb.query = AsyncMock(
        return_value=[_make_memory_record("profile", active_count=0)]
    )

    result = await aggregator.get_memory_stats(mock_ctx)

    assert result["total_memories"] == 1
    assert result["by_category"]["profile"] == 1

@pytest.mark.asyncio
async def test_unrecognized_uri_ignored(self, aggregator, mock_vikingdb, mock_ctx):
    """Records with unrecognized URIs should not be counted in any category."""
    now = datetime.now(timezone.utc)
    stray_record = {
        "uri": "viking://some/random/path",
        "context_type": "memory",
        "active_count": 1,
        "updated_at": now.isoformat(),
        "created_at": now.isoformat(),
    }
    mock_vikingdb.query = AsyncMock(return_value=[stray_record])

    result = await aggregator.get_memory_stats(mock_ctx)

    assert result["total_memories"] == 0
    # No bucket may have absorbed the stray record.
    assert all(count == 0 for count in result["by_category"].values())

@pytest.mark.asyncio
async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx):
"""Records should be classified into cold/warm/hot based on score."""
now = datetime.now(timezone.utc)
# Recent + high access -> hot
hot_record = _make_memory_record("cases", active_count=50, updated_at=now)
# Old + no access -> cold
cold_record = _make_memory_record(
"cases", active_count=0, updated_at=now - timedelta(days=60)
)

mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record])

result = await aggregator.get_memory_stats(mock_ctx, category="cases")
Expand All @@ -115,6 +157,7 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx):
updated_at=now - timedelta(days=40),
created_at=now - timedelta(days=50),
)

mock_vikingdb.query = AsyncMock(return_value=[old_record])

result = await aggregator.get_memory_stats(mock_ctx, category="events")
Expand All @@ -124,14 +167,43 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx):
assert result["staleness"]["oldest_memory_age_days"] >= 49

@pytest.mark.asyncio
async def test_query_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx):
"""If VikingDB query fails, the category should show 0 records."""
mock_vikingdb.query = AsyncMock(side_effect=Exception("connection error"))
async def test_category_filter_excludes_other_records_from_metrics(
    self, aggregator, mock_vikingdb, mock_ctx
):
    """A category filter must scope hotness/staleness, not just counts.

    Even if the backing query returns records from other categories, only
    the records matching the requested category may contribute to any of
    the aggregated metrics.
    """
    now = datetime.now(timezone.utc)
    fresh_case = _make_memory_record("cases", active_count=50, updated_at=now)
    stale_tool = _make_memory_record(
        "tools", active_count=0, updated_at=now - timedelta(days=60)
    )
    mock_vikingdb.query = AsyncMock(return_value=[fresh_case, stale_tool])

    result = await aggregator.get_memory_stats(mock_ctx, category="cases")

    assert result["by_category"]["cases"] == 1
    assert result["total_memories"] == 1
    # The stale "tools" record must not leak into the hotness buckets...
    assert result["hotness_distribution"]["hot"] == 1
    assert result["hotness_distribution"]["cold"] == 0
    # ...nor into the staleness counters.
    assert result["staleness"]["not_accessed_7d"] == 0
    assert result["staleness"]["not_accessed_30d"] == 0

@pytest.mark.asyncio
async def test_query_error_returns_zeros(self, aggregator, mock_vikingdb, mock_ctx):
    """If the vector query fails, stats should gracefully return zeros."""
    mock_vikingdb.query = AsyncMock(side_effect=Exception("db down"))

    result = await aggregator.get_memory_stats(mock_ctx, category="cases")

    assert result["total_memories"] == 0
    assert result["by_category"]["cases"] == 0
    assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0}


class TestParseDatetime:
Expand Down