diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index da320e6a4..56ebd0f8e 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -11,7 +11,7 @@ from openviking.retrieve.memory_lifecycle import hotness_score from openviking.server.identity import RequestContext -from openviking.storage.expr import Eq +from openviking.storage.expr import And, Eq from openviking_cli.utils import get_logger logger = get_logger(__name__) @@ -33,6 +33,21 @@ HOT_THRESHOLD = 0.6 +def _category_from_uri(uri: str) -> Optional[str]: + """Determine memory category from its Viking URI. + + Handles both directory-based memories (``/preferences/mem_*.md``) + and the single-file ``profile.md`` at the memories root. + """ + for cat in MEMORY_CATEGORIES: + if cat == "profile": + if uri.endswith("/profile.md"): + return cat + elif f"/{cat}/" in uri: + return cat + return None + + class StatsAggregator: """Aggregates memory health statistics from VikingDB. @@ -63,7 +78,7 @@ async def get_memory_stats( # Build category list to query categories = [category] if category else MEMORY_CATEGORIES - by_category: Dict[str, int] = {} + by_category: Dict[str, int] = {cat: 0 for cat in categories} hotness_dist = {"cold": 0, "warm": 0, "hot": 0} staleness = { "not_accessed_7d": 0, @@ -71,49 +86,45 @@ async def get_memory_stats( "oldest_memory_age_days": 0, } - # Fetch all memories once and group by category in Python all_records = await self._query_all_memories(ctx) - grouped: Dict[str, List[Dict[str, Any]]] = {cat: [] for cat in categories} for record in all_records: uri = record.get("uri", "") - for cat in categories: - if f"/{cat}/" in uri: - grouped[cat].append(record) - break - - for cat in categories: - records = grouped[cat] - by_category[cat] = len(records) - - for record in records: - active_count = record.get("active_count", 0) - updated_at_raw = record.get("updated_at") - updated_at = _parse_datetime(updated_at_raw) - created_at_raw = record.get("created_at") - created_at = _parse_datetime(created_at_raw) - - # Hotness distribution - score = hotness_score(active_count, updated_at, now=now) - if score < COLD_THRESHOLD: - hotness_dist["cold"] += 1 - elif score > HOT_THRESHOLD: - hotness_dist["hot"] += 1 - else: - hotness_dist["warm"] += 1 - - # Staleness: use updated_at for access tracking - if updated_at: - age_days = (now - updated_at).total_seconds() / 86400.0 - if age_days > 7: - staleness["not_accessed_7d"] += 1 - if age_days > 30: - staleness["not_accessed_30d"] += 1 - - # Track oldest memory by created_at - if created_at: - age = (now - created_at).total_seconds() / 86400.0 - if age > staleness["oldest_memory_age_days"]: - staleness["oldest_memory_age_days"] = round(age, 1) + record_cat = _category_from_uri(uri) + + # Skip records not in the requested categories + if not (record_cat and record_cat in by_category): + continue + + by_category[record_cat] += 1 + + active_count = record.get("active_count", 0) + updated_at_raw = record.get("updated_at") + updated_at = _parse_datetime(updated_at_raw) + created_at_raw = record.get("created_at") + created_at = _parse_datetime(created_at_raw) + + # Hotness distribution + score = hotness_score(active_count, updated_at, now=now) + if score < COLD_THRESHOLD: + hotness_dist["cold"] += 1 + elif score > HOT_THRESHOLD: + hotness_dist["hot"] += 1 + else: + hotness_dist["warm"] += 1 + + # Staleness: use updated_at for access tracking + if updated_at: + age_days = (now - updated_at).total_seconds() / 86400.0 + if age_days > 7: + staleness["not_accessed_7d"] += 1 + if age_days > 30: + staleness["not_accessed_30d"] += 1 + + # Track oldest memory by created_at + if created_at: + age = (now - created_at).total_seconds() / 86400.0 + if age > staleness["oldest_memory_age_days"]: + staleness["oldest_memory_age_days"] = round(age, 1) total_memories = sum(by_category.values()) @@ -156,14 +167,14 @@ async def _query_all_memories( self, ctx: RequestContext, ) -> List[Dict[str, Any]]: - """Query all memory records in a single DB round-trip. + """Query all memory records from the vector index. - Uses the context_type="memory" filter. Callers group by category - in Python to avoid N+1 queries. + Filters to ``level=2`` so only actual memory records are returned, + excluding directory-level abstract/overview metadata. """ try: return await self._vikingdb.query( - filter=Eq("context_type", "memory"), + filter=And([Eq("context_type", "memory"), Eq("level", 2)]), limit=10000, output_fields=[ "uri", diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index 5f0e26473..94d7e3e7b 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -19,7 +19,9 @@ def mock_vikingdb(): @pytest.fixture def mock_ctx(): """Create a mock request context.""" - return MagicMock() + ctx = MagicMock() + ctx.user.user_id = "default" + return ctx @pytest.fixture @@ -33,10 +35,14 @@ def _make_memory_record( updated_at: datetime = None, created_at: datetime = None, ): - """Helper to build a mock memory record.""" + """Helper to build a mock memory record with a realistic URI.""" now = datetime.now(timezone.utc) + if category == "profile": + uri = "viking://user/default/memories/profile.md" + else: + uri = f"viking://user/default/memories/{category}/test-item" return { - "uri": f"viking://memories/{category}/test-item", + "uri": uri, "context_type": "memory", "active_count": active_count, "updated_at": (updated_at or now).isoformat(), @@ -58,13 +64,14 @@ async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): @pytest.mark.asyncio async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): - """Records should be bucketed into the correct category.""" + """Records should be bucketed into the correct category from their URI.""" now = datetime.now(timezone.utc) records = [ _make_memory_record("cases", active_count=5, updated_at=now), _make_memory_record("cases", active_count=3, updated_at=now), _make_memory_record("tools", active_count=1, updated_at=now), ] + mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx) @@ -75,28 +82,63 @@ async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): @pytest.mark.asyncio async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): - """Passing a category filter should only query that category.""" + """Passing a category filter should only count that category.""" now = datetime.now(timezone.utc) records = [ _make_memory_record("patterns", active_count=2, updated_at=now), ] + mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx, category="patterns") assert "patterns" in result["by_category"] assert len(result["by_category"]) == 1 + assert result["total_memories"] == 1 + + @pytest.mark.asyncio + async def test_profile_counted(self, aggregator, mock_vikingdb, mock_ctx): + """profile.md should be counted as 1 when present in query results.""" + records = [ + _make_memory_record("profile", active_count=0), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["by_category"]["profile"] == 1 + assert result["total_memories"] == 1 + + @pytest.mark.asyncio + async def test_unrecognized_uri_ignored(self, aggregator, mock_vikingdb, mock_ctx): + """Records with unrecognized URIs should not be counted in any category.""" + now = datetime.now(timezone.utc) + records = [ + { + "uri": "viking://some/random/path", + "context_type": "memory", + "active_count": 1, + "updated_at": now.isoformat(), + "created_at": now.isoformat(), + } + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["total_memories"] == 0 + for cat in result["by_category"]: + assert result["by_category"][cat] == 0 @pytest.mark.asyncio async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): """Records should be classified into cold/warm/hot based on score.""" now = datetime.now(timezone.utc) - # Recent + high access -> hot hot_record = _make_memory_record("cases", active_count=50, updated_at=now) - # Old + no access -> cold cold_record = _make_memory_record( "cases", active_count=0, updated_at=now - timedelta(days=60) ) + mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record]) result = await aggregator.get_memory_stats(mock_ctx, category="cases") @@ -115,6 +157,7 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): updated_at=now - timedelta(days=40), created_at=now - timedelta(days=50), ) + mock_vikingdb.query = AsyncMock(return_value=[old_record]) result = await aggregator.get_memory_stats(mock_ctx, category="events") @@ -124,14 +167,43 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): assert result["staleness"]["oldest_memory_age_days"] >= 49 @pytest.mark.asyncio - async def test_query_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx): - """If VikingDB query fails, the category should show 0 records.""" - mock_vikingdb.query = AsyncMock(side_effect=Exception("connection error")) + async def test_category_filter_excludes_other_records_from_metrics( + self, aggregator, mock_vikingdb, mock_ctx + ): + """When a category filter is applied, hotness/staleness should only + count records that match the filter, even if the query returns + records from other categories. + """ + now = datetime.now(timezone.utc) + records = [ + _make_memory_record("cases", active_count=50, updated_at=now), + _make_memory_record( + "tools", active_count=0, updated_at=now - timedelta(days=60) + ), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx, category="cases") + + assert result["by_category"]["cases"] == 1 + assert result["total_memories"] == 1 + # Only the "cases" record should contribute to hotness + assert result["hotness_distribution"]["hot"] == 1 + assert result["hotness_distribution"]["cold"] == 0 + # Only the "cases" record should contribute to staleness + assert result["staleness"]["not_accessed_7d"] == 0 + assert result["staleness"]["not_accessed_30d"] == 0 + + @pytest.mark.asyncio + async def test_query_error_returns_zeros(self, aggregator, mock_vikingdb, mock_ctx): + """If the vector query fails, stats should gracefully return zeros.""" + mock_vikingdb.query = AsyncMock(side_effect=Exception("db down")) result = await aggregator.get_memory_stats(mock_ctx, category="cases") assert result["by_category"]["cases"] == 0 assert result["total_memories"] == 0 + assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0} class TestParseDatetime: