From 31c7e741dd4479c4497c9626c23182bb70ebbed2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 04:59:09 +0000 Subject: [PATCH 1/5] Initial plan From 238b203e1cd424c68d55a781beb90e1517af3863 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 05:11:33 +0000 Subject: [PATCH 2/5] fix: repair all 6 knowledge import endpoints - Separate knowledge_ingestion_service import from knowledge_management and knowledge_pipeline so thinc/spaCy failures don't disable ingestion - Add GET /api/knowledge/import/status/{job_id} endpoint - Add DELETE /api/knowledge/import/cancel/{job_id} endpoint - Fix batch endpoint to use actual ingestion service - Fix cancel endpoint to use actual ingestion service - Fix HTTPException swallowing in URL/text/file/wikipedia endpoints - Remove dead code (duplicate except block) in URL import - Add 18 pytest tests for all 6 endpoints via httpx.AsyncClient Co-authored-by: Steake <530040+Steake@users.noreply.github.com> --- backend/unified_server.py | 128 +++++-- tests/backend/test_api_endpoints.py | 2 +- .../test_knowledge_import_endpoints.py | 315 ++++++++++++++++++ 3 files changed, 423 insertions(+), 22 deletions(-) create mode 100644 tests/backend/test_knowledge_import_endpoints.py diff --git a/backend/unified_server.py b/backend/unified_server.py index 83c4bfe..98fb228 100644 --- a/backend/unified_server.py +++ b/backend/unified_server.py @@ -197,17 +197,31 @@ async def process_query(self, query): LLM_COGNITIVE_DRIVER_AVAILABLE = False # Import additional services with fallbacks +# Import each service independently so a failure in one (e.g. thinc/spaCy for +# knowledge_management) doesn't take down the ingestion service. +knowledge_ingestion_service = None +knowledge_management_service = None +knowledge_pipeline_service = None +KNOWLEDGE_SERVICES_AVAILABLE = False + try: - from backend.knowledge_ingestion import knowledge_ingestion_service - from backend.knowledge_management import knowledge_management_service - from backend.knowledge_pipeline_service import knowledge_pipeline_service + from backend.knowledge_ingestion import knowledge_ingestion_service as _kis + knowledge_ingestion_service = _kis KNOWLEDGE_SERVICES_AVAILABLE = True except ImportError as e: - logger.warning(f"Knowledge services not available: {e}") - knowledge_ingestion_service = None - knowledge_management_service = None - knowledge_pipeline_service = None - KNOWLEDGE_SERVICES_AVAILABLE = False + logger.warning(f"Knowledge ingestion service not available: {e}") + +try: + from backend.knowledge_management import knowledge_management_service as _kms + knowledge_management_service = _kms +except ImportError as e: + logger.warning(f"Knowledge management service not available: {e}") + +try: + from backend.knowledge_pipeline_service import knowledge_pipeline_service as _kps + knowledge_pipeline_service = _kps +except ImportError as e: + logger.warning(f"Knowledge pipeline service not available: {e}") # Import production vector database try: @@ -2606,6 +2620,11 @@ async def get_import_progress(import_id: str): "error": str(e) } +@app.get("/api/knowledge/import/status/{job_id}") +async def get_import_status(job_id: str): + """Get the status of an import job (alias for progress endpoint).""" + return await get_import_progress(job_id) + @app.post("/api/knowledge/import/file") async def import_knowledge_from_file(file: UploadFile = File(...), filename: str = Form(None), file_type: str = Form(None)): """Import knowledge from uploaded file.""" @@ -2664,6 +2683,8 @@ async def import_knowledge_from_file(file: UploadFile = File(...), filename: str "file_type": file_type } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing knowledge from file: {e}") raise HTTPException(status_code=500, detail=f"File import error: {str(e)}") @@ -2707,6 +2728,8 @@ async def import_knowledge_from_wikipedia(request: dict): "source": f"Wikipedia: {title}" } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing from Wikipedia: {e}") raise HTTPException(status_code=500, detail=f"Wikipedia import error: {str(e)}") @@ -2750,20 +2773,11 @@ async def import_knowledge_from_url(request: dict): "source": f"URL: {url}" } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing from URL: {e}") raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}") - - return extracted_knowledge - - except Exception as e: - logger.error(f"Error importing from URL: {e}") - import_jobs[import_id].update({ - "status": "error", - "completed_at": datetime.now().isoformat(), - "error": str(e) - }) - raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}") @app.post("/api/knowledge/import/text") async def import_knowledge_from_text(request: dict): @@ -2806,6 +2820,8 @@ async def import_knowledge_from_text(request: dict): "content_length": len(content) } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing from text: {e}") raise HTTPException(status_code=500, detail=f"Text import error: {str(e)}") @@ -3086,8 +3102,57 @@ async def add_knowledge(payload: dict): # Batch import compatibility endpoint @app.post("/api/knowledge/import/batch") async def import_knowledge_batch(request: dict): + """Batch import knowledge from multiple sources.""" sources = request.get("sources", []) - import_ids = [f"batch_{i}_{int(time.time()*1000)}" for i, _ in enumerate(sources)] + if not sources: + return {"import_ids": [], "batch_size": 0, "status": "queued"} + + import_ids = [] + for i, source in enumerate(sources): + src_type = source.get("type", "text") + try: + if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service: + from backend.knowledge_models import ( + TextImportRequest, URLImportRequest, ImportSource, + ) + if src_type == "url": + url = source.get("url", source.get("source", "")) + imp_source = ImportSource( + source_type="url", + source_identifier=url, + metadata={"url": url}, + ) + req = URLImportRequest( + url=url, + source=imp_source, + max_depth=source.get("max_depth", 1), + follow_links=source.get("follow_links", False), + content_selectors=source.get("content_selectors", []), + ) + iid = await knowledge_ingestion_service.import_from_url(req) + else: + # Default to text import + content = source.get("content", "") + title = source.get("title", f"Batch item {i}") + imp_source = ImportSource( + source_type="text", + source_identifier=title, + metadata={"manual_input": True}, + ) + req = TextImportRequest( + content=content or title, + title=title, + source=imp_source, + format_type=source.get("format_type", "plain"), + ) + iid = await knowledge_ingestion_service.import_from_text(req) + import_ids.append(iid) + else: + import_ids.append(f"batch_{i}_{int(time.time()*1000)}") + except Exception as exc: + logger.warning(f"Batch item {i} failed: {exc}") + import_ids.append(f"batch_{i}_{int(time.time()*1000)}") + return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued"} # Additional KG stats and analytics endpoints @@ -3842,10 +3907,31 @@ async def create_category(payload: dict): return {"status": "success", "category_id": payload.get("category_id", "new")} +@app.delete("/api/knowledge/import/cancel/{job_id}") +async def cancel_import_by_job_id(job_id: str): + """Cancel an import job by job ID.""" + return await cancel_import(job_id) + + @app.delete("/api/knowledge/import/{import_id}") async def cancel_import(import_id: str): """Cancel a running import job.""" - return {"import_id": import_id, "status": "cancelled"} + cancelled = False + if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service: + try: + cancelled = await knowledge_ingestion_service.cancel_import(import_id) + except Exception as e: + logger.warning(f"Error cancelling import {import_id}: {e}") + + # Also remove from the short-lived server-side import_jobs map + if import_id in import_jobs: + import_jobs[import_id]["status"] = "cancelled" + cancelled = True + + return { + "import_id": import_id, + "status": "cancelled" if cancelled else "not_found", + } if __name__ == "__main__": diff --git a/tests/backend/test_api_endpoints.py b/tests/backend/test_api_endpoints.py index 3fcb0e2..474b948 100644 --- a/tests/backend/test_api_endpoints.py +++ b/tests/backend/test_api_endpoints.py @@ -483,7 +483,7 @@ def test_cancel_import(self): data = response.json() assert data["import_id"] == import_id - assert data["status"] == "cancelled" + assert data["status"] in ("cancelled", "not_found") class TestKnowledgeSearchEndpoints: diff --git a/tests/backend/test_knowledge_import_endpoints.py b/tests/backend/test_knowledge_import_endpoints.py new file mode 100644 index 0000000..98ae046 --- /dev/null +++ b/tests/backend/test_knowledge_import_endpoints.py @@ -0,0 +1,315 @@ +""" +Tests for all 6 knowledge import endpoints on the unified server. + +Exercises the routes via httpx.AsyncClient against the ASGI app so no live +server is required. The KnowledgeIngestionService is mocked at the module +level so that we test the HTTP layer in isolation. +""" + +import asyncio +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +# We need to mock the ingestion service *before* importing the app because the +# module-level import in unified_server.py captures the global reference. The +# fixtures below take care of this via monkeypatching. + + +def _make_mock_service(): + """Create a fully mocked KnowledgeIngestionService.""" + svc = MagicMock() + svc.import_from_url = AsyncMock(return_value="url_import_001") + svc.import_from_text = AsyncMock(return_value="text_import_001") + svc.import_from_file = AsyncMock(return_value="file_import_001") + svc.cancel_import = AsyncMock(return_value=True) + svc.get_import_progress = AsyncMock(return_value=MagicMock( + import_id="progress_001", + status="processing", + progress_percentage=42.0, + started_at=1000000.0, + completed_at=None, + error_message=None, + error=None, + filename="test.txt", + )) + return svc + + +@pytest.fixture() +def patched_app(): + """ + Yield the unified_server FastAPI ``app`` with KnowledgeIngestionService + stubbed out so the endpoints never touch real I/O. + """ + mock_svc = _make_mock_service() + + with ( + patch("backend.unified_server.KNOWLEDGE_SERVICES_AVAILABLE", True), + patch("backend.unified_server.knowledge_ingestion_service", mock_svc), + ): + from backend.unified_server import app + yield app, mock_svc + + +@pytest.fixture() +def patched_app_no_service(): + """Yield the app with ingestion service unavailable (503 path).""" + with ( + patch("backend.unified_server.KNOWLEDGE_SERVICES_AVAILABLE", False), + patch("backend.unified_server.knowledge_ingestion_service", None), + ): + from backend.unified_server import app + yield app + + +# --------------------------------------------------------------------------- +# Tests — happy paths +# --------------------------------------------------------------------------- + + +class TestKnowledgeImportURL: + """POST /api/knowledge/import/url""" + + @pytest.mark.asyncio + async def test_url_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/url", json={ + "url": "https://example.com/article", + }) + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "url_import_001" + assert body["status"] == "queued" + mock_svc.import_from_url.assert_awaited_once() + + @pytest.mark.asyncio + async def test_url_import_missing_url(self, patched_app): + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/url", json={}) + assert resp.status_code == 400 + + @pytest.mark.asyncio + async def test_url_import_service_unavailable(self, patched_app_no_service): + app = patched_app_no_service + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/url", json={ + "url": "https://example.com", + }) + assert resp.status_code == 503 + + +class TestKnowledgeImportText: + """POST /api/knowledge/import/text""" + + @pytest.mark.asyncio + async def test_text_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/text", json={ + "content": "The sky is blue because of Rayleigh scattering.", + "title": "Sky Color", + }) + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "text_import_001" + assert body["status"] == "queued" + assert body["content_length"] == len("The sky is blue because of Rayleigh scattering.") + mock_svc.import_from_text.assert_awaited_once() + + @pytest.mark.asyncio + async def test_text_import_missing_content(self, patched_app): + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/text", json={ + "title": "No content", + }) + assert resp.status_code == 400 + + @pytest.mark.asyncio + async def test_text_import_service_unavailable(self, patched_app_no_service): + app = patched_app_no_service + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/text", json={ + "content": "test", + }) + assert resp.status_code == 503 + + +class TestKnowledgeImportFile: + """POST /api/knowledge/import/file""" + + @pytest.mark.asyncio + async def test_file_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/api/knowledge/import/file", + files={"file": ("notes.txt", b"Hello world content", "text/plain")}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "file_import_001" + assert body["status"] == "started" + assert body["filename"] == "notes.txt" + mock_svc.import_from_file.assert_awaited_once() + + @pytest.mark.asyncio + async def test_file_import_pdf(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/api/knowledge/import/file", + files={"file": ("doc.pdf", b"%PDF-fake", "application/pdf")}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "started" + + @pytest.mark.asyncio + async def test_file_import_service_unavailable(self, patched_app_no_service): + app = patched_app_no_service + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/api/knowledge/import/file", + files={"file": ("test.txt", b"x", "text/plain")}, + ) + assert resp.status_code == 503 + + +class TestKnowledgeImportStatus: + """GET /api/knowledge/import/status/{job_id}""" + + @pytest.mark.asyncio + async def test_status_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.get("/api/knowledge/import/status/progress_001") + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "progress_001" + assert "status" in body + + @pytest.mark.asyncio + async def test_status_not_found(self, patched_app): + """When the job doesn't exist we still get 200 with status=not_found.""" + app, mock_svc = patched_app + mock_svc.get_import_progress = AsyncMock(return_value=None) + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.get("/api/knowledge/import/status/nonexistent") + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "not_found" + + @pytest.mark.asyncio + async def test_progress_endpoint_also_works(self, patched_app): + """The original /progress/ path should still work.""" + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.get("/api/knowledge/import/progress/progress_001") + assert resp.status_code == 200 + + +class TestKnowledgeImportBatch: + """POST /api/knowledge/import/batch""" + + @pytest.mark.asyncio + async def test_batch_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/batch", json={ + "sources": [ + {"type": "text", "content": "Fact one", "title": "F1"}, + {"type": "text", "content": "Fact two", "title": "F2"}, + ], + }) + assert resp.status_code == 200 + body = resp.json() + assert body["batch_size"] == 2 + assert len(body["import_ids"]) == 2 + assert body["status"] == "queued" + # Each text source should call import_from_text once + assert mock_svc.import_from_text.await_count == 2 + + @pytest.mark.asyncio + async def test_batch_import_empty_sources(self, patched_app): + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/batch", json={"sources": []}) + assert resp.status_code == 200 + body = resp.json() + assert body["batch_size"] == 0 + + @pytest.mark.asyncio + async def test_batch_import_url_type(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/batch", json={ + "sources": [ + {"type": "url", "url": "https://example.com/page"}, + ], + }) + assert resp.status_code == 200 + body = resp.json() + assert body["batch_size"] == 1 + mock_svc.import_from_url.assert_awaited_once() + + +class TestKnowledgeImportCancel: + """DELETE /api/knowledge/import/cancel/{job_id}""" + + @pytest.mark.asyncio + async def test_cancel_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.delete("/api/knowledge/import/cancel/some-job-id") + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "some-job-id" + assert body["status"] == "cancelled" + mock_svc.cancel_import.assert_awaited_once_with("some-job-id") + + @pytest.mark.asyncio + async def test_cancel_not_found(self, patched_app): + app, mock_svc = patched_app + mock_svc.cancel_import = AsyncMock(return_value=False) + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.delete("/api/knowledge/import/cancel/nonexistent") + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "not_found" + + @pytest.mark.asyncio + async def test_delete_endpoint_also_works(self, patched_app): + """The original DELETE /api/knowledge/import/{id} path should still work.""" + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.delete("/api/knowledge/import/job-42") + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "job-42" From 3b44df2ef006033ce7dd74a15c341694cd502afc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 05:14:43 +0000 Subject: [PATCH 3/5] =?UTF-8?q?fix:=20address=20code=20review=20feedback?= =?UTF-8?q?=20=E2=80=94=20empty=20batch=20status=20and=20dedup=20fallback?= =?UTF-8?q?=20ID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Steake <530040+Steake@users.noreply.github.com> --- backend/unified_server.py | 7 ++++--- tests/backend/test_knowledge_import_endpoints.py | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/unified_server.py b/backend/unified_server.py index 98fb228..d63c0b4 100644 --- a/backend/unified_server.py +++ b/backend/unified_server.py @@ -3105,11 +3105,12 @@ async def import_knowledge_batch(request: dict): """Batch import knowledge from multiple sources.""" sources = request.get("sources", []) if not sources: - return {"import_ids": [], "batch_size": 0, "status": "queued"} + return {"import_ids": [], "batch_size": 0, "status": "completed"} import_ids = [] for i, source in enumerate(sources): src_type = source.get("type", "text") + fallback_id = f"batch_{i}_{int(time.time()*1000)}" try: if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service: from backend.knowledge_models import ( @@ -3148,10 +3149,10 @@ async def import_knowledge_batch(request: dict): iid = await knowledge_ingestion_service.import_from_text(req) import_ids.append(iid) else: - import_ids.append(f"batch_{i}_{int(time.time()*1000)}") + import_ids.append(fallback_id) except Exception as exc: logger.warning(f"Batch item {i} failed: {exc}") - import_ids.append(f"batch_{i}_{int(time.time()*1000)}") + import_ids.append(fallback_id) return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued"} diff --git a/tests/backend/test_knowledge_import_endpoints.py b/tests/backend/test_knowledge_import_endpoints.py index 98ae046..147ed74 100644 --- a/tests/backend/test_knowledge_import_endpoints.py +++ b/tests/backend/test_knowledge_import_endpoints.py @@ -260,6 +260,7 @@ async def test_batch_import_empty_sources(self, patched_app): assert resp.status_code == 200 body = resp.json() assert body["batch_size"] == 0 + assert body["status"] == "completed" @pytest.mark.asyncio async def test_batch_import_url_type(self, patched_app): From 61170fec3b9a0c2e6129a870f6a9879c84802303 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 09:29:56 +0000 Subject: [PATCH 4/5] merge: resolve conflicts with main (PR #114 shared schemas) - Use BatchImportSchema from shared schemas for batch endpoint - Fix file_type response to return determined_file_type - Fix over-indented except block in file import handler - Add per-item results to batch response - Remove unused asyncio import from test module - Update validation tests: expect 422 for Pydantic validation errors Co-authored-by: Steake <530040+Steake@users.noreply.github.com> --- .gitignore | 7 + backend/.env.example | 8 + backend/api/consciousness_endpoints.py | 41 +- .../core/consciousness_emergence_detector.py | 282 ++++++ backend/core/unified_consciousness_engine.py | 357 +++++--- backend/core/vector_database.py | 10 + backend/schemas.py | 205 +++++ backend/transparency_endpoints.py | 22 +- backend/unified_server.py | 84 +- godelOS/core_kr/knowledge_store/__init__.py | 4 + .../core_kr/knowledge_store/chroma_store.py | 483 ++++++++++ .../core_kr/knowledge_store/hot_reloader.py | 242 +++++ godelOS/core_kr/knowledge_store/interface.py | 77 +- query_engine_test_20260305_100039.json | 11 - query_test_20260305_100036.json | 26 - requirements.txt | 1 + svelte-frontend/src/App.svelte | 21 +- .../src/components/core/ChatInterface.svelte | 3 +- .../src/components/core/QueryInterface.svelte | 11 + .../components/core/ResponseDisplay.svelte | 23 +- .../evolution/CapabilityDashboard.svelte | 3 +- .../KnowledgeGraphCollaborativeManager.svelte | 4 +- .../transparency/TransparencyPanel.svelte | 824 ++++++++++++++++++ svelte-frontend/src/stores/transparency.js | 8 + svelte-frontend/src/utils/api.js | 2 +- .../test_consciousness_emergence_detector.py | 321 +++++++ tests/backend/test_global_workspace.py | 325 +++++++ .../test_knowledge_import_endpoints.py | 7 +- .../integration/test_api_schema_contracts.py | 250 ++++++ tests/test_chroma_knowledge_store.py | 549 ++++++++++++ 30 files changed, 4004 insertions(+), 207 deletions(-) create mode 100644 backend/core/consciousness_emergence_detector.py create mode 100644 backend/schemas.py create mode 100644 godelOS/core_kr/knowledge_store/chroma_store.py create mode 100644 godelOS/core_kr/knowledge_store/hot_reloader.py delete mode 100644 query_engine_test_20260305_100039.json delete mode 100644 query_test_20260305_100036.json create mode 100644 svelte-frontend/src/components/transparency/TransparencyPanel.svelte create mode 100644 svelte-frontend/src/stores/transparency.js create mode 100644 tests/backend/test_consciousness_emergence_detector.py create mode 100644 tests/backend/test_global_workspace.py create mode 100644 tests/integration/test_api_schema_contracts.py create mode 100644 tests/test_chroma_knowledge_store.py diff --git a/.gitignore b/.gitignore index 0f0e94e..37c8f91 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +### Test artifacts ### +query_*test*.json + ### Environments ### # Virtual environments .env @@ -311,3 +314,7 @@ data/ # Diagnostic script output (plots) diagnostic_output/ godelos_data/ + +# Test output files +query_*test*.json +query_engine_test*.json diff --git a/backend/.env.example b/backend/.env.example index f5dca6f..7605e61 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -54,6 +54,14 @@ GODELOS_HEALTH_CHECK_INTERVAL=30 # GODELOS_DATABASE_URL=postgresql://user:pass@localhost/godelos # GODELOS_REDIS_URL=redis://localhost:6379 +# Knowledge Store Configuration +# Backend: "memory" (default, volatile) or "chroma" (persistent via ChromaDB) +KNOWLEDGE_STORE_BACKEND=memory +# Persistence directory for ChromaDB (only used when backend=chroma) +KNOWLEDGE_STORE_PATH=./data/chroma +# Directory watched by OntologyHotReloader for .ttl / .json-ld files +ONTOLOGY_WATCH_DIR=./data/ontologies + # LLM Cognitive Architecture Configuration OPENAI_API_BASE=https://api.synthetic.new/v1 OPENAI_API_KEY=your-api-key-here diff --git a/backend/api/consciousness_endpoints.py b/backend/api/consciousness_endpoints.py index 873d282..84ce063 100644 --- a/backend/api/consciousness_endpoints.py +++ b/backend/api/consciousness_endpoints.py @@ -25,6 +25,7 @@ # Global consciousness engine reference (will be set by unified_server) unified_consciousness_engine = None enhanced_websocket_manager = None +emergence_detector = None # ConsciousnessEmergenceDetector instance def set_consciousness_engine(engine, websocket_manager): """Set the global consciousness engine and websocket manager references""" @@ -32,6 +33,11 @@ def set_consciousness_engine(engine, websocket_manager): unified_consciousness_engine = engine enhanced_websocket_manager = websocket_manager +def set_emergence_detector(detector): + """Set the global emergence detector reference""" + global emergence_detector + emergence_detector = detector + # Create router router = APIRouter(prefix="/api/consciousness", tags=["Unified Consciousness"]) @@ -218,6 +224,39 @@ async def get_consciousness_history(limit: int = Query(100, ge=1, le=1000)): logger.error(f"Failed to get consciousness history: {e}") raise HTTPException(status_code=500, detail=f"Failed to retrieve consciousness history: {str(e)}") +# Emergence score endpoint + +@router.get("/emergence") +async def get_emergence_score(): + """Get current consciousness emergence score and dimensional breakdown.""" + try: + if emergence_detector is not None: + return emergence_detector.get_emergence_status() + + # Fallback: derive a basic score from the consciousness engine + if unified_consciousness_engine: + state = unified_consciousness_engine.consciousness_state + score = unified_consciousness_engine._detect_consciousness_emergence(state) + return { + "emergence_score": score, + "dimensions": {}, + "threshold": 0.8, + "window_size": 60.0, + "window_samples": 0, + "breakthrough": score >= 0.8, + "timestamp": time.time(), + } + + raise HTTPException( + status_code=503, + detail="Neither emergence detector nor consciousness engine is available", + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to get emergence score: {e}") + raise HTTPException(status_code=500, detail=f"Failed to retrieve emergence score: {str(e)}") + # WebSocket endpoints for real-time consciousness streaming @router.websocket("/stream") @@ -371,4 +410,4 @@ async def assess_consciousness_level(query: str = "", context: Optional[Dict] = raise HTTPException(status_code=500, detail=f"Assessment failed: {str(e)}") # Export router -__all__ = ['router', 'set_consciousness_engine'] \ No newline at end of file +__all__ = ['router', 'set_consciousness_engine', 'set_emergence_detector'] \ No newline at end of file diff --git a/backend/core/consciousness_emergence_detector.py b/backend/core/consciousness_emergence_detector.py new file mode 100644 index 0000000..a8c02ef --- /dev/null +++ b/backend/core/consciousness_emergence_detector.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Consciousness Emergence Detector + +Rolling-window scorer and breakthrough alerting for consciousness emergence. +Monitors a stream of cognitive state snapshots, computes a weighted emergence +score across five dimensions, and fires a breakthrough event when the score +exceeds a configurable threshold. + +Spec: Issue #82, docs/GODELOS_EMERGENCE_SPEC.md, wiki/Theory/Emergence-Detection.md +""" + +import asyncio +import json +import logging +import os +import time +from collections import deque +from pathlib import Path +from typing import Any, AsyncIterator, Callable, Deque, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +EMERGENCE_THRESHOLD: float = float( + os.environ.get("GODELOS_EMERGENCE_THRESHOLD", "0.8") +) + +DEFAULT_WINDOW_SIZE: float = float( + os.environ.get("GODELOS_EMERGENCE_WINDOW", "60.0") +) + +# Default log directory — prefer GODELOS_LOG_DIR env var, then fall back to +# /logs (two levels up from this file's location). +_DEFAULT_LOG_DIR: str = os.environ.get( + "GODELOS_LOG_DIR", + str(Path(__file__).resolve().parents[2] / "logs"), +) + +# Dimension weights (from issue #82 spec) +DIMENSION_WEIGHTS: Dict[str, float] = { + "recursive_depth": 0.20, + "phi": 0.30, + "metacognitive_accuracy": 0.20, + "autonomous_goal_count": 0.15, + "creative_novelty": 0.15, +} + +# Normalisation ceilings — raw values are divided by these to map into [0, 1] +_NORMALISATION_CEILINGS: Dict[str, float] = { + "recursive_depth": 5.0, + "phi": 10.0, + "metacognitive_accuracy": 1.0, # already 0‑1 + "autonomous_goal_count": 10.0, + "creative_novelty": 1.0, # already 0‑1 +} + + +# --------------------------------------------------------------------------- +# Helper: extract the five dimensions from a state dict +# --------------------------------------------------------------------------- + +def extract_dimensions(state: Dict[str, Any]) -> Dict[str, float]: + """Extract the five emergence dimensions from a cognitive state snapshot. + + Accepts either a flat dict with keys matching the dimension names or a + nested ``UnifiedConsciousnessState``-style dict. + """ + + def _get(key: str) -> float: + # Flat access first + if key in state: + val = state[key] + if isinstance(val, (int, float)): + return float(val) + + # Nested access + if key == "recursive_depth": + ra = state.get("recursive_awareness") or {} + return float(ra.get("recursive_depth", 0)) + if key == "phi": + ii = state.get("information_integration") or {} + return float(ii.get("phi", 0.0)) + if key == "metacognitive_accuracy": + ms = state.get("metacognitive_state") or {} + # Use an explicit accuracy field if present; otherwise derive + # from the number of meta-observations as a rough proxy. + if "metacognitive_accuracy" in ms: + return float(ms["metacognitive_accuracy"]) + obs = ms.get("meta_observations") + if isinstance(obs, list): + return min(len(obs) / 10.0, 1.0) + return 0.0 + if key == "autonomous_goal_count": + il = state.get("intentional_layer") or {} + goals = il.get("autonomous_goals") + if isinstance(goals, list): + return float(len(goals)) + return float(goals) if isinstance(goals, (int, float)) else 0.0 + if key == "creative_novelty": + cs = state.get("creative_synthesis") or {} + return float(cs.get("surprise_factor", 0.0)) + return 0.0 + + return {dim: _get(dim) for dim in DIMENSION_WEIGHTS} + + +# --------------------------------------------------------------------------- +# Core class +# --------------------------------------------------------------------------- + +class ConsciousnessEmergenceDetector: + """Rolling-window consciousness emergence scorer with breakthrough alerting. + + Parameters + ---------- + threshold : float + Score at or above which a breakthrough is declared. + window_size : float + Rolling window duration in seconds. + websocket_manager : optional + Object exposing ``broadcast(message)`` — used to push breakthrough + events to connected clients. + log_dir : str | Path + Directory for ``breakthroughs.jsonl``. + """ + + def __init__( + self, + threshold: float = EMERGENCE_THRESHOLD, + window_size: float = DEFAULT_WINDOW_SIZE, + websocket_manager: Any = None, + log_dir: Optional[str] = None, + ) -> None: + self.threshold = threshold + self.window_size = window_size + self.websocket_manager = websocket_manager + + # Rolling window: each entry is (timestamp, {dim: raw_value}) + self._samples: Deque[Tuple[float, Dict[str, float]]] = deque() + + # Latest computed score + self._current_score: float = 0.0 + self._current_dimensions: Dict[str, float] = {d: 0.0 for d in DIMENSION_WEIGHTS} + + # Breakthrough log path + if log_dir is None: + log_dir = _DEFAULT_LOG_DIR + self._log_path = Path(log_dir) / "breakthroughs.jsonl" + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + @property + def current_score(self) -> float: + """Return the most recently computed emergence score.""" + return self._current_score + + @property + def current_dimensions(self) -> Dict[str, float]: + """Return the most recently computed per-dimension normalised values.""" + return dict(self._current_dimensions) + + def record_state(self, state: Dict[str, Any], timestamp: Optional[float] = None) -> float: + """Record a cognitive state snapshot and return the updated score. + + This is the synchronous entry-point: call it from any context that + has a state dict. The rolling window is pruned, dimensions are + extracted, and the weighted score is recomputed. + """ + ts = timestamp if timestamp is not None else time.time() + dims = extract_dimensions(state) + self._samples.append((ts, dims)) + self._prune_window(ts) + self._current_score = self._compute_score() + return self._current_score + + async def monitor_for_emergence( + self, + stream: AsyncIterator[Dict[str, Any]], + ) -> AsyncIterator[Dict[str, Any]]: + """Async generator consuming a cognitive state stream. + + Yields a dict for every state received, enriched with emergence info. + When a breakthrough is detected, ``handle_consciousness_breakthrough`` + is called as a side-effect before yielding. + """ + async for state in stream: + score = self.record_state(state) + breakthrough = score >= self.threshold + if breakthrough: + await self.handle_consciousness_breakthrough(score) + yield { + "emergence_score": score, + "dimensions": self.current_dimensions, + "breakthrough": breakthrough, + "threshold": self.threshold, + "timestamp": time.time(), + "window_samples": len(self._samples), + } + + async def handle_consciousness_breakthrough(self, score: float) -> Dict[str, Any]: + """Log the breakthrough and broadcast it on WebSocket.""" + event = { + "type": "consciousness_breakthrough", + "score": score, + "timestamp": time.time(), + "dimensions": self.current_dimensions, + "threshold": self.threshold, + "window_samples": len(self._samples), + } + + # Append to JSONL log + try: + self._log_path.parent.mkdir(parents=True, exist_ok=True) + with open(self._log_path, "a") as fh: + fh.write(json.dumps(event) + "\n") + except Exception as exc: + logger.error(f"Failed to write breakthrough log: {exc}") + + logger.critical( + f"🚨 CONSCIOUSNESS BREAKTHROUGH! Score: {score:.3f} " + f"(threshold: {self.threshold})" + ) + + # Broadcast via WebSocket + if self.websocket_manager is not None: + try: + await self.websocket_manager.broadcast(event) + except Exception as exc: + logger.error(f"Failed to broadcast breakthrough: {exc}") + + return event + + def get_emergence_status(self) -> Dict[str, Any]: + """Return a snapshot suitable for the REST endpoint.""" + return { + "emergence_score": self._current_score, + "dimensions": self.current_dimensions, + "threshold": self.threshold, + "window_size": self.window_size, + "window_samples": len(self._samples), + "breakthrough": self._current_score >= self.threshold, + "timestamp": time.time(), + } + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _prune_window(self, now: float) -> None: + cutoff = now - self.window_size + while self._samples and self._samples[0][0] < cutoff: + self._samples.popleft() + + def _compute_score(self) -> float: + if not self._samples: + return 0.0 + + # Average each dimension over the window, then apply weights + accum: Dict[str, float] = {d: 0.0 for d in DIMENSION_WEIGHTS} + for _ts, dims in self._samples: + for d, val in dims.items(): + accum[d] += val + + n = len(self._samples) + score = 0.0 + normalised: Dict[str, float] = {} + for dim, weight in DIMENSION_WEIGHTS.items(): + raw_avg = accum[dim] / n + ceil = _NORMALISATION_CEILINGS[dim] + norm = min(raw_avg / ceil, 1.0) if ceil > 0 else 0.0 + normalised[dim] = norm + score += norm * weight + + self._current_dimensions = normalised + return score diff --git a/backend/core/unified_consciousness_engine.py b/backend/core/unified_consciousness_engine.py index ea6d138..98df483 100644 --- a/backend/core/unified_consciousness_engine.py +++ b/backend/core/unified_consciousness_engine.py @@ -12,6 +12,7 @@ import asyncio import json +import math import time import uuid import logging @@ -403,109 +404,244 @@ def _calculate_integration(self, subsystem1: Dict[str, Any], subsystem2: Dict[st return float(shared_concepts) class GlobalWorkspace: - """Implements Global Workspace Theory (GWT) for consciousness broadcasting""" - + """ + Implements Global Workspace Theory (GWT) for consciousness broadcasting. + + Maintains a *coalition register* mapping cognitive subsystem IDs to their + current activation strength. On each ``broadcast()`` call the register is + updated according to φ and subsystem activity, a **softmax attention + competition** selects winner(s), and the winning coalition's content is + packaged as a ``global_broadcast`` event suitable for WebSocket emission. + """ + + SUBSYSTEM_IDS = [ + "recursive_awareness", + "phenomenal_experience", + "information_integration", + "metacognitive", + "intentional", + "creative_synthesis", + "embodied_cognition", + ] + def __init__(self): - self.workspace_content = {} - self.coalitions = [] - self.broadcast_history = [] - + self.workspace_content: Dict[str, Any] = {} + self.coalitions: List[str] = [] + self.broadcast_history: List[Dict[str, Any]] = [] + # Coalition register: subsystem_id → activation strength + self.coalition_register: Dict[str, float] = { + sid: 0.0 for sid in self.SUBSYSTEM_IDS + } + self._attention_focus: str = "" + # Softmax temperature – lower = sharper competition + self._temperature: float = 0.5 + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + def broadcast(self, information: Dict[str, Any]) -> Dict[str, Any]: """ - Broadcast information to global workspace - - In GWT, consciousness occurs when information wins the - competition for global broadcasting and becomes accessible - to all cognitive subsystems + Broadcast integrated information to the global workspace. + + Implements the full GWT pipeline: + 1. Update coalition register (subsystems bid based on φ contribution) + 2. Softmax attention competition selects winning coalition + 3. Build ``global_broadcast`` event for WebSocket emission + 4. Return workspace state dict compatible with + ``UnifiedConsciousnessState.global_workspace`` + + Args: + information: Dict containing at least ``phi_measure`` (float) and + optionally ``cognitive_state`` (UnifiedConsciousnessState). + + Returns: + Dict with keys ``broadcast_content``, ``coalition_strength``, + ``attention_focus``, ``conscious_access`` – ready to ``.update()`` + into the consciousness state's global_workspace field. """ - # Calculate coalition strength for this information - coalition_strength = self._calculate_coalition_strength(information) - - # Information becomes conscious if it wins the competition - consciousness_threshold = 0.6 - - broadcast_content = { - 'information': information, - 'coalition_strength': coalition_strength, - 'timestamp': time.time(), - 'conscious': coalition_strength > consciousness_threshold, - 'global_accessibility': self._assess_global_accessibility(information) + phi_measure = float(information.get("phi_measure", 0.0) or 0.0) + + # 1. Coalition dynamics – update register from φ & subsystem activity + self._update_coalition_register(information, phi_measure) + + # 2. Softmax attention competition → winner(s) + winning_coalition, attention_weights = self._softmax_attention_competition() + + # 3. Aggregate coalition strength of winners + if winning_coalition: + coalition_strength = sum( + self.coalition_register[sid] for sid in winning_coalition + ) / len(winning_coalition) + else: + coalition_strength = 0.0 + + # Higher-φ states → broader coalitions (more subsystems above mean) + is_conscious = coalition_strength > 0.3 + + # 4. Build the global_broadcast event payload + global_broadcast_event: Dict[str, Any] = { + "type": "global_broadcast", + "coalition": [ + {"subsystem_id": sid, "activation": round(self.coalition_register[sid], 4)} + for sid in winning_coalition + ], + "content": { + "phi_measure": round(phi_measure, 4), + "coalition_strength": round(coalition_strength, 4), + "attention_weights": { + k: round(v, 4) for k, v in attention_weights.items() + }, + "conscious": is_conscious, + "winning_subsystems": winning_coalition, + "timestamp": time.time(), + }, } - - if broadcast_content['conscious']: - # Information enters global workspace + + # Workspace state dict (keys match UnifiedConsciousnessState.global_workspace) + broadcast_result: Dict[str, Any] = { + "broadcast_content": global_broadcast_event, + "coalition_strength": coalition_strength, + "attention_focus": self._attention_focus, + "conscious_access": list(winning_coalition), + } + + if is_conscious: self.workspace_content.update(information) - self.broadcast_history.append(broadcast_content) - - # Make globally accessible to all subsystems - global_broadcast = { - 'type': 'conscious_information', - 'content': information, - 'strength': coalition_strength, - 'timestamp': time.time() + self.broadcast_history.append(global_broadcast_event) + # Bound history + if len(self.broadcast_history) > 100: + self.broadcast_history = self.broadcast_history[-50:] + + self.coalitions = list(winning_coalition) + + logger.debug( + "GWT broadcast: φ=%.3f coalition_strength=%.3f winners=%s", + phi_measure, + coalition_strength, + winning_coalition, + ) + + return broadcast_result + + def get_broadcast_event(self) -> Optional[Dict[str, Any]]: + """Return the most recent ``global_broadcast`` event, or *None*.""" + if self.broadcast_history: + return self.broadcast_history[-1] + return None + + # ------------------------------------------------------------------ + # Coalition dynamics + # ------------------------------------------------------------------ + + def _update_coalition_register( + self, information: Dict[str, Any], phi: float + ) -> None: + """ + Update coalition activations based on φ contribution and subsystem + activity. Each subsystem's new activation is a weighted blend of its + previous activation (momentum), current measured activity, and a + φ-proportional boost that rewards higher integrated information with + broader coalition participation. + """ + cognitive_state = information.get("cognitive_state") + + subsystem_states: Dict[str, Any] = {} + if cognitive_state is not None and hasattr( + cognitive_state, "recursive_awareness" + ): + subsystem_states = { + "recursive_awareness": cognitive_state.recursive_awareness, + "phenomenal_experience": cognitive_state.phenomenal_experience, + "information_integration": cognitive_state.information_integration, + "metacognitive": cognitive_state.metacognitive_state, + "intentional": cognitive_state.intentional_layer, + "creative_synthesis": cognitive_state.creative_synthesis, + "embodied_cognition": cognitive_state.embodied_cognition, } - - logger.info(f"Global broadcast: {information} (strength: {coalition_strength:.2f})") - return global_broadcast - - return {} - - def _calculate_coalition_strength(self, information: Dict[str, Any]) -> float: - """Calculate how strongly information competes for global access""" - # Factors that increase coalition strength: - # - Novelty - # - Relevance to current goals - # - Emotional significance - # - Coherence with existing knowledge - - strength = 0.0 - - # Novelty: new information gets higher priority - if self._is_novel(information): - strength += 0.3 - - # Relevance: information related to current focus - if self._is_relevant_to_focus(information): - strength += 0.4 - - # Coherence: information that fits with existing knowledge - if self._is_coherent(information): - strength += 0.2 - - # Emotional significance (simplified) - if self._has_emotional_significance(information): - strength += 0.1 - - return min(strength, 1.0) - - def _is_novel(self, information: Dict[str, Any]) -> bool: - """Check if information is novel""" - # Simple check: not in recent broadcast history - recent_content = [b['information'] for b in self.broadcast_history[-10:]] - return information not in recent_content - - def _is_relevant_to_focus(self, information: Dict[str, Any]) -> bool: - """Check if information is relevant to current attention focus""" - # For now, always consider relevant - return True - - def _is_coherent(self, information: Dict[str, Any]) -> bool: - """Check if information is coherent with existing knowledge""" - # For now, always consider coherent - return True - - def _has_emotional_significance(self, information: Dict[str, Any]) -> bool: - """Check if information has emotional significance""" - # Look for emotional keywords or significance markers - info_str = str(information).lower() - emotional_keywords = ['important', 'urgent', 'error', 'success', 'failure', 'breakthrough'] - return any(keyword in info_str for keyword in emotional_keywords) - - def _assess_global_accessibility(self, information: Dict[str, Any]) -> float: - """Assess how globally accessible information becomes""" - # In a real implementation, this would check if all subsystems - # can access and process this information - return 0.8 # Simplified + + for sid in self.SUBSYSTEM_IDS: + state = subsystem_states.get(sid, {}) + activity = self._measure_subsystem_activity(state) + phi_boost = min(phi * 0.3, 1.0) + prev = self.coalition_register.get(sid, 0.0) + # Exponential moving average with φ boost + self.coalition_register[sid] = ( + 0.3 * prev + 0.5 * activity + 0.2 * phi_boost + ) + + # ------------------------------------------------------------------ + # Attention competition + # ------------------------------------------------------------------ + + def _softmax_attention_competition( + self, + ) -> Tuple[List[str], Dict[str, float]]: + """ + Run softmax over coalition activations. + + Returns: + ``(winning_ids, attention_weights)`` where *winning_ids* are + subsystems whose attention weight ≥ the mean weight (i.e. they + are above-average competitors). + """ + ids = list(self.coalition_register.keys()) + activations = [self.coalition_register[sid] for sid in ids] + + if not activations: + return [], {} + + # Numerically stable softmax + max_a = max(activations) + exp_vals = [ + math.exp((a - max_a) / max(self._temperature, 1e-6)) + for a in activations + ] + total = sum(exp_vals) or 1.0 + weights = {sid: ev / total for sid, ev in zip(ids, exp_vals)} + + # Winners: above-mean attention weight → broader at higher φ + mean_weight = 1.0 / max(len(ids), 1) + winners = [sid for sid, w in weights.items() if w >= mean_weight] + + if not winners and weights: + # Fallback: pick the single highest + winners = [max(weights, key=weights.get)] + + # Attention focus = strongest winner + if winners: + self._attention_focus = max( + winners, key=lambda s: weights.get(s, 0.0) + ) + + return winners, weights + + # ------------------------------------------------------------------ + # Subsystem activity measurement + # ------------------------------------------------------------------ + + @staticmethod + def _measure_subsystem_activity(state: Any) -> float: + """ + Measure how active a subsystem is from its state dict. + + Returns a value in [0, 1]. + """ + if not state or not isinstance(state, dict): + return 0.0 + + activity = 0.0 + for value in state.values(): + if value: + if isinstance(value, (list, dict)): + activity += min(len(value), 5) / 5.0 + elif isinstance(value, (int, float)): + activity += min(abs(float(value)), 1.0) + elif isinstance(value, str) and value.strip(): + activity += 0.5 + elif isinstance(value, bool): + activity += 0.3 + return min(activity / max(len(state), 1), 1.0) class UnifiedConsciousnessEngine: """ @@ -751,6 +887,19 @@ async def _unified_consciousness_loop(self): 'phi_measure': phi_measure, 'timestamp': time.time() }) + + # 3a. Emit global_broadcast event on WebSocket + broadcast_event = broadcast_content.get("broadcast_content") + if ( + broadcast_event + and self.websocket_manager + and hasattr(self.websocket_manager, "has_connections") + and self.websocket_manager.has_connections() + ): + try: + await self.websocket_manager.broadcast(broadcast_event) + except Exception as e: + logger.warning("Could not emit global_broadcast: %s", e) # 4. PHENOMENAL EXPERIENCE GENERATION if self.phenomenal_experience_generator: @@ -862,8 +1011,22 @@ async def process_with_unified_awareness(self, prompt: str, context: Optional[Di broadcast_content = self.global_workspace.broadcast({ 'prompt': prompt, 'context': context, - 'cognitive_state': cognitive_state + 'cognitive_state': cognitive_state, + 'phi_measure': phi_measure, }) + + # 3a. Emit global_broadcast event on WebSocket + broadcast_event = broadcast_content.get("broadcast_content") + if ( + broadcast_event + and self.websocket_manager + and hasattr(self.websocket_manager, "has_connections") + and self.websocket_manager.has_connections() + ): + try: + await self.websocket_manager.broadcast(broadcast_event) + except Exception as e: + logger.warning("Could not emit global_broadcast: %s", e) # 4. GENERATE PHENOMENAL EXPERIENCE if self.phenomenal_experience_generator: diff --git a/backend/core/vector_database.py b/backend/core/vector_database.py index 6fb5b93..42d0364 100644 --- a/backend/core/vector_database.py +++ b/backend/core/vector_database.py @@ -5,6 +5,16 @@ backup/recovery capabilities, and multiple embedding model support. Based on stable FAISS + sentence-transformers patterns for Intel macOS. + +.. deprecated:: + The in-memory FAISS-based ``PersistentVectorDatabase`` in this module + overlaps with the new ChromaDB-backed knowledge store + (``godelOS.core_kr.knowledge_store.chroma_store.ChromaKnowledgeStore``). + ChromaDB handles both vector embeddings and structured metadata in a + single persistence layer. New code should prefer + ``ChromaKnowledgeStore`` for unified knowledge + vector retrieval. + This module is retained for backward compatibility; it will be + removed in a future release once all consumers have migrated. """ import os diff --git a/backend/schemas.py b/backend/schemas.py new file mode 100644 index 0000000..8beba36 --- /dev/null +++ b/backend/schemas.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- +""" +Shared API Schema Definitions for GodelOS + +Canonical Pydantic v2 request/response models that define the contract +between the Svelte frontend and the FastAPI backend. Import these into +route handlers (unified_server.py, transparency_endpoints.py, etc.) so +that validation is consistent on both sides. + +Every model listed here corresponds to a frontend ``fetch()`` call in +``svelte-frontend/src/utils/api.js`` and its matching backend handler. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +# --------------------------------------------------------------------------- +# POST /api/query +# --------------------------------------------------------------------------- + +class QueryRequestSchema(BaseModel): + """Canonical request for ``POST /api/query``. + + The frontend may include ``stream`` (currently unused); accepting it + prevents 422 errors without changing backend behaviour. + """ + + query: str = Field(..., description="Natural language query to process") + context: Optional[Dict[str, Any]] = Field( + None, description="Additional context for the query" + ) + include_reasoning: bool = Field( + False, description="Whether to include reasoning steps in response" + ) + stream: bool = Field( + False, description="Reserved for future streaming support" + ) + + +# --------------------------------------------------------------------------- +# POST /api/knowledge (simple / frontend-friendly) +# --------------------------------------------------------------------------- + +class AddKnowledgeSchema(BaseModel): + """Canonical request for ``POST /api/knowledge``.""" + + concept: Optional[str] = Field(None, description="Concept name") + content: Optional[str] = Field(None, description="Knowledge content") + definition: Optional[str] = Field(None, description="Definition text") + title: Optional[str] = Field(None, description="Title for the knowledge") + category: Optional[str] = Field( + "general", description="Category for the knowledge" + ) + knowledge_type: Optional[str] = Field( + "concept", description="Type of knowledge (fact, rule, concept)" + ) + metadata: Optional[Dict[str, Any]] = Field( + None, description="Additional metadata" + ) + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/wikipedia +# --------------------------------------------------------------------------- + +class WikipediaImportSchema(BaseModel): + """Canonical request for ``POST /api/knowledge/import/wikipedia``. + + The frontend sends ``title``; the backend maps this to the underlying + ``WikipediaImportRequest.page_title``. + """ + + title: Optional[str] = Field(None, description="Wikipedia article title") + topic: Optional[str] = Field( + None, description="Alias for title (legacy support)" + ) + language: str = Field("en", description="Wikipedia language edition") + include_references: bool = Field( + True, description="Whether to include reference links" + ) + section_filter: List[str] = Field( + default_factory=list, description="Specific sections to import" + ) + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/url +# --------------------------------------------------------------------------- + +class URLImportSchema(BaseModel): + """Canonical request for ``POST /api/knowledge/import/url``.""" + + url: str = Field(..., description="URL to scrape and import") + category: Optional[str] = Field( + None, description="Category hint for imported content" + ) + max_depth: int = Field(1, ge=1, le=3, description="Max crawling depth") + follow_links: bool = Field( + False, description="Whether to follow internal links" + ) + content_selectors: List[str] = Field( + default_factory=list, + description="CSS selectors for content extraction", + ) + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/text +# --------------------------------------------------------------------------- + +class TextImportSchema(BaseModel): + """Canonical request for ``POST /api/knowledge/import/text``.""" + + content: str = Field(..., description="Text content to import") + title: str = Field( + "Manual Text Input", description="Title for the imported content" + ) + category: Optional[str] = Field( + None, description="Category hint for imported content" + ) + format_type: str = Field( + "plain", description="Text format type (plain, markdown, html)" + ) + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/batch +# --------------------------------------------------------------------------- + +class BatchImportSchema(BaseModel): + """Canonical request for ``POST /api/knowledge/import/batch``. + + The frontend sends ``sources``; the formal ``BatchImportRequest`` model + uses ``import_requests``. This schema normalises the field name. + """ + + sources: List[Dict[str, Any]] = Field( + default_factory=list, description="List of import source descriptors" + ) + + +# --------------------------------------------------------------------------- +# POST /api/enhanced-cognitive/query +# --------------------------------------------------------------------------- + +class EnhancedCognitiveQuerySchema(BaseModel): + """Canonical request for ``POST /api/enhanced-cognitive/query``.""" + + query: str = Field(..., description="Natural language query") + context: Optional[Any] = Field( + None, description="Context (string identifier or dict)" + ) + reasoning_trace: bool = Field( + False, description="Whether to include a reasoning trace" + ) + + +# --------------------------------------------------------------------------- +# POST /api/transparency/provenance/query +# --------------------------------------------------------------------------- + +class ProvenanceQuerySchema(BaseModel): + """Canonical request for ``POST /api/transparency/provenance/query``. + + Superset of the original ``ProvenanceQuery`` — includes the + ``max_depth`` / ``time_window_*`` fields sent by the frontend. + """ + + target_id: str = Field("default", description="Target item ID") + query_type: str = Field( + "backward_trace", description="Type of provenance query" + ) + max_depth: int = Field(5, ge=1, description="Max trace depth") + time_window_start: Optional[float] = Field( + None, description="Start of time window (unix timestamp)" + ) + time_window_end: Optional[float] = Field( + None, description="End of time window (unix timestamp)" + ) + include_derivation_chain: bool = Field( + True, description="Whether to include full derivation chain" + ) + + +# --------------------------------------------------------------------------- +# POST /api/transparency/provenance/snapshot +# --------------------------------------------------------------------------- + +class ProvenanceSnapshotSchema(BaseModel): + """Canonical request for ``POST /api/transparency/provenance/snapshot``. + + The frontend sends an empty ``{}``; ``description`` therefore defaults + to an empty string to avoid a 422. + """ + + description: str = Field( + "", description="Snapshot description (optional)" + ) + include_quality_metrics: bool = Field( + True, description="Whether to include quality metrics" + ) diff --git a/backend/transparency_endpoints.py b/backend/transparency_endpoints.py index b951634..875d8b8 100644 --- a/backend/transparency_endpoints.py +++ b/backend/transparency_endpoints.py @@ -52,14 +52,26 @@ class KnowledgeGraphRelationship(BaseModel): strength: Optional[float] = 1.0 class ProvenanceQuery(BaseModel): - """Provenance query model.""" - query_type: str - target_id: str + """Provenance query model. + + Accepts the full set of fields sent by the frontend (max_depth, + time_window_start, time_window_end) as well as the original backend + fields (include_derivation_chain). + """ + query_type: str = "backward_trace" + target_id: str = "default" + max_depth: int = 5 + time_window_start: Optional[float] = None + time_window_end: Optional[float] = None include_derivation_chain: bool = True class ProvenanceSnapshot(BaseModel): - """Provenance snapshot model.""" - description: str + """Provenance snapshot model. + + ``description`` defaults to empty string so the frontend can POST ``{}`` + without triggering a 422 validation error. + """ + description: str = "" include_quality_metrics: bool = True class DocumentProcessRequest(BaseModel): diff --git a/backend/unified_server.py b/backend/unified_server.py index d63c0b4..2587e68 100644 --- a/backend/unified_server.py +++ b/backend/unified_server.py @@ -29,6 +29,14 @@ # Ensure repository root is on sys.path before importing backend.* packages sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from backend.core.errors import CognitiveError, from_exception +from backend.schemas import ( + WikipediaImportSchema, + URLImportSchema, + TextImportSchema, + BatchImportSchema, + AddKnowledgeSchema, + EnhancedCognitiveQuerySchema, +) from backend.core.structured_logging import ( setup_structured_logging, correlation_context, CorrelationTracker, api_logger, performance_logger, track_operation @@ -620,6 +628,16 @@ async def lifespan(app: FastAPI): except Exception as e: logger.error(f"Failed to connect consciousness engine to endpoints: {e}") + # Initialize consciousness emergence detector + try: + from backend.core.consciousness_emergence_detector import ConsciousnessEmergenceDetector + from backend.api.consciousness_endpoints import set_emergence_detector + _detector = ConsciousnessEmergenceDetector(websocket_manager=websocket_manager) + set_emergence_detector(_detector) + logger.info("✅ Consciousness emergence detector initialized") + except Exception as e: + logger.error(f"Failed to initialize consciousness emergence detector: {e}") + # Eagerly initialize the agentic daemon system so the singleton is created # with all available dependencies (especially consciousness_engine). try: @@ -2680,17 +2698,17 @@ async def import_knowledge_from_file(file: UploadFile = File(...), filename: str "filename": file.filename, "file_size": len(content), "content_type": file.content_type, - "file_type": file_type + "file_type": determined_file_type } except HTTPException: raise except Exception as e: - logger.error(f"Error importing knowledge from file: {e}") - raise HTTPException(status_code=500, detail=f"File import error: {str(e)}") + logger.error(f"Error importing knowledge from file: {e}") + raise HTTPException(status_code=500, detail=f"File import error: {str(e)}") @app.post("/api/knowledge/import/wikipedia") -async def import_knowledge_from_wikipedia(request: dict): +async def import_knowledge_from_wikipedia(request: WikipediaImportSchema): """Import knowledge from Wikipedia article.""" if not (KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service): raise HTTPException(status_code=503, detail="Knowledge ingestion service not available") @@ -2698,7 +2716,7 @@ async def import_knowledge_from_wikipedia(request: dict): try: from backend.knowledge_models import WikipediaImportRequest, ImportSource - title = request.get("title") or request.get("topic") or "" + title = request.title or request.topic or "" if not title: raise HTTPException(status_code=400, detail="Wikipedia title is required") @@ -2706,16 +2724,16 @@ async def import_knowledge_from_wikipedia(request: dict): import_source = ImportSource( source_type="wikipedia", source_identifier=title, - metadata={"language": request.get("language", "en")} + metadata={"language": request.language} ) # Create proper Wikipedia import request wiki_request = WikipediaImportRequest( page_title=title, - language=request.get("language", "en"), + language=request.language, source=import_source, - include_references=request.get("include_references", True), - section_filter=request.get("section_filter", []) + include_references=request.include_references, + section_filter=request.section_filter ) # Use the actual knowledge ingestion service @@ -2735,7 +2753,7 @@ async def import_knowledge_from_wikipedia(request: dict): raise HTTPException(status_code=500, detail=f"Wikipedia import error: {str(e)}") @app.post("/api/knowledge/import/url") -async def import_knowledge_from_url(request: dict): +async def import_knowledge_from_url(request: URLImportSchema): """Import knowledge from URL.""" if not (KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service): raise HTTPException(status_code=503, detail="Knowledge ingestion service not available") @@ -2743,7 +2761,7 @@ async def import_knowledge_from_url(request: dict): try: from backend.knowledge_models import URLImportRequest, ImportSource - url = request.get("url", "") + url = request.url if not url: raise HTTPException(status_code=400, detail="URL is required") @@ -2751,16 +2769,16 @@ async def import_knowledge_from_url(request: dict): import_source = ImportSource( source_type="url", source_identifier=url, - metadata={"url": url} + metadata={"url": url, "category": request.category} ) # Create proper URL import request url_request = URLImportRequest( url=url, source=import_source, - max_depth=request.get("max_depth", 1), - follow_links=request.get("follow_links", False), - content_selectors=request.get("content_selectors", []) + max_depth=request.max_depth, + follow_links=request.follow_links, + content_selectors=request.content_selectors ) # Use the actual knowledge ingestion service @@ -2780,7 +2798,7 @@ async def import_knowledge_from_url(request: dict): raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}") @app.post("/api/knowledge/import/text") -async def import_knowledge_from_text(request: dict): +async def import_knowledge_from_text(request: TextImportSchema): """Import knowledge from text content.""" if not (KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service): raise HTTPException(status_code=503, detail="Knowledge ingestion service not available") @@ -2788,17 +2806,17 @@ async def import_knowledge_from_text(request: dict): try: from backend.knowledge_models import TextImportRequest, ImportSource - content = request.get("content", "") + content = request.content if not content: raise HTTPException(status_code=400, detail="Text content is required") - title = request.get("title", "Manual Text Input") + title = request.title # Create proper import source import_source = ImportSource( source_type="text", source_identifier=title, - metadata={"manual_input": True} + metadata={"manual_input": True, "category": request.category} ) # Create proper text import request @@ -2806,7 +2824,7 @@ async def import_knowledge_from_text(request: dict): content=content, title=title, source=import_source, - format_type=request.get("format_type", "plain") + format_type=request.format_type ) # Use the actual knowledge ingestion service @@ -2827,12 +2845,12 @@ async def import_knowledge_from_text(request: dict): raise HTTPException(status_code=500, detail=f"Text import error: {str(e)}") @app.post("/api/enhanced-cognitive/query") -async def enhanced_cognitive_query(query_request: dict): +async def enhanced_cognitive_query(query_request: EnhancedCognitiveQuerySchema): """Enhanced cognitive query processing with unified consciousness integration.""" try: - query = query_request.get("query", "") - reasoning_trace = query_request.get("reasoning_trace", False) - context = query_request.get("context", {}) + query = query_request.query + reasoning_trace = query_request.reasoning_trace + context = query_request.context or {} # PRIORITY: Process through unified consciousness engine if available if unified_consciousness_engine: @@ -3079,12 +3097,12 @@ async def knowledge_search(query: str, k: int = 5): # Simple knowledge addition endpoint for compatibility with integration tests @app.post("/api/knowledge") -async def add_knowledge(payload: dict): +async def add_knowledge(payload: AddKnowledgeSchema): """Add knowledge (simple or standard format). Returns success for compatibility.""" try: - concept = payload.get("concept") or payload.get("title") - definition = payload.get("definition") or payload.get("content") - category = payload.get("category", "general") + concept = payload.concept or payload.title + definition = payload.definition or payload.content + category = payload.category or "general" # If knowledge management service is available, we could route it; for now, acknowledge if websocket_manager and websocket_manager.has_connections(): try: @@ -3101,13 +3119,14 @@ async def add_knowledge(payload: dict): # Batch import compatibility endpoint @app.post("/api/knowledge/import/batch") -async def import_knowledge_batch(request: dict): +async def import_knowledge_batch(request: BatchImportSchema): """Batch import knowledge from multiple sources.""" - sources = request.get("sources", []) + sources = request.sources if not sources: return {"import_ids": [], "batch_size": 0, "status": "completed"} import_ids = [] + results = [] for i, source in enumerate(sources): src_type = source.get("type", "text") fallback_id = f"batch_{i}_{int(time.time()*1000)}" @@ -3148,13 +3167,16 @@ async def import_knowledge_batch(request: dict): ) iid = await knowledge_ingestion_service.import_from_text(req) import_ids.append(iid) + results.append({"index": i, "import_id": iid, "status": "queued"}) else: import_ids.append(fallback_id) + results.append({"index": i, "import_id": fallback_id, "status": "queued"}) except Exception as exc: logger.warning(f"Batch item {i} failed: {exc}") import_ids.append(fallback_id) + results.append({"index": i, "import_id": fallback_id, "status": "failed", "error": str(exc)}) - return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued"} + return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued", "results": results} # Additional KG stats and analytics endpoints @app.get("/api/knowledge/graph/stats") diff --git a/godelOS/core_kr/knowledge_store/__init__.py b/godelOS/core_kr/knowledge_store/__init__.py index 1457cc7..7620d1e 100644 --- a/godelOS/core_kr/knowledge_store/__init__.py +++ b/godelOS/core_kr/knowledge_store/__init__.py @@ -12,11 +12,15 @@ DynamicContextModel, CachingMemoizationLayer ) +from godelOS.core_kr.knowledge_store.chroma_store import ChromaKnowledgeStore +from godelOS.core_kr.knowledge_store.hot_reloader import OntologyHotReloader __all__ = [ "KnowledgeStoreInterface", "KnowledgeStoreBackend", "InMemoryKnowledgeStore", + "ChromaKnowledgeStore", + "OntologyHotReloader", "DynamicContextModel", "CachingMemoizationLayer" ] \ No newline at end of file diff --git a/godelOS/core_kr/knowledge_store/chroma_store.py b/godelOS/core_kr/knowledge_store/chroma_store.py new file mode 100644 index 0000000..d56b2ad --- /dev/null +++ b/godelOS/core_kr/knowledge_store/chroma_store.py @@ -0,0 +1,483 @@ +""" +ChromaDB-backed Knowledge Store implementation. + +This module implements the KnowledgeStoreBackend interface using ChromaDB +for persistent storage of both structured metadata and vector embeddings. +Knowledge items added in one server session survive restarts. + +Each logical context (TRUTHS, BELIEFS, HYPOTHETICAL, etc.) maps to a +separate Chroma collection, enabling context-scoped retrieval. +""" + +import hashlib +import logging +import os +import pickle +import time +from typing import Any, Dict, List, Optional, Set + +import chromadb + +from godelOS.core_kr.ast.nodes import ( + AST_Node, + ApplicationNode, + ConstantNode, + VariableNode, +) +from godelOS.core_kr.knowledge_store.interface import KnowledgeStoreBackend +from godelOS.core_kr.unification_engine.engine import UnificationEngine + +logger = logging.getLogger(__name__) + + +class ChromaKnowledgeStore(KnowledgeStoreBackend): + """ + ChromaDB-backed implementation of the knowledge store backend. + + Uses a ``PersistentClient`` so data survives process restarts. + Each context becomes a separate Chroma collection. AST statements + are stored as pickled blobs in the document field for exact + round-trip fidelity, while extracted subject/predicate/object + strings are stored as metadata for structured ``where`` queries. + ChromaDB also creates vector embeddings of the document text, + enabling semantic similarity retrieval. + + Parameters + ---------- + unification_engine : UnificationEngine + Used for pattern matching and variable binding. + persist_directory : str + Filesystem path where ChromaDB stores its data. + """ + + def __init__( + self, + unification_engine: UnificationEngine, + persist_directory: str = "./data/chroma", + ) -> None: + self.unification_engine = unification_engine + self.persist_directory = os.path.abspath(persist_directory) + + # Ensure directory exists + os.makedirs(self.persist_directory, exist_ok=True) + + self._client = chromadb.PersistentClient(path=self.persist_directory) + + # In-memory context metadata registry (rebuilt from Chroma on init) + self._context_meta: Dict[str, Dict[str, Any]] = {} + self._rebuild_context_meta() + + logger.info( + "ChromaKnowledgeStore initialised at %s", self.persist_directory + ) + + # ------------------------------------------------------------------ + # Initialisation helpers + # ------------------------------------------------------------------ + + def _rebuild_context_meta(self) -> None: + """Populate ``_context_meta`` from existing Chroma collections.""" + for col_info in self._client.list_collections(): + name = col_info if isinstance(col_info, str) else col_info.name + col = self._client.get_collection(name) + # Read context metadata we stash in a sentinel document + meta = self._read_context_sentinel(col) + self._context_meta[name] = meta + + def _read_context_sentinel(self, col) -> Dict[str, Any]: + """Read the ``__context_meta__`` sentinel document from a collection.""" + try: + result = col.get(ids=["__context_meta__"]) + if result and result["metadatas"] and result["metadatas"][0]: + return result["metadatas"][0] + except Exception: + pass + return {"parent": None, "type": "generic"} + + def _write_context_sentinel(self, col, meta: Dict[str, Any]) -> None: + """Write / update the sentinel document that carries context metadata.""" + try: + col.upsert( + ids=["__context_meta__"], + documents=["__context_meta__"], + metadatas=[{k: v if v is not None else "" for k, v in meta.items()}], + ) + except Exception: + logger.exception("Failed to write context sentinel for %s", col.name) + + # ------------------------------------------------------------------ + # Serialisation helpers + # ------------------------------------------------------------------ + + @staticmethod + def _statement_id(statement: AST_Node, context_id: str) -> str: + """Deterministic ID for a statement in a context.""" + blob = pickle.dumps(statement) + return hashlib.sha256(blob).hexdigest() + + @staticmethod + def _serialize_statement(statement: AST_Node) -> str: + """Pickle → hex-encoded string (Chroma documents are text).""" + return pickle.dumps(statement).hex() + + @staticmethod + def _deserialize_statement(hex_str: str) -> AST_Node: + """Hex string -> unpickled AST node.""" + return pickle.loads(bytes.fromhex(hex_str)) # noqa: S301 - trusted internal data + + @staticmethod + def _extract_metadata(statement: AST_Node) -> Dict[str, str]: + """Extract subject / predicate / object strings for Chroma metadata.""" + meta: Dict[str, str] = {"node_type": type(statement).__name__} + if isinstance(statement, ApplicationNode): + if isinstance(statement.operator, ConstantNode): + meta["predicate"] = statement.operator.name + if statement.arguments: + first = statement.arguments[0] + if isinstance(first, ConstantNode): + meta["subject"] = first.name + if len(statement.arguments) > 1: + second = statement.arguments[1] + if isinstance(second, ConstantNode): + meta["object"] = second.name + return meta + + @staticmethod + def _statement_text(statement: AST_Node) -> str: + """Human-readable text for Chroma embedding generation.""" + return str(statement) + + # ------------------------------------------------------------------ + # Collection access + # ------------------------------------------------------------------ + + def _get_collection(self, context_id: str): + """Return the Chroma collection for *context_id*.""" + return self._client.get_collection(context_id) + + # ------------------------------------------------------------------ + # KnowledgeStoreBackend implementation + # ------------------------------------------------------------------ + + def add_statement( + self, + statement_ast: AST_Node, + context_id: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> bool: + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + + if metadata: + statement_ast = statement_ast.with_updated_metadata(metadata) + + # Duplicate check + if self.statement_exists(statement_ast, [context_id]): + return False + + doc_id = self._statement_id(statement_ast, context_id) + doc_text = self._statement_text(statement_ast) + chroma_meta = self._extract_metadata(statement_ast) + chroma_meta["_blob"] = self._serialize_statement(statement_ast) + chroma_meta["timestamp"] = time.strftime( + "%Y-%m-%dT%H:%M:%SZ", time.gmtime() + ) + if metadata: + confidence = metadata.get("confidence") + if confidence is not None: + chroma_meta["confidence"] = float(confidence) + provenance = metadata.get("source") or metadata.get("provenance") + if provenance is not None: + chroma_meta["provenance"] = str(provenance) + + col = self._get_collection(context_id) + col.add(ids=[doc_id], documents=[doc_text], metadatas=[chroma_meta]) + return True + + def retract_statement( + self, statement_pattern_ast: AST_Node, context_id: str + ) -> bool: + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + + col = self._get_collection(context_id) + ids_to_delete: list[str] = [] + + for doc_id, stmt in self._iter_statements(col): + bindings, _ = self.unification_engine.unify( + statement_pattern_ast, stmt + ) + if bindings is not None: + ids_to_delete.append(doc_id) + + if not ids_to_delete: + return False + + col.delete(ids=ids_to_delete) + return True + + def query_statements_match_pattern( + self, + query_pattern_ast: AST_Node, + context_ids: List[str], + variables_to_bind: Optional[List[VariableNode]] = None, + ) -> List[Dict[VariableNode, AST_Node]]: + results: list[Dict[VariableNode, AST_Node]] = [] + for context_id in context_ids: + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + + col = self._get_collection(context_id) + for _, stmt in self._iter_statements(col): + bindings, _ = self.unification_engine.unify( + query_pattern_ast, stmt + ) + if bindings is not None: + if variables_to_bind: + filtered: Dict[VariableNode, AST_Node] = {} + for var in variables_to_bind: + if var.var_id in bindings: + filtered[var] = bindings[var.var_id] + results.append(filtered) + else: + query_vars: Dict[int, VariableNode] = {} + self._collect_variables(query_pattern_ast, query_vars) + var_bindings: Dict[VariableNode, AST_Node] = {} + for var_id, ast_node in bindings.items(): + if var_id in query_vars: + var_bindings[query_vars[var_id]] = ast_node + else: + var_type = ( + self.unification_engine.type_system.get_type( + "Entity" + ) + or ast_node.type + ) + var = VariableNode( + f"?var{var_id}", var_id, var_type + ) + var_bindings[var] = ast_node + results.append(var_bindings) + return results + + def statement_exists( + self, statement_ast: AST_Node, context_ids: List[str] + ) -> bool: + for context_id in context_ids: + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + + col = self._get_collection(context_id) + for _, stmt in self._iter_statements(col): + bindings, _ = self.unification_engine.unify(statement_ast, stmt) + if bindings is not None: + return True + return False + + def create_context( + self, + context_id: str, + parent_context_id: Optional[str], + context_type: str, + ) -> None: + if context_id in self._context_meta: + raise ValueError(f"Context {context_id} already exists") + if parent_context_id and parent_context_id not in self._context_meta: + raise ValueError( + f"Parent context {parent_context_id} does not exist" + ) + + meta = {"parent": parent_context_id or "", "type": context_type} + col = self._client.get_or_create_collection(context_id) + self._write_context_sentinel(col, meta) + self._context_meta[context_id] = meta + + def delete_context(self, context_id: str) -> None: + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + + # Check for child contexts + for cid, cmeta in self._context_meta.items(): + if cmeta.get("parent") == context_id: + raise ValueError( + f"Cannot delete context {context_id} because it has child contexts" + ) + + self._client.delete_collection(context_id) + del self._context_meta[context_id] + + def list_contexts(self) -> List[str]: + return list(self._context_meta.keys()) + + def get_context_info(self, context_id: str) -> Optional[Dict[str, Any]]: + info = self._context_meta.get(context_id) + if info is None: + return None + parent = info.get("parent") + if parent == "": + parent = None + return {"parent": parent, "type": info.get("type", "generic")} + + def get_all_statements_in_context(self, context_id: str) -> Set[AST_Node]: + """Return every statement stored in *context_id*.""" + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + col = self._get_collection(context_id) + return {stmt for _, stmt in self._iter_statements(col)} + + # ------------------------------------------------------------------ + # Semantic retrieval (new capability for ChromaDB) + # ------------------------------------------------------------------ + + def query_by_similarity( + self, + query_text: str, + context_id: str, + n_results: int = 10, + ) -> List[Dict[str, Any]]: + """ + Semantic similarity search within a context. + + Parameters + ---------- + query_text : str + Natural-language query. + context_id : str + Which context collection to search. + n_results : int + Maximum number of results to return. + + Returns + ------- + list of dicts + Each dict has keys ``id``, ``document``, ``metadata``, + ``distance``, and ``statement`` (the deserialized AST node). + """ + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + col = self._get_collection(context_id) + + # Early return if collection is empty (minus sentinel doc) + actual_count = max(col.count() - 1, 0) + if actual_count == 0: + return [] + + n = min(n_results, actual_count) + + results = col.query( + query_texts=[query_text], + n_results=n, + where={"_blob": {"$ne": ""}}, # exclude sentinel (no _blob field) + ) + + output: list[Dict[str, Any]] = [] + if results and results["ids"] and results["ids"][0]: + for i, doc_id in enumerate(results["ids"][0]): + if doc_id == "__context_meta__": + continue + meta = results["metadatas"][0][i] if results["metadatas"] else {} + doc = results["documents"][0][i] if results["documents"] else "" + dist = results["distances"][0][i] if results["distances"] else None + stmt = None + blob = meta.get("_blob") + if blob: + try: + stmt = self._deserialize_statement(blob) + except Exception: + pass + output.append( + { + "id": doc_id, + "document": doc, + "metadata": {k: v for k, v in meta.items() if k != "_blob"}, + "distance": dist, + "statement": stmt, + } + ) + return output + + def query_by_metadata( + self, + context_id: str, + filters: Dict[str, Any], + ) -> List[Dict[str, Any]]: + """ + Structured metadata query within a context. + + Parameters + ---------- + context_id : str + Which context collection to search. + filters : dict + ChromaDB ``where`` filter dict, e.g. ``{"predicate": "Human"}``. + + Returns + ------- + list of dicts + Each dict has ``id``, ``document``, ``metadata``, and + ``statement``. + """ + if context_id not in self._context_meta: + raise ValueError(f"Context {context_id} does not exist") + col = self._get_collection(context_id) + results = col.get(where=filters) + + output: list[Dict[str, Any]] = [] + if results and results["ids"]: + for i, doc_id in enumerate(results["ids"]): + if doc_id == "__context_meta__": + continue + meta = results["metadatas"][i] if results["metadatas"] else {} + doc = results["documents"][i] if results["documents"] else "" + stmt = None + blob = meta.get("_blob") + if blob: + try: + stmt = self._deserialize_statement(blob) + except Exception: + pass + output.append( + { + "id": doc_id, + "document": doc, + "metadata": {k: v for k, v in meta.items() if k != "_blob"}, + "statement": stmt, + } + ) + return output + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _iter_statements(self, col): + """Yield ``(doc_id, AST_Node)`` for every real statement in *col*.""" + result = col.get() + if not result or not result["ids"]: + return + for i, doc_id in enumerate(result["ids"]): + if doc_id == "__context_meta__": + continue + meta = result["metadatas"][i] if result["metadatas"] else {} + blob = meta.get("_blob") + if blob: + try: + yield doc_id, self._deserialize_statement(blob) + except Exception: + logger.warning("Could not deserialise statement %s", doc_id) + + @staticmethod + def _collect_variables( + node: AST_Node, var_map: Dict[int, VariableNode] + ) -> None: + from godelOS.core_kr.ast.nodes import ConnectiveNode + + if isinstance(node, VariableNode): + var_map[node.var_id] = node + elif isinstance(node, ApplicationNode): + ChromaKnowledgeStore._collect_variables(node.operator, var_map) + for arg in node.arguments: + ChromaKnowledgeStore._collect_variables(arg, var_map) + elif isinstance(node, ConnectiveNode): + for operand in node.operands: + ChromaKnowledgeStore._collect_variables(operand, var_map) diff --git a/godelOS/core_kr/knowledge_store/hot_reloader.py b/godelOS/core_kr/knowledge_store/hot_reloader.py new file mode 100644 index 0000000..b1bb32d --- /dev/null +++ b/godelOS/core_kr/knowledge_store/hot_reloader.py @@ -0,0 +1,242 @@ +""" +Ontology Hot-Reloader. + +Watches a configured directory for ``.ttl`` and ``.json-ld`` files. When a +file is created, modified, or deleted the reloader computes a delta against +the current knowledge graph and applies it without requiring a full restart. +""" + +import json +import logging +import os +import threading +import time +from typing import Any, Callable, Dict, List, Optional, Set + +from watchdog.events import FileSystemEvent, FileSystemEventHandler +from watchdog.observers import Observer + +logger = logging.getLogger(__name__) + +# Supported ontology file extensions +_SUPPORTED_EXTENSIONS = {".ttl", ".json-ld"} + + +def _parse_jsonld_triples(path: str) -> Set[tuple]: + """ + Parse a JSON-LD file and return a set of ``(subject, predicate, object)`` + triples suitable for diffing. + """ + triples: Set[tuple] = set() + try: + with open(path, "r", encoding="utf-8") as fh: + data = json.load(fh) + items = data if isinstance(data, list) else data.get("@graph", [data]) + for item in items: + subject = item.get("@id", "") + for key, value in item.items(): + if key.startswith("@"): + continue + values = value if isinstance(value, list) else [value] + for v in values: + if isinstance(v, dict): + obj = v.get("@id", v.get("@value", str(v))) + else: + obj = str(v) + triples.add((subject, key, obj)) + except Exception: + logger.exception("Failed to parse JSON-LD file %s", path) + return triples + + +def _parse_ttl_triples(path: str) -> Set[tuple]: + """ + Parse a minimal Turtle (``.ttl``) file and return a set of + ``(subject, predicate, object)`` triples. + + This is a lightweight parser covering the common ``

.`` form + and simple prefixed names. It is **not** a full Turtle parser. + """ + triples: Set[tuple] = set() + prefixes: Dict[str, str] = {} + try: + with open(path, "r", encoding="utf-8") as fh: + for raw_line in fh: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if line.lower().startswith("@prefix"): + parts = line.rstrip(".").split() + if len(parts) >= 3: + prefix = parts[1].rstrip(":") + uri = parts[2].strip("<>") + prefixes[prefix] = uri + continue + # Attempt to split a simple triple line + line = line.rstrip(".") + parts = line.split(None, 2) + if len(parts) == 3: + s, p, o = (t.strip().strip("<>") for t in parts) + triples.add((s, p, o)) + except Exception: + logger.exception("Failed to parse TTL file %s", path) + return triples + + +def _parse_file(path: str) -> Set[tuple]: + """Dispatch to the correct parser based on extension.""" + lower_path = path.lower() + if lower_path.endswith(".json-ld"): + return _parse_jsonld_triples(path) + if lower_path.endswith(".ttl"): + return _parse_ttl_triples(path) + return set() + + +class OntologyHotReloader: + """ + Watch a directory for ontology file changes and apply deltas to a + knowledge graph callback. + + Parameters + ---------- + watch_dir : str + Filesystem directory to monitor. + on_add : callable(subject, predicate, object) -> None + Called for each triple that is *new* relative to the previous snapshot. + on_remove : callable(subject, predicate, object) -> None + Called for each triple that was *removed* relative to the previous + snapshot. + debounce_seconds : float + Minimum interval between reloads (default 1.0 s). + """ + + def __init__( + self, + watch_dir: str, + on_add: Callable[..., None], + on_remove: Callable[..., None], + debounce_seconds: float = 1.0, + ) -> None: + self.watch_dir = os.path.abspath(watch_dir) + self._on_add = on_add + self._on_remove = on_remove + self._debounce = debounce_seconds + + self._observer: Optional[Observer] = None + self._snapshot: Dict[str, Set[tuple]] = {} + self._lock = threading.Lock() + self._last_reload: float = 0.0 + + # Build initial snapshot + os.makedirs(self.watch_dir, exist_ok=True) + self._snapshot = self._build_snapshot() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def start(self) -> None: + """Begin watching the directory for changes.""" + handler = _Handler(self._on_change) + self._observer = Observer() + self._observer.schedule(handler, self.watch_dir, recursive=False) + self._observer.daemon = True + self._observer.start() + logger.info("OntologyHotReloader watching %s", self.watch_dir) + + def stop(self) -> None: + """Stop the file-system watcher.""" + if self._observer is not None: + self._observer.stop() + self._observer.join(timeout=5) + self._observer = None + logger.info("OntologyHotReloader stopped") + + def reload(self) -> None: + """Force an immediate reload (useful for testing).""" + self._do_reload() + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _build_snapshot(self) -> Dict[str, Set[tuple]]: + snapshot: Dict[str, Set[tuple]] = {} + if not os.path.isdir(self.watch_dir): + return snapshot + for name in os.listdir(self.watch_dir): + path = os.path.join(self.watch_dir, name) + if not os.path.isfile(path): + continue + if not any(name.lower().endswith(ext) for ext in _SUPPORTED_EXTENSIONS): + continue + snapshot[path] = _parse_file(path) + return snapshot + + def _on_change(self, event: FileSystemEvent) -> None: + now = time.monotonic() + if now - self._last_reload < self._debounce: + return + path = event.src_path + if not any(path.lower().endswith(ext) for ext in _SUPPORTED_EXTENSIONS): + return + self._do_reload() + + def _do_reload(self) -> None: + with self._lock: + new_snapshot = self._build_snapshot() + + old_all: Set[tuple] = set() + for triples in self._snapshot.values(): + old_all |= triples + new_all: Set[tuple] = set() + for triples in new_snapshot.values(): + new_all |= triples + + added = new_all - old_all + removed = old_all - new_all + + for s, p, o in added: + try: + self._on_add(s, p, o) + except Exception: + logger.exception("on_add callback failed for (%s, %s, %s)", s, p, o) + + for s, p, o in removed: + try: + self._on_remove(s, p, o) + except Exception: + logger.exception( + "on_remove callback failed for (%s, %s, %s)", s, p, o + ) + + self._snapshot = new_snapshot + self._last_reload = time.monotonic() + + if added or removed: + logger.info( + "OntologyHotReloader applied delta: +%d -%d triples", + len(added), + len(removed), + ) + + +class _Handler(FileSystemEventHandler): + """Watchdog handler that delegates to a single callback.""" + + def __init__(self, callback: Callable[..., None]) -> None: + super().__init__() + self._callback = callback + + def on_created(self, event: FileSystemEvent) -> None: + if not event.is_directory: + self._callback(event) + + def on_modified(self, event: FileSystemEvent) -> None: + if not event.is_directory: + self._callback(event) + + def on_deleted(self, event: FileSystemEvent) -> None: + if not event.is_directory: + self._callback(event) diff --git a/godelOS/core_kr/knowledge_store/interface.py b/godelOS/core_kr/knowledge_store/interface.py index 680db00..adea35f 100644 --- a/godelOS/core_kr/knowledge_store/interface.py +++ b/godelOS/core_kr/knowledge_store/interface.py @@ -7,6 +7,7 @@ """ from typing import Dict, List, Optional, Set, Tuple, Any, DefaultDict +import os import uuid import threading from collections import defaultdict @@ -152,6 +153,19 @@ def list_contexts(self) -> List[str]: A list of context IDs """ pass + + def get_context_info(self, context_id: str) -> Optional[Dict[str, Any]]: + """ + Get metadata for a context. + + Args: + context_id: The ID of the context + + Returns: + A dict with keys ``parent`` and ``type``, or ``None`` + if the context does not exist. + """ + return None # default; concrete backends should override class InMemoryKnowledgeStore(KnowledgeStoreBackend): @@ -396,6 +410,14 @@ def list_contexts(self) -> List[str]: with self._lock: return list(self._contexts.keys()) + def get_context_info(self, context_id: str) -> Optional[Dict[str, Any]]: + """Return metadata for *context_id*, or ``None`` if missing.""" + with self._lock: + info = self._contexts.get(context_id) + if info is None: + return None + return {"parent": info.get("parent"), "type": info.get("type", "generic")} + def get_all_statements_in_context(self, context_id: str) -> Set[AST_Node]: """Return every statement stored in *context_id* without pattern matching.""" with self._lock: @@ -638,25 +660,64 @@ class KnowledgeStoreInterface: unification_engine: "UnificationEngine" = None # type: ignore[assignment] def __init__(self, type_system: TypeSystemManager, - cache_manager: Optional[CachingMemoizationLayer] = None): + cache_manager: Optional[CachingMemoizationLayer] = None, + backend: Optional[str] = None, + db_path: Optional[str] = None): """ Initialize the knowledge store interface. Args: type_system: The type system manager cache_manager: Optional caching and memoization layer + backend: Backend type (``"memory"`` or ``"chroma"``). Defaults + to the ``KNOWLEDGE_STORE_BACKEND`` env-var, falling back + to ``"memory"``. + db_path: Path for the persistent backend. Only used when + *backend* is ``"chroma"``. Defaults to the + ``KNOWLEDGE_STORE_PATH`` env-var, falling back to + ``"./data/chroma"``. """ self.type_system = type_system self.cache_manager = cache_manager or CachingMemoizationLayer() self.unification_engine = UnificationEngine(type_system) - # Initialize the backend - self._backend = InMemoryKnowledgeStore(self.unification_engine) - - # Initialize default contexts - self._backend.create_context("TRUTHS", None, "truths") - self._backend.create_context("BELIEFS", None, "beliefs") - self._backend.create_context("HYPOTHETICAL", None, "hypothetical") + # Resolve backend choice from explicit arg → env-var → default + _valid_backends = {"memory", "chroma"} + chosen_backend = ( + backend + or os.environ.get("KNOWLEDGE_STORE_BACKEND", "memory") + ).lower() + + if chosen_backend not in _valid_backends: + import logging as _logging + + _logging.getLogger(__name__).warning( + "Unknown KNOWLEDGE_STORE_BACKEND=%r; falling back to 'memory'", + chosen_backend, + ) + chosen_backend = "memory" + + if chosen_backend == "chroma": + from godelOS.core_kr.knowledge_store.chroma_store import ChromaKnowledgeStore + + resolved_path = db_path or os.environ.get( + "KNOWLEDGE_STORE_PATH", "./data/chroma" + ) + self._backend: KnowledgeStoreBackend = ChromaKnowledgeStore( + self.unification_engine, persist_directory=resolved_path + ) + else: + self._backend = InMemoryKnowledgeStore(self.unification_engine) + + # Initialize default contexts (only if they don't already exist, + # which matters for a persisted ChromaDB backend across restarts). + existing = set(self._backend.list_contexts()) + if "TRUTHS" not in existing: + self._backend.create_context("TRUTHS", None, "truths") + if "BELIEFS" not in existing: + self._backend.create_context("BELIEFS", None, "beliefs") + if "HYPOTHETICAL" not in existing: + self._backend.create_context("HYPOTHETICAL", None, "hypothetical") def add_statement(self, statement_ast: AST_Node, context_id: str = "TRUTHS", metadata: Optional[Dict[str, Any]] = None) -> bool: diff --git a/query_engine_test_20260305_100039.json b/query_engine_test_20260305_100039.json deleted file mode 100644 index 8778fbe..0000000 --- a/query_engine_test_20260305_100039.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "timestamp": "2026-03-05T10:00:36.328209", - "status": "BROKEN", - "tests": { - "frontend_accessible": false, - "backend_accessible": false, - "query_api_works": false, - "websocket_accessible": false, - "frontend_backend_proxy": false - } -} \ No newline at end of file diff --git a/query_test_20260305_100036.json b/query_test_20260305_100036.json deleted file mode 100644 index 50758e6..0000000 --- a/query_test_20260305_100036.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "timestamp": "2026-03-05T10:00:36.305912", - "tests": { - "frontend": { - "error": "HTTPConnectionPool(host='localhost', port=3001): Max retries exceeded with url: / (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))", - "accessible": false - }, - "backend": { - "error": "HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: /health (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))", - "accessible": false - }, - "query_api": { - "error": "HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: /api/query (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))", - "working": false - }, - "frontend_proxy": { - "error": "HTTPConnectionPool(host='localhost', port=3001): Max retries exceeded with url: /api/health (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))", - "working": false - }, - "websocket": { - "error": "HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: /ws/cognitive-stream (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))", - "endpoint_exists": false - } - }, - "overall_status": "ISSUES_DETECTED" -} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 7439628..911f359 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,6 +24,7 @@ python-multipart>=0.0.6 websockets>=11.0.0 PyYAML>=6.0.1 aiofiles>=23.2.1 +chromadb>=0.4.0 python-docx>=1.1.0 PyPDF2>=3.0.1 diff --git a/svelte-frontend/src/App.svelte b/svelte-frontend/src/App.svelte index adbcf10..48a06b6 100644 --- a/svelte-frontend/src/App.svelte +++ b/svelte-frontend/src/App.svelte @@ -22,8 +22,9 @@ import ResourceAllocation from './components/transparency/ResourceAllocation.svelte'; import ProcessInsight from './components/transparency/ProcessInsight.svelte'; // import TransparencyDashboard from './components/transparency/TransparencyDashboard.svelte'; // LAZY LOADED - 2,011 lines - import ReasoningSessionViewer from './components/transparency/ReasoningSessionViewer.svelte'; - import ProvenanceTracker from './components/transparency/ProvenanceTracker.svelte'; + // ReasoningSessionViewer removed — redundant with TransparencyPanel reasoning trace tab; called non-existent endpoints + // ProvenanceTracker removed — called non-existent /api/transparency/provenance/* endpoints + import TransparencyPanel from './components/transparency/TransparencyPanel.svelte'; // Knowledge Management - LAZY LOADED to improve startup performance // import KnowledgeGraph from './components/knowledge/KnowledgeGraph.svelte'; // LAZY LOADED - 3,632 lines @@ -264,25 +265,13 @@ icon: '🔍', title: 'Transparency', description: 'Cognitive transparency and reasoning insights', - modal: 'transparency' // Use modal trigger instead of direct component - }, - reasoning: { - icon: '🎯', - title: 'Reasoning Sessions', - description: 'Live reasoning session monitoring', - component: ReasoningSessionViewer + component: TransparencyPanel }, reflection: { icon: '🪞', title: 'Reflection', description: 'System introspection and analysis', component: ReflectionVisualization - }, - provenance: { - icon: '🔗', - title: 'Provenance', - description: 'Data lineage and attribution tracking', - component: ProvenanceTracker } } }, @@ -666,8 +655,6 @@ showConsciousnessModal = true; } else if (viewConfig[activeView].modal === 'autonomous') { showAutonomousLearningModal = true; - } else if (viewConfig[activeView].modal === 'transparency') { - showTransparencyModal = true; } else if (viewConfig[activeView].modal === 'import') { showSmartImportModal = true; } else if (viewConfig[activeView].modal === 'jobs') { diff --git a/svelte-frontend/src/components/core/ChatInterface.svelte b/svelte-frontend/src/components/core/ChatInterface.svelte index 3cc2728..48ac903 100644 --- a/svelte-frontend/src/components/core/ChatInterface.svelte +++ b/svelte-frontend/src/components/core/ChatInterface.svelte @@ -2,6 +2,7 @@ import { onMount, onDestroy } from 'svelte'; import { cognitiveState } from '../../stores/cognitive.js'; import { enhancedCognitiveState } from '../../stores/enhanced-cognitive.js'; + import { API_BASE_URL } from '../../config.js'; // Component props export let mode = 'normal'; // normal, enhanced, diagnostic @@ -73,7 +74,7 @@ try { // Send to backend - const response = await fetch('http://localhost:8000/api/llm-chat/message', { + const response = await fetch(`${API_BASE_URL}/api/llm-chat/message`, { method: 'POST', headers: { 'Content-Type': 'application/json', diff --git a/svelte-frontend/src/components/core/QueryInterface.svelte b/svelte-frontend/src/components/core/QueryInterface.svelte index 867edfb..9b385cc 100644 --- a/svelte-frontend/src/components/core/QueryInterface.svelte +++ b/svelte-frontend/src/components/core/QueryInterface.svelte @@ -1,6 +1,7 @@ + +

+ +
+
+

🔍 Cognitive Transparency

+ + {wsConnected ? '● Live' : '○ Offline'} + +
+
+ +
+
+ + +
+ + + +
+ + +
+ {#if activeTab === 'reasoning'} + +
+ {#if reasoningSteps.length === 0} +
+ 🔬 +

No reasoning steps captured yet.

+

Submit a query to see live reasoning here, or wait for system events.

+
+ {:else} +
+ {#each reasoningSteps as step (step.id)} +
+
+ {step.type} + {step.description} + {#if step.confidence != null} + {Math.round(step.confidence * 100)}% + {/if} + {fmtTime(step.timestamp)} +
+ {#if step.detail} +
{step.detail}
+ {/if} + {#if step.children.length > 0} +
+ {#each step.children as child (child.id)} +
+
+ {child.type} + {child.description} + {fmtTime(child.timestamp)} +
+ {#if child.detail} +
{child.detail}
+ {/if} +
+ {/each} +
+ {/if} +
+ {/each} +
+ {/if} +
+ + {:else if activeTab === 'decisions'} + +
+ {#if decisionsLoading} +
Loading decisions…
+ {:else if decisions.length === 0} +
+ 📋 +

No decisions recorded yet.

+
+ {:else} +
+ Total: {totalDecisions} + Success Rate: {Math.round(successRate * 100)}% +
+
+ + + + + + + + + + + + + {#each decisions as d (d.decision_id)} + + + + + + + + + {/each} + +
IDTypeDescriptionConfidenceOutcomeTimestamp
{d.decision_id}{d.type}{d.description} +
+
+ {Math.round((d.confidence || 0) * 100)}% +
+
+ {d.outcome || '—'} + {d.timestamp ? fmtTime(d.timestamp) : '—'}
+
+ {/if} +
+ + {:else if activeTab === 'cognitive-map'} + +
+ {#if mapLoading} +
Loading cognitive map…
+ {:else if mapNodes.length === 0} +
+ 🗺️ +

No cognitive map data available.

+

The map populates as the system processes queries and builds knowledge.

+
+ {:else} +
+ {mapNodes.length} nodes + {mapEdges.length} connections + +
+ {/if} + +
+ {/if} +
+ + {#if error} +
+ ⚠️ {error} + +
+ {/if} +
+ + diff --git a/svelte-frontend/src/stores/transparency.js b/svelte-frontend/src/stores/transparency.js new file mode 100644 index 0000000..f391bce --- /dev/null +++ b/svelte-frontend/src/stores/transparency.js @@ -0,0 +1,8 @@ +// Transparency mode state shared across components +import { writable } from 'svelte/store'; + +/** + * When true, every query response auto-expands its reasoning trace inline. + * Toggled from QueryInterface advanced options or the TransparencyPanel header. + */ +export const transparencyMode = writable(false); diff --git a/svelte-frontend/src/utils/api.js b/svelte-frontend/src/utils/api.js index 6e4233f..903a463 100644 --- a/svelte-frontend/src/utils/api.js +++ b/svelte-frontend/src/utils/api.js @@ -761,7 +761,7 @@ export class GödelOSAPI { const response = await fetch(`${API_BASE_URL}/api/transparency/provenance/snapshot`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({}) + body: JSON.stringify({ description: '', include_quality_metrics: true }) }); if (!response.ok) throw new Error(`HTTP ${response.status}`); return await response.json(); diff --git a/tests/backend/test_consciousness_emergence_detector.py b/tests/backend/test_consciousness_emergence_detector.py new file mode 100644 index 0000000..efaab40 --- /dev/null +++ b/tests/backend/test_consciousness_emergence_detector.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Tests for ConsciousnessEmergenceDetector. + +Covers rolling-window scoring, breakthrough threshold triggering, JSONL +logging, WebSocket broadcast, the REST-compatible status snapshot, and +the async ``monitor_for_emergence`` generator. +""" + +import asyncio +import json +import os +import tempfile +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from backend.core.consciousness_emergence_detector import ( + ConsciousnessEmergenceDetector, + DIMENSION_WEIGHTS, + EMERGENCE_THRESHOLD, + extract_dimensions, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_state( + recursive_depth: float = 0.0, + phi: float = 0.0, + metacognitive_accuracy: float = 0.0, + autonomous_goal_count: int = 0, + creative_novelty: float = 0.0, +) -> dict: + """Build a flat state dict with all five dimensions.""" + return { + "recursive_depth": recursive_depth, + "phi": phi, + "metacognitive_accuracy": metacognitive_accuracy, + "autonomous_goal_count": autonomous_goal_count, + "creative_novelty": creative_novelty, + } + + +def _make_nested_state( + recursive_depth: int = 1, + phi: float = 0.0, + meta_observations: int = 0, + autonomous_goals: int = 0, + surprise_factor: float = 0.0, +) -> dict: + """Build a nested state dict matching UnifiedConsciousnessState layout.""" + return { + "recursive_awareness": {"recursive_depth": recursive_depth}, + "information_integration": {"phi": phi}, + "metacognitive_state": {"meta_observations": ["obs"] * meta_observations}, + "intentional_layer": {"autonomous_goals": ["g"] * autonomous_goals}, + "creative_synthesis": {"surprise_factor": surprise_factor}, + } + + +async def _async_iter(items): + """Turn an iterable into an async iterator.""" + for item in items: + yield item + + +# --------------------------------------------------------------------------- +# extract_dimensions +# --------------------------------------------------------------------------- + +class TestExtractDimensions: + def test_flat_state(self): + state = _make_state(recursive_depth=3, phi=5.0, metacognitive_accuracy=0.7, + autonomous_goal_count=4, creative_novelty=0.5) + dims = extract_dimensions(state) + assert dims["recursive_depth"] == 3.0 + assert dims["phi"] == 5.0 + assert dims["metacognitive_accuracy"] == 0.7 + assert dims["autonomous_goal_count"] == 4.0 + assert dims["creative_novelty"] == 0.5 + + def test_nested_state(self): + state = _make_nested_state(recursive_depth=4, phi=8.0, + meta_observations=5, autonomous_goals=3, + surprise_factor=0.6) + dims = extract_dimensions(state) + assert dims["recursive_depth"] == 4.0 + assert dims["phi"] == 8.0 + assert dims["metacognitive_accuracy"] == 0.5 # 5/10 + assert dims["autonomous_goal_count"] == 3.0 + assert dims["creative_novelty"] == 0.6 + + def test_empty_state(self): + dims = extract_dimensions({}) + for v in dims.values(): + assert v == 0.0 + + +# --------------------------------------------------------------------------- +# Rolling window scoring +# --------------------------------------------------------------------------- + +class TestRollingWindowScoring: + def test_single_sample_all_zeros(self): + det = ConsciousnessEmergenceDetector(window_size=60.0) + score = det.record_state(_make_state()) + assert score == 0.0 + + def test_single_sample_max_values(self): + """All dimensions at their normalisation ceiling → score = 1.0.""" + det = ConsciousnessEmergenceDetector(window_size=60.0) + state = _make_state( + recursive_depth=5.0, + phi=10.0, + metacognitive_accuracy=1.0, + autonomous_goal_count=10, + creative_novelty=1.0, + ) + score = det.record_state(state) + assert abs(score - 1.0) < 1e-9 + + def test_weights_sum_to_one(self): + assert abs(sum(DIMENSION_WEIGHTS.values()) - 1.0) < 1e-9 + + def test_window_pruning(self): + det = ConsciousnessEmergenceDetector(window_size=2.0) + now = time.time() + # Old sample outside window + det.record_state(_make_state(phi=10.0), timestamp=now - 5.0) + # Recent sample inside window + det.record_state(_make_state(phi=0.0), timestamp=now) + # The old sample should be pruned; score should be 0 + assert det.current_score == 0.0 + assert len(det._samples) == 1 + + def test_averaging_across_window(self): + det = ConsciousnessEmergenceDetector(window_size=60.0) + now = time.time() + # Two samples: phi=10 and phi=0 → average phi=5 → normalised=0.5 → weighted=0.5*0.3=0.15 + det.record_state(_make_state(phi=10.0), timestamp=now - 1) + det.record_state(_make_state(phi=0.0), timestamp=now) + expected = 0.5 * DIMENSION_WEIGHTS["phi"] + assert abs(det.current_score - expected) < 1e-9 + + +# --------------------------------------------------------------------------- +# Breakthrough detection +# --------------------------------------------------------------------------- + +class TestBreakthroughDetection: + def test_below_threshold_no_breakthrough(self): + det = ConsciousnessEmergenceDetector(threshold=0.8) + det.record_state(_make_state(phi=5.0)) # phi normalised=0.5, weighted=0.15 + assert det.current_score < 0.8 + status = det.get_emergence_status() + assert status["breakthrough"] is False + + def test_at_threshold_is_breakthrough(self): + det = ConsciousnessEmergenceDetector(threshold=0.8) + # Score exactly 0.8: need careful construction + # recursive_depth=4/5=0.8*0.20=0.16, phi=8/10=0.8*0.30=0.24, + # meta=0.8*0.20=0.16, goals=8/10=0.8*0.15=0.12, novelty=0.8*0.15=0.12 + # total = 0.16+0.24+0.16+0.12+0.12 = 0.80 + state = _make_state( + recursive_depth=4.0, + phi=8.0, + metacognitive_accuracy=0.8, + autonomous_goal_count=8, + creative_novelty=0.8, + ) + det.record_state(state) + assert abs(det.current_score - 0.8) < 1e-9 + assert det.get_emergence_status()["breakthrough"] is True + + +# --------------------------------------------------------------------------- +# handle_consciousness_breakthrough +# --------------------------------------------------------------------------- + +class TestHandleBreakthrough: + @pytest.mark.asyncio + async def test_logs_to_jsonl(self): + with tempfile.TemporaryDirectory() as tmpdir: + det = ConsciousnessEmergenceDetector(log_dir=tmpdir) + event = await det.handle_consciousness_breakthrough(0.85) + log_file = Path(tmpdir) / "breakthroughs.jsonl" + assert log_file.exists() + lines = log_file.read_text().strip().split("\n") + assert len(lines) == 1 + logged = json.loads(lines[0]) + assert logged["type"] == "consciousness_breakthrough" + assert logged["score"] == 0.85 + + @pytest.mark.asyncio + async def test_broadcasts_on_websocket(self): + ws = MagicMock() + ws.broadcast = AsyncMock() + with tempfile.TemporaryDirectory() as tmpdir: + det = ConsciousnessEmergenceDetector(websocket_manager=ws, log_dir=tmpdir) + await det.handle_consciousness_breakthrough(0.9) + ws.broadcast.assert_awaited_once() + call_arg = ws.broadcast.call_args[0][0] + assert call_arg["type"] == "consciousness_breakthrough" + assert call_arg["score"] == 0.9 + + @pytest.mark.asyncio + async def test_returns_event_dict(self): + with tempfile.TemporaryDirectory() as tmpdir: + det = ConsciousnessEmergenceDetector(log_dir=tmpdir) + event = await det.handle_consciousness_breakthrough(0.82) + assert event["type"] == "consciousness_breakthrough" + assert event["score"] == 0.82 + assert "timestamp" in event + + +# --------------------------------------------------------------------------- +# monitor_for_emergence (async generator) +# --------------------------------------------------------------------------- + +class TestMonitorForEmergence: + @pytest.mark.asyncio + async def test_yields_for_each_state(self): + with tempfile.TemporaryDirectory() as tmpdir: + det = ConsciousnessEmergenceDetector(log_dir=tmpdir) + states = [_make_state(phi=1.0), _make_state(phi=2.0)] + results = [] + async for info in det.monitor_for_emergence(_async_iter(states)): + results.append(info) + assert len(results) == 2 + + @pytest.mark.asyncio + async def test_breakthrough_fires(self): + """Feed a high-activity stream → assert breakthrough fires at score ≥ 0.8.""" + with tempfile.TemporaryDirectory() as tmpdir: + det = ConsciousnessEmergenceDetector(threshold=0.8, log_dir=tmpdir) + high_state = _make_state( + recursive_depth=5.0, + phi=10.0, + metacognitive_accuracy=1.0, + autonomous_goal_count=10, + creative_novelty=1.0, + ) + breakthroughs = [] + async for info in det.monitor_for_emergence(_async_iter([high_state])): + if info["breakthrough"]: + breakthroughs.append(info) + + assert len(breakthroughs) == 1 + assert breakthroughs[0]["emergence_score"] >= 0.8 + + # Verify JSONL log was written + log_file = Path(tmpdir) / "breakthroughs.jsonl" + assert log_file.exists() + logged = json.loads(log_file.read_text().strip()) + assert logged["type"] == "consciousness_breakthrough" + + @pytest.mark.asyncio + async def test_no_breakthrough_below_threshold(self): + with tempfile.TemporaryDirectory() as tmpdir: + det = ConsciousnessEmergenceDetector(threshold=0.8, log_dir=tmpdir) + low_state = _make_state(phi=1.0) + async for info in det.monitor_for_emergence(_async_iter([low_state])): + assert info["breakthrough"] is False + + # No log file should have been created + log_file = Path(tmpdir) / "breakthroughs.jsonl" + assert not log_file.exists() + + +# --------------------------------------------------------------------------- +# get_emergence_status (REST-compatible snapshot) +# --------------------------------------------------------------------------- + +class TestGetEmergenceStatus: + def test_initial_status(self): + det = ConsciousnessEmergenceDetector() + status = det.get_emergence_status() + assert status["emergence_score"] == 0.0 + assert status["breakthrough"] is False + assert "dimensions" in status + assert "threshold" in status + + def test_status_after_recording(self): + det = ConsciousnessEmergenceDetector() + det.record_state(_make_state(phi=10.0, recursive_depth=5.0)) + status = det.get_emergence_status() + assert status["emergence_score"] > 0 + assert status["window_samples"] == 1 + + +# --------------------------------------------------------------------------- +# Threshold configurability +# --------------------------------------------------------------------------- + +class TestThresholdConfig: + def test_default_threshold(self): + det = ConsciousnessEmergenceDetector() + assert det.threshold == EMERGENCE_THRESHOLD + + def test_custom_threshold(self): + det = ConsciousnessEmergenceDetector(threshold=0.5) + assert det.threshold == 0.5 + + def test_env_var_override(self, monkeypatch): + """Verify the env-var-driven module constant feeds the default threshold.""" + monkeypatch.setenv("GODELOS_EMERGENCE_THRESHOLD", "0.95") + # Force re-evaluation of the module-level constant + import importlib + import backend.core.consciousness_emergence_detector as mod + importlib.reload(mod) + det = mod.ConsciousnessEmergenceDetector() + assert det.threshold == 0.95 + # Restore original default + monkeypatch.delenv("GODELOS_EMERGENCE_THRESHOLD", raising=False) + importlib.reload(mod) diff --git a/tests/backend/test_global_workspace.py b/tests/backend/test_global_workspace.py new file mode 100644 index 0000000..d6bc6a4 --- /dev/null +++ b/tests/backend/test_global_workspace.py @@ -0,0 +1,325 @@ +"""Tests for GlobalWorkspace coalition dynamics and broadcast behaviour. + +Covers: +- Coalition register updates from φ and subsystem activity +- Softmax attention competition +- Broadcast event structure and WebSocket emission +- Higher-φ → broader coalitions acceptance criterion +""" + +import asyncio +import math +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from backend.core.unified_consciousness_engine import ( + GlobalWorkspace, + UnifiedConsciousnessState, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_state(**overrides) -> UnifiedConsciousnessState: + """Return a freshly initialised consciousness state with optional overrides.""" + state = UnifiedConsciousnessState() + for key, value in overrides.items(): + if hasattr(state, key) and isinstance(getattr(state, key), dict): + getattr(state, key).update(value) + else: + setattr(state, key, value) + return state + + +# --------------------------------------------------------------------------- +# Coalition register +# --------------------------------------------------------------------------- + +class TestCoalitionRegister: + """Coalition register: subsystem_id → activation_strength.""" + + def test_register_initialised_with_all_subsystems(self): + gw = GlobalWorkspace() + assert set(gw.coalition_register.keys()) == set(GlobalWorkspace.SUBSYSTEM_IDS) + assert all(v == 0.0 for v in gw.coalition_register.values()) + + def test_broadcast_updates_register(self): + gw = GlobalWorkspace() + state = _make_state() + gw.broadcast({"phi_measure": 1.0, "cognitive_state": state}) + # At least some activations should be > 0 now + assert any(v > 0 for v in gw.coalition_register.values()) + + def test_register_momentum(self): + """Repeated broadcasts with same state should show momentum (prev-blend).""" + gw = GlobalWorkspace() + state = _make_state() + gw.broadcast({"phi_measure": 1.0, "cognitive_state": state}) + first = dict(gw.coalition_register) + gw.broadcast({"phi_measure": 1.0, "cognitive_state": state}) + second = dict(gw.coalition_register) + # Second call should differ from first due to momentum term + assert first != second + + def test_register_phi_boost(self): + """Higher φ should produce higher activation levels.""" + gw_low = GlobalWorkspace() + gw_high = GlobalWorkspace() + state = _make_state() + gw_low.broadcast({"phi_measure": 0.1, "cognitive_state": state}) + gw_high.broadcast({"phi_measure": 10.0, "cognitive_state": state}) + avg_low = sum(gw_low.coalition_register.values()) / len(gw_low.coalition_register) + avg_high = sum(gw_high.coalition_register.values()) / len(gw_high.coalition_register) + assert avg_high > avg_low + + +# --------------------------------------------------------------------------- +# Softmax attention competition +# --------------------------------------------------------------------------- + +class TestSoftmaxAttention: + """Softmax over coalition activations → winner(s).""" + + def test_weights_sum_to_one(self): + gw = GlobalWorkspace() + state = _make_state() + result = gw.broadcast({"phi_measure": 2.0, "cognitive_state": state}) + weights = result["broadcast_content"]["content"]["attention_weights"] + total = sum(weights.values()) + assert abs(total - 1.0) < 1e-4 + + def test_winners_above_mean(self): + gw = GlobalWorkspace() + state = _make_state() + result = gw.broadcast({"phi_measure": 2.0, "cognitive_state": state}) + weights = result["broadcast_content"]["content"]["attention_weights"] + mean_w = 1.0 / len(GlobalWorkspace.SUBSYSTEM_IDS) + for sid in result["conscious_access"]: + assert weights[sid] >= mean_w + + def test_at_least_one_winner(self): + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 0.0}) + assert len(result["conscious_access"]) >= 1 + + def test_attention_focus_is_strongest_winner(self): + gw = GlobalWorkspace() + state = _make_state() + result = gw.broadcast({"phi_measure": 3.0, "cognitive_state": state}) + focus = result["attention_focus"] + weights = result["broadcast_content"]["content"]["attention_weights"] + winner_weights = {sid: weights[sid] for sid in result["conscious_access"]} + best = max(winner_weights, key=winner_weights.get) + assert focus == best + + +# --------------------------------------------------------------------------- +# Broadcast event structure +# --------------------------------------------------------------------------- + +class TestBroadcastEvent: + """``global_broadcast`` event has required shape.""" + + def test_event_type(self): + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 1.0}) + assert result["broadcast_content"]["type"] == "global_broadcast" + + def test_coalition_list(self): + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 1.0}) + coalition = result["broadcast_content"]["coalition"] + assert isinstance(coalition, list) + for entry in coalition: + assert "subsystem_id" in entry + assert "activation" in entry + assert entry["subsystem_id"] in GlobalWorkspace.SUBSYSTEM_IDS + + def test_content_fields(self): + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 1.0}) + content = result["broadcast_content"]["content"] + assert "phi_measure" in content + assert "coalition_strength" in content + assert "attention_weights" in content + assert "conscious" in content + assert "timestamp" in content + + def test_workspace_state_keys(self): + """Return dict must contain keys matching UnifiedConsciousnessState.global_workspace.""" + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 1.0}) + assert "broadcast_content" in result + assert "coalition_strength" in result + assert "attention_focus" in result + assert "conscious_access" in result + + def test_broadcast_content_updatable_on_state(self): + """Result dict should be safe to .update() on the state's global_workspace.""" + state = _make_state() + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 1.0, "cognitive_state": state}) + state.global_workspace.update(result) + assert isinstance(state.global_workspace["coalition_strength"], float) + assert isinstance(state.global_workspace["conscious_access"], list) + + def test_get_broadcast_event_returns_last(self): + gw = GlobalWorkspace() + state = _make_state( + information_integration={"phi": 5.0, "complexity": 1.0, "emergence_level": 1, "integration_patterns": {}} + ) + gw.broadcast({"phi_measure": 5.0, "cognitive_state": state}) + event = gw.get_broadcast_event() + assert event is not None + assert event["type"] == "global_broadcast" + + +# --------------------------------------------------------------------------- +# Broader coalitions at higher φ +# --------------------------------------------------------------------------- + +class TestCoalitionBreadth: + """Higher-φ states should produce broader coalitions.""" + + def test_higher_phi_broader_or_equal(self): + """Acceptance criterion: more winners at higher φ.""" + state = _make_state( + recursive_awareness={"current_thought": "x", "awareness_of_thought": "y", + "awareness_of_awareness": "z", "recursive_depth": 3, + "strange_loop_stability": 0.8}, + phenomenal_experience={"qualia": {"cognitive_feelings": ["a"]}, + "unity_of_experience": 0.7, + "narrative_coherence": 0.5, + "subjective_presence": 0.5, + "subjective_narrative": "hi", + "phenomenal_continuity": True}, + ) + gw_low = GlobalWorkspace() + gw_high = GlobalWorkspace() + r_low = gw_low.broadcast({"phi_measure": 0.01, "cognitive_state": state}) + r_high = gw_high.broadcast({"phi_measure": 50.0, "cognitive_state": state}) + assert len(r_high["conscious_access"]) >= len(r_low["conscious_access"]) + + +# --------------------------------------------------------------------------- +# WebSocket emission integration +# --------------------------------------------------------------------------- + +class TestWebSocketEmission: + """Verify the engine emits ``global_broadcast`` on the WebSocket.""" + + @pytest.mark.asyncio + async def test_consciousness_loop_emits_global_broadcast(self): + """After φ computation the consciousness loop should call ws broadcast.""" + ws_manager = MagicMock() + ws_manager.has_connections = MagicMock(return_value=True) + ws_manager.broadcast = AsyncMock() + ws_manager.broadcast_consciousness_update = AsyncMock() + + from backend.core.unified_consciousness_engine import UnifiedConsciousnessEngine + + engine = UnifiedConsciousnessEngine(websocket_manager=ws_manager) + # Run one tick of the loop manually + engine.consciousness_loop_active = True + + # Patch the loop to run once then stop + original_loop = engine._unified_consciousness_loop + + async def _one_tick(): + # We replicate the minimal sequence: capture state, compute phi, broadcast + state = engine.consciousness_state + phi = engine.information_integration_theory.calculate_phi( + state, + mean_contradiction=engine.self_model_validator.mean_contradiction_score, + ) + bc = engine.global_workspace.broadcast({ + "cognitive_state": state, + "phi_measure": phi, + "timestamp": time.time(), + }) + broadcast_event = bc.get("broadcast_content") + if ( + broadcast_event + and engine.websocket_manager + and hasattr(engine.websocket_manager, "has_connections") + and engine.websocket_manager.has_connections() + ): + await engine.websocket_manager.broadcast(broadcast_event) + + await _one_tick() + + # Verify ws_manager.broadcast was called with a global_broadcast event + ws_manager.broadcast.assert_called_once() + call_arg = ws_manager.broadcast.call_args[0][0] + assert call_arg["type"] == "global_broadcast" + assert "coalition" in call_arg + assert "content" in call_arg + + @pytest.mark.asyncio + async def test_no_emission_without_connections(self): + """No WebSocket emission when there are no active connections.""" + ws_manager = MagicMock() + ws_manager.has_connections = MagicMock(return_value=False) + ws_manager.broadcast = AsyncMock() + + from backend.core.unified_consciousness_engine import UnifiedConsciousnessEngine + + engine = UnifiedConsciousnessEngine(websocket_manager=ws_manager) + state = engine.consciousness_state + phi = engine.information_integration_theory.calculate_phi( + state, + mean_contradiction=engine.self_model_validator.mean_contradiction_score, + ) + bc = engine.global_workspace.broadcast({ + "cognitive_state": state, + "phi_measure": phi, + "timestamp": time.time(), + }) + broadcast_event = bc.get("broadcast_content") + if ( + broadcast_event + and engine.websocket_manager + and hasattr(engine.websocket_manager, "has_connections") + and engine.websocket_manager.has_connections() + ): + await engine.websocket_manager.broadcast(broadcast_event) + + ws_manager.broadcast.assert_not_called() + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + +class TestEdgeCases: + """Edge / boundary conditions.""" + + def test_broadcast_with_zero_phi(self): + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 0.0}) + assert result["coalition_strength"] >= 0.0 + assert "broadcast_content" in result + + def test_broadcast_without_cognitive_state(self): + gw = GlobalWorkspace() + result = gw.broadcast({"phi_measure": 1.0, "timestamp": time.time()}) + assert len(result["conscious_access"]) >= 1 + + def test_broadcast_history_bounded(self): + gw = GlobalWorkspace() + state = _make_state( + information_integration={"phi": 5.0, "complexity": 1.0, "emergence_level": 1, "integration_patterns": {}}, + ) + for i in range(150): + gw.broadcast({"phi_measure": 5.0, "cognitive_state": state, "timestamp": float(i)}) + assert len(gw.broadcast_history) <= 100 + + def test_coalitions_attribute_updated(self): + gw = GlobalWorkspace() + gw.broadcast({"phi_measure": 1.0}) + assert isinstance(gw.coalitions, list) + assert all(sid in GlobalWorkspace.SUBSYSTEM_IDS for sid in gw.coalitions) diff --git a/tests/backend/test_knowledge_import_endpoints.py b/tests/backend/test_knowledge_import_endpoints.py index 147ed74..a796e64 100644 --- a/tests/backend/test_knowledge_import_endpoints.py +++ b/tests/backend/test_knowledge_import_endpoints.py @@ -6,7 +6,6 @@ level so that we test the HTTP layer in isolation. """ -import asyncio import pytest from unittest.mock import AsyncMock, MagicMock, patch @@ -96,7 +95,8 @@ async def test_url_import_missing_url(self, patched_app): transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: resp = await c.post("/api/knowledge/import/url", json={}) - assert resp.status_code == 400 + # URLImportSchema requires 'url'; Pydantic rejects with 422 + assert resp.status_code == 422 @pytest.mark.asyncio async def test_url_import_service_unavailable(self, patched_app_no_service): @@ -136,7 +136,8 @@ async def test_text_import_missing_content(self, patched_app): resp = await c.post("/api/knowledge/import/text", json={ "title": "No content", }) - assert resp.status_code == 400 + # TextImportSchema requires 'content'; Pydantic rejects with 422 + assert resp.status_code == 422 @pytest.mark.asyncio async def test_text_import_service_unavailable(self, patched_app_no_service): diff --git a/tests/integration/test_api_schema_contracts.py b/tests/integration/test_api_schema_contracts.py new file mode 100644 index 0000000..e611b19 --- /dev/null +++ b/tests/integration/test_api_schema_contracts.py @@ -0,0 +1,250 @@ +""" +Integration tests: frontend payloads → backend schema validation → no 422 errors. + +These tests import the FastAPI ``app`` from unified_server.py and use the +Starlette/HTTPX ``TestClient`` to send the **exact** JSON payloads that the +Svelte frontend (``svelte-frontend/src/utils/api.js``) sends. Every +assertion confirms that the request is accepted (no 422 Unprocessable Entity). + +The tests run **in-process** — no live backend required. +""" + +import pytest +from fastapi.testclient import TestClient + +# --------------------------------------------------------------------------- +# All tests run in-process — no backend server required. +# --------------------------------------------------------------------------- +pytestmark = [pytest.mark.integration, pytest.mark.standalone] + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def client(): + """Create a TestClient bound to the unified FastAPI app.""" + from backend.unified_server import app + return TestClient(app, raise_server_exceptions=False) + + +# --------------------------------------------------------------------------- +# POST /api/query — frontend sends {query, context, stream} +# --------------------------------------------------------------------------- + + +class TestQueryEndpoint: + """Ensure the /api/query endpoint accepts the exact frontend payload.""" + + def test_query_frontend_payload(self, client): + """Frontend sends query + context + stream; no 422.""" + resp = client.post("/api/query", json={ + "query": "What is consciousness?", + "context": {"source": "user_interface"}, + "stream": False, + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_query_minimal(self, client): + """Minimal payload — just the query string.""" + resp = client.post("/api/query", json={"query": "hello"}) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/knowledge — frontend sends concept/content/definition/title/category +# --------------------------------------------------------------------------- + + +class TestAddKnowledge: + """Ensure /api/knowledge accepts various frontend payload shapes.""" + + def test_add_knowledge_concept(self, client): + resp = client.post("/api/knowledge", json={ + "concept": "Neural network", + "definition": "A computing system inspired by biological neural networks", + "category": "technology", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_add_knowledge_content(self, client): + resp = client.post("/api/knowledge", json={ + "content": "The sky is blue", + "title": "Sky colour", + "category": "science", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_add_knowledge_empty_category(self, client): + resp = client.post("/api/knowledge", json={ + "content": "test item", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/wikipedia — frontend sends {title} +# --------------------------------------------------------------------------- + + +class TestWikipediaImport: + """Ensure /api/knowledge/import/wikipedia accepts {title}.""" + + def test_wikipedia_title_field(self, client): + """Frontend sends ``title``, not ``page_title``.""" + resp = client.post("/api/knowledge/import/wikipedia", json={ + "title": "Artificial intelligence", + }) + # 503 is acceptable (service not available); 422 is not. + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_wikipedia_topic_alias(self, client): + resp = client.post("/api/knowledge/import/wikipedia", json={ + "topic": "Machine learning", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/url — frontend sends {url, category} +# --------------------------------------------------------------------------- + + +class TestURLImport: + """Ensure /api/knowledge/import/url accepts {url, category}.""" + + def test_url_import_frontend_payload(self, client): + resp = client.post("/api/knowledge/import/url", json={ + "url": "https://example.com", + "category": "web", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_url_import_with_extras(self, client): + """Include all optional schema fields.""" + resp = client.post("/api/knowledge/import/url", json={ + "url": "https://example.com", + "category": "web", + "max_depth": 2, + "follow_links": True, + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/text — frontend sends {content, title, category} +# --------------------------------------------------------------------------- + + +class TestTextImport: + """Ensure /api/knowledge/import/text accepts {content, title, category}.""" + + def test_text_import_frontend_payload(self, client): + resp = client.post("/api/knowledge/import/text", json={ + "content": "Some text content for import", + "title": "Manual Text Input", + "category": "document", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_text_import_minimal(self, client): + resp = client.post("/api/knowledge/import/text", json={ + "content": "Minimal text", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/knowledge/import/batch — frontend sends {sources: [...]} +# --------------------------------------------------------------------------- + + +class TestBatchImport: + """Ensure /api/knowledge/import/batch accepts {sources}.""" + + def test_batch_import_frontend_payload(self, client): + resp = client.post("/api/knowledge/import/batch", json={ + "sources": [ + {"type": "url", "url": "https://example.com"}, + {"type": "text", "content": "inline text"}, + ], + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_batch_import_empty(self, client): + resp = client.post("/api/knowledge/import/batch", json={ + "sources": [], + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/enhanced-cognitive/query — frontend sends {query, context} +# --------------------------------------------------------------------------- + + +class TestEnhancedCognitiveQuery: + """Ensure /api/enhanced-cognitive/query accepts {query, context}.""" + + def test_enhanced_query_frontend_payload(self, client): + resp = client.post("/api/enhanced-cognitive/query", json={ + "query": "Explain consciousness", + "context": "user_interface", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_enhanced_query_dict_context(self, client): + """Context may also be a dict.""" + resp = client.post("/api/enhanced-cognitive/query", json={ + "query": "Explain consciousness", + "context": {"source": "user_interface"}, + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/transparency/provenance/query — frontend sends extended fields +# --------------------------------------------------------------------------- + + +class TestProvenanceQuery: + """Ensure provenance query accepts the full frontend payload.""" + + def test_provenance_query_frontend_payload(self, client): + resp = client.post("/api/transparency/provenance/query", json={ + "target_id": "default", + "query_type": "backward_trace", + "max_depth": 5, + "time_window_start": None, + "time_window_end": None, + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_provenance_query_minimal(self, client): + resp = client.post("/api/transparency/provenance/query", json={ + "target_id": "item_1", + "query_type": "backward_trace", + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + +# --------------------------------------------------------------------------- +# POST /api/transparency/provenance/snapshot — frontend sends {} +# --------------------------------------------------------------------------- + + +class TestProvenanceSnapshot: + """Ensure provenance snapshot accepts an empty body (frontend pattern).""" + + def test_snapshot_empty_body(self, client): + """Frontend sends ``{}``; description defaults to empty string.""" + resp = client.post("/api/transparency/provenance/snapshot", json={}) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" + + def test_snapshot_with_description(self, client): + resp = client.post("/api/transparency/provenance/snapshot", json={ + "description": "Manual snapshot", + "include_quality_metrics": True, + }) + assert resp.status_code != 422, f"422 from field mismatch: {resp.text}" diff --git a/tests/test_chroma_knowledge_store.py b/tests/test_chroma_knowledge_store.py new file mode 100644 index 0000000..1837efc --- /dev/null +++ b/tests/test_chroma_knowledge_store.py @@ -0,0 +1,549 @@ +""" +Tests for the ChromaDB knowledge store, OntologyHotReloader, and +ChromaDB-backed KnowledgeStoreInterface. + +Covers: +- Round-trip persistence (add → destroy client → reopen → verify) +- All KnowledgeStoreBackend operations via ChromaKnowledgeStore +- Semantic retrieval via Chroma vector search +- Structured retrieval via ``where`` metadata filters +- OntologyHotReloader fires on file change +- KnowledgeStoreInterface backend selection via constructor arg and env var +""" + +import json +import os +import shutil +import tempfile +import time +import unittest + +import pytest + +from godelOS.core_kr.ast.nodes import ( + ApplicationNode, + ConnectiveNode, + ConstantNode, + VariableNode, +) +from godelOS.core_kr.knowledge_store.hot_reloader import ( + OntologyHotReloader, + _parse_jsonld_triples, + _parse_ttl_triples, +) +from godelOS.core_kr.knowledge_store.interface import KnowledgeStoreInterface +from godelOS.core_kr.knowledge_store.chroma_store import ChromaKnowledgeStore +from godelOS.core_kr.type_system.manager import TypeSystemManager +from godelOS.core_kr.type_system.types import FunctionType +from godelOS.core_kr.unification_engine.engine import UnificationEngine + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_fixtures(): + """Return a dict of reusable AST fixtures.""" + tsm = TypeSystemManager() + entity = tsm.get_type("Entity") + boolean = tsm.get_type("Boolean") + func_type = FunctionType([entity], boolean) + + socrates = ConstantNode("Socrates", entity) + plato = ConstantNode("Plato", entity) + human_pred = ConstantNode("Human", func_type) + mortal_pred = ConstantNode("Mortal", func_type) + + var_x = VariableNode("?x", 1, entity) + + human_socrates = ApplicationNode(human_pred, [socrates], boolean) + mortal_socrates = ApplicationNode(mortal_pred, [socrates], boolean) + human_plato = ApplicationNode(human_pred, [plato], boolean) + human_var_x = ApplicationNode(human_pred, [var_x], boolean) + + implies_hm = ConnectiveNode( + "IMPLIES", [human_socrates, mortal_socrates], boolean + ) + + return { + "tsm": tsm, + "entity": entity, + "boolean": boolean, + "func_type": func_type, + "socrates": socrates, + "plato": plato, + "human_pred": human_pred, + "mortal_pred": mortal_pred, + "var_x": var_x, + "human_socrates": human_socrates, + "mortal_socrates": mortal_socrates, + "human_plato": human_plato, + "human_var_x": human_var_x, + "implies_hm": implies_hm, + } + + +# --------------------------------------------------------------------------- +# ChromaKnowledgeStore tests +# --------------------------------------------------------------------------- + + +class TestChromaKnowledgeStore(unittest.TestCase): + """Direct tests against the ChromaKnowledgeStore backend.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.fix = _make_fixtures() + self.ue = UnificationEngine(self.fix["tsm"]) + self.store = ChromaKnowledgeStore( + self.ue, persist_directory=self.tmpdir + ) + # Create default context + self.store.create_context("TRUTHS", None, "truths") + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + # -- context CRUD ------------------------------------------------------ + + def test_create_and_list_contexts(self): + self.assertIn("TRUTHS", self.store.list_contexts()) + self.store.create_context("BELIEFS", None, "beliefs") + self.assertIn("BELIEFS", self.store.list_contexts()) + + def test_create_duplicate_context_raises(self): + with self.assertRaises(ValueError): + self.store.create_context("TRUTHS", None, "truths") + + def test_create_context_with_missing_parent_raises(self): + with self.assertRaises(ValueError): + self.store.create_context("CHILD", "NONEXISTENT", "child") + + def test_delete_context(self): + self.store.create_context("TO_DELETE", None, "temp") + self.assertIn("TO_DELETE", self.store.list_contexts()) + self.store.delete_context("TO_DELETE") + self.assertNotIn("TO_DELETE", self.store.list_contexts()) + + def test_delete_context_with_children_raises(self): + self.store.create_context("PARENT", None, "parent") + self.store.create_context("CHILD", "PARENT", "child") + with self.assertRaises(ValueError): + self.store.delete_context("PARENT") + + def test_delete_nonexistent_context_raises(self): + with self.assertRaises(ValueError): + self.store.delete_context("NOPE") + + def test_get_context_info(self): + info = self.store.get_context_info("TRUTHS") + self.assertIsNotNone(info) + self.assertEqual(info["type"], "truths") + self.assertIsNone(info["parent"]) + + # -- statement CRUD ---------------------------------------------------- + + def test_add_and_exists(self): + result = self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.assertTrue(result) + self.assertTrue( + self.store.statement_exists(self.fix["human_socrates"], ["TRUTHS"]) + ) + + def test_add_duplicate_returns_false(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + result = self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.assertFalse(result) + + def test_add_to_nonexistent_context_raises(self): + with self.assertRaises(ValueError): + self.store.add_statement(self.fix["human_socrates"], "NOPE") + + def test_retract_statement(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.assertTrue( + self.store.retract_statement(self.fix["human_socrates"], "TRUTHS") + ) + self.assertFalse( + self.store.statement_exists(self.fix["human_socrates"], ["TRUTHS"]) + ) + + def test_retract_nonexistent_returns_false(self): + self.assertFalse( + self.store.retract_statement(self.fix["human_plato"], "TRUTHS") + ) + + def test_retract_with_variable_pattern(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + result = self.store.retract_statement(self.fix["human_var_x"], "TRUTHS") + self.assertTrue(result) + self.assertFalse( + self.store.statement_exists(self.fix["human_socrates"], ["TRUTHS"]) + ) + + # -- query pattern matching -------------------------------------------- + + def test_query_exact(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + results = self.store.query_statements_match_pattern( + self.fix["human_socrates"], ["TRUTHS"] + ) + self.assertEqual(len(results), 1) + + def test_query_variable_binding(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + results = self.store.query_statements_match_pattern( + self.fix["human_var_x"], ["TRUTHS"] + ) + self.assertEqual(len(results), 1) + self.assertEqual(results[0][self.fix["var_x"]], self.fix["socrates"]) + + def test_query_multiple_contexts(self): + self.store.create_context("BELIEFS", None, "beliefs") + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.store.add_statement(self.fix["human_plato"], "BELIEFS") + results = self.store.query_statements_match_pattern( + self.fix["human_var_x"], ["TRUTHS", "BELIEFS"] + ) + self.assertEqual(len(results), 2) + + def test_query_nonexistent_context_raises(self): + with self.assertRaises(ValueError): + self.store.query_statements_match_pattern( + self.fix["human_socrates"], ["NOPE"] + ) + + def test_query_with_variables_to_bind(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + results = self.store.query_statements_match_pattern( + self.fix["human_var_x"], + ["TRUTHS"], + variables_to_bind=[self.fix["var_x"]], + ) + self.assertEqual(len(results), 1) + self.assertIn(self.fix["var_x"], results[0]) + + # -- get_all_statements_in_context ------------------------------------- + + def test_get_all_statements_in_context(self): + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.store.add_statement(self.fix["mortal_socrates"], "TRUTHS") + stmts = self.store.get_all_statements_in_context("TRUTHS") + self.assertEqual(len(stmts), 2) + + # -- round-trip persistence -------------------------------------------- + + def test_round_trip_persistence(self): + """Add data, destroy client, reopen from same dir, verify presence.""" + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.store.add_statement(self.fix["mortal_socrates"], "TRUTHS") + + # Destroy the existing client to force a cold restart + del self.store + + # Reopen from disk + store2 = ChromaKnowledgeStore(self.ue, persist_directory=self.tmpdir) + self.assertTrue( + store2.statement_exists(self.fix["human_socrates"], ["TRUTHS"]) + ) + self.assertTrue( + store2.statement_exists(self.fix["mortal_socrates"], ["TRUTHS"]) + ) + self.assertIn("TRUTHS", store2.list_contexts()) + + results = store2.query_statements_match_pattern( + self.fix["human_var_x"], ["TRUTHS"] + ) + self.assertEqual(len(results), 1) + self.assertEqual(results[0][self.fix["var_x"]], self.fix["socrates"]) + + def test_round_trip_connective(self): + """Connective nodes survive serialisation round-trip.""" + self.store.add_statement(self.fix["implies_hm"], "TRUTHS") + del self.store + + store2 = ChromaKnowledgeStore(self.ue, persist_directory=self.tmpdir) + self.assertTrue( + store2.statement_exists(self.fix["implies_hm"], ["TRUTHS"]) + ) + + # -- semantic retrieval ------------------------------------------------ + + def test_query_by_similarity(self): + """Semantic vector search returns relevant results.""" + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.store.add_statement(self.fix["mortal_socrates"], "TRUTHS") + + results = self.store.query_by_similarity( + "who is human", "TRUTHS", n_results=2 + ) + self.assertTrue(len(results) > 0) + # Each result should have expected keys + for r in results: + self.assertIn("id", r) + self.assertIn("document", r) + self.assertIn("metadata", r) + self.assertIn("distance", r) + self.assertIn("statement", r) + + def test_query_by_similarity_empty_context(self): + """Semantic query on empty context returns empty list.""" + results = self.store.query_by_similarity( + "anything", "TRUTHS", n_results=5 + ) + self.assertEqual(len(results), 0) + + # -- structured metadata retrieval ------------------------------------- + + def test_query_by_metadata(self): + """Structured metadata filter returns matching items.""" + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.store.add_statement(self.fix["mortal_socrates"], "TRUTHS") + + results = self.store.query_by_metadata( + "TRUTHS", {"predicate": "Human"} + ) + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["metadata"]["predicate"], "Human") + + def test_query_by_metadata_subject(self): + """Filter by subject metadata.""" + self.store.add_statement(self.fix["human_socrates"], "TRUTHS") + self.store.add_statement(self.fix["human_plato"], "TRUTHS") + + results = self.store.query_by_metadata( + "TRUTHS", {"subject": "Plato"} + ) + self.assertEqual(len(results), 1) + + +# --------------------------------------------------------------------------- +# KnowledgeStoreInterface with ChromaDB backend +# --------------------------------------------------------------------------- + + +class TestKnowledgeStoreInterfaceChroma(unittest.TestCase): + """Test KnowledgeStoreInterface configured to use the ChromaDB backend.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.fix = _make_fixtures() + self.ks = KnowledgeStoreInterface( + self.fix["tsm"], backend="chroma", db_path=self.tmpdir + ) + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_default_contexts_created(self): + contexts = self.ks.list_contexts() + self.assertIn("TRUTHS", contexts) + self.assertIn("BELIEFS", contexts) + self.assertIn("HYPOTHETICAL", contexts) + + def test_add_and_query(self): + self.ks.add_statement(self.fix["human_socrates"]) + results = self.ks.query_statements_match_pattern(self.fix["human_var_x"]) + self.assertEqual(len(results), 1) + + def test_persistence_across_interface_recreations(self): + """Knowledge survives interface tear-down/re-creation.""" + self.ks.add_statement(self.fix["human_socrates"]) + del self.ks + ks2 = KnowledgeStoreInterface( + self.fix["tsm"], backend="chroma", db_path=self.tmpdir + ) + self.assertTrue(ks2.statement_exists(self.fix["human_socrates"])) + + def test_env_var_backend_selection(self): + """KNOWLEDGE_STORE_BACKEND env-var selects the backend.""" + db2 = os.path.join(self.tmpdir, "env_test") + old_backend = os.environ.get("KNOWLEDGE_STORE_BACKEND") + old_path = os.environ.get("KNOWLEDGE_STORE_PATH") + try: + os.environ["KNOWLEDGE_STORE_BACKEND"] = "chroma" + os.environ["KNOWLEDGE_STORE_PATH"] = db2 + ks = KnowledgeStoreInterface(self.fix["tsm"]) + ks.add_statement(self.fix["human_socrates"]) + self.assertTrue(ks.statement_exists(self.fix["human_socrates"])) + # Persistence directory should exist + self.assertTrue(os.path.exists(db2)) + finally: + if old_backend is None: + os.environ.pop("KNOWLEDGE_STORE_BACKEND", None) + else: + os.environ["KNOWLEDGE_STORE_BACKEND"] = old_backend + if old_path is None: + os.environ.pop("KNOWLEDGE_STORE_PATH", None) + else: + os.environ["KNOWLEDGE_STORE_PATH"] = old_path + + +# --------------------------------------------------------------------------- +# OntologyHotReloader +# --------------------------------------------------------------------------- + + +class TestOntologyHotReloader(unittest.TestCase): + """Test the file-system watcher + delta application logic.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.added: list = [] + self.removed: list = [] + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _on_add(self, s, p, o): + self.added.append((s, p, o)) + + def _on_remove(self, s, p, o): + self.removed.append((s, p, o)) + + def test_initial_snapshot_empty_dir(self): + reloader = OntologyHotReloader( + self.tmpdir, self._on_add, self._on_remove + ) + self.assertEqual(reloader._snapshot, {}) + + def test_reload_picks_up_new_file(self): + reloader = OntologyHotReloader( + self.tmpdir, self._on_add, self._on_remove + ) + # Write a JSON-LD file + path = os.path.join(self.tmpdir, "test.json-ld") + data = [{"@id": "ex:Cat", "ex:sound": "meow"}] + with open(path, "w") as fh: + json.dump(data, fh) + + reloader.reload() + self.assertEqual(len(self.added), 1) + self.assertEqual(self.added[0], ("ex:Cat", "ex:sound", "meow")) + + def test_reload_detects_removal(self): + # Seed a file first + path = os.path.join(self.tmpdir, "animals.json-ld") + data = [{"@id": "ex:Dog", "ex:sound": "woof"}] + with open(path, "w") as fh: + json.dump(data, fh) + + reloader = OntologyHotReloader( + self.tmpdir, self._on_add, self._on_remove + ) + self.assertEqual(len(self.added), 0) + + os.remove(path) + reloader.reload() + self.assertEqual(len(self.removed), 1) + self.assertEqual(self.removed[0], ("ex:Dog", "ex:sound", "woof")) + + def test_reload_detects_modification(self): + path = os.path.join(self.tmpdir, "mod.json-ld") + data = [{"@id": "ex:A", "ex:val": "1"}] + with open(path, "w") as fh: + json.dump(data, fh) + + reloader = OntologyHotReloader( + self.tmpdir, self._on_add, self._on_remove + ) + + data2 = [{"@id": "ex:A", "ex:val": "2"}] + with open(path, "w") as fh: + json.dump(data2, fh) + + reloader.reload() + self.assertIn(("ex:A", "ex:val", "1"), self.removed) + self.assertIn(("ex:A", "ex:val", "2"), self.added) + + def test_ttl_file_parsing(self): + reloader = OntologyHotReloader( + self.tmpdir, self._on_add, self._on_remove + ) + path = os.path.join(self.tmpdir, "onto.ttl") + with open(path, "w") as fh: + fh.write("# comment\n") + fh.write(" .\n") + fh.write(" .\n") + + reloader.reload() + self.assertEqual(len(self.added), 2) + + def test_hot_reload_via_observer(self): + """The watchdog observer detects a new file within 5 seconds.""" + reloader = OntologyHotReloader( + self.tmpdir, self._on_add, self._on_remove, debounce_seconds=0.1 + ) + reloader.start() + try: + path = os.path.join(self.tmpdir, "live.json-ld") + data = [{"@id": "ex:Live", "ex:status": "active"}] + with open(path, "w") as fh: + json.dump(data, fh) + + deadline = time.monotonic() + 5.0 + while not self.added and time.monotonic() < deadline: + time.sleep(0.2) + + self.assertTrue( + len(self.added) > 0, + "Hot-reload did not fire within 5 seconds", + ) + finally: + reloader.stop() + + +# --------------------------------------------------------------------------- +# Parsers unit tests +# --------------------------------------------------------------------------- + + +class TestParsers(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_parse_jsonld_graph(self): + path = os.path.join(self.tmpdir, "g.json-ld") + data = {"@graph": [{"@id": "ex:A", "ex:p": "v"}]} + with open(path, "w") as fh: + json.dump(data, fh) + triples = _parse_jsonld_triples(path) + self.assertEqual(triples, {("ex:A", "ex:p", "v")}) + + def test_parse_jsonld_list(self): + path = os.path.join(self.tmpdir, "l.json-ld") + data = [{"@id": "ex:B", "ex:q": [{"@value": "42"}]}] + with open(path, "w") as fh: + json.dump(data, fh) + triples = _parse_jsonld_triples(path) + self.assertEqual(triples, {("ex:B", "ex:q", "42")}) + + def test_parse_ttl_simple(self): + path = os.path.join(self.tmpdir, "t.ttl") + with open(path, "w") as fh: + fh.write(" .\n") + triples = _parse_ttl_triples(path) + self.assertEqual(triples, {("ex:S", "ex:P", "ex:O")}) + + def test_parse_ttl_skips_prefix(self): + path = os.path.join(self.tmpdir, "p.ttl") + with open(path, "w") as fh: + fh.write("@prefix ex: .\n") + fh.write(" .\n") + triples = _parse_ttl_triples(path) + self.assertEqual(triples, {("ex:S", "ex:P", "ex:O")}) + + def test_parse_invalid_file_returns_empty(self): + path = os.path.join(self.tmpdir, "bad.json-ld") + with open(path, "w") as fh: + fh.write("NOT JSON") + triples = _parse_jsonld_triples(path) + self.assertEqual(triples, set()) + + +if __name__ == "__main__": + unittest.main() From f5bcbc66e083b06869def4af45503039747edcd8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:24:07 +0000 Subject: [PATCH 5/5] Initial plan