diff --git a/backend/unified_server.py b/backend/unified_server.py index 6d02754..2587e68 100644 --- a/backend/unified_server.py +++ b/backend/unified_server.py @@ -205,17 +205,31 @@ async def process_query(self, query): LLM_COGNITIVE_DRIVER_AVAILABLE = False # Import additional services with fallbacks +# Import each service independently so a failure in one (e.g. thinc/spaCy for +# knowledge_management) doesn't take down the ingestion service. +knowledge_ingestion_service = None +knowledge_management_service = None +knowledge_pipeline_service = None +KNOWLEDGE_SERVICES_AVAILABLE = False + try: - from backend.knowledge_ingestion import knowledge_ingestion_service - from backend.knowledge_management import knowledge_management_service - from backend.knowledge_pipeline_service import knowledge_pipeline_service + from backend.knowledge_ingestion import knowledge_ingestion_service as _kis + knowledge_ingestion_service = _kis KNOWLEDGE_SERVICES_AVAILABLE = True except ImportError as e: - logger.warning(f"Knowledge services not available: {e}") - knowledge_ingestion_service = None - knowledge_management_service = None - knowledge_pipeline_service = None - KNOWLEDGE_SERVICES_AVAILABLE = False + logger.warning(f"Knowledge ingestion service not available: {e}") + +try: + from backend.knowledge_management import knowledge_management_service as _kms + knowledge_management_service = _kms +except ImportError as e: + logger.warning(f"Knowledge management service not available: {e}") + +try: + from backend.knowledge_pipeline_service import knowledge_pipeline_service as _kps + knowledge_pipeline_service = _kps +except ImportError as e: + logger.warning(f"Knowledge pipeline service not available: {e}") # Import production vector database try: @@ -2624,6 +2638,11 @@ async def get_import_progress(import_id: str): "error": str(e) } +@app.get("/api/knowledge/import/status/{job_id}") +async def get_import_status(job_id: str): + """Get the status of an import job (alias for progress endpoint).""" + return await get_import_progress(job_id) + @app.post("/api/knowledge/import/file") async def import_knowledge_from_file(file: UploadFile = File(...), filename: str = Form(None), file_type: str = Form(None)): """Import knowledge from uploaded file.""" @@ -2679,12 +2698,14 @@ async def import_knowledge_from_file(file: UploadFile = File(...), filename: str "filename": file.filename, "file_size": len(content), "content_type": file.content_type, - "file_type": file_type + "file_type": determined_file_type } + except HTTPException: + raise except Exception as e: - logger.error(f"Error importing knowledge from file: {e}") - raise HTTPException(status_code=500, detail=f"File import error: {str(e)}") + logger.error(f"Error importing knowledge from file: {e}") + raise HTTPException(status_code=500, detail=f"File import error: {str(e)}") @app.post("/api/knowledge/import/wikipedia") async def import_knowledge_from_wikipedia(request: WikipediaImportSchema): @@ -2725,6 +2746,8 @@ async def import_knowledge_from_wikipedia(request: WikipediaImportSchema): "source": f"Wikipedia: {title}" } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing from Wikipedia: {e}") raise HTTPException(status_code=500, detail=f"Wikipedia import error: {str(e)}") @@ -2768,20 +2791,11 @@ async def import_knowledge_from_url(request: URLImportSchema): "source": f"URL: {url}" } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing from URL: {e}") raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}") - - return extracted_knowledge - - except Exception as e: - logger.error(f"Error importing from URL: {e}") - import_jobs[import_id].update({ - "status": "error", - "completed_at": datetime.now().isoformat(), - "error": str(e) - }) - raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}") @app.post("/api/knowledge/import/text") async def import_knowledge_from_text(request: TextImportSchema): @@ -2824,6 +2838,8 @@ async def import_knowledge_from_text(request: TextImportSchema): "content_length": len(content) } + except HTTPException: + raise except Exception as e: logger.error(f"Error importing from text: {e}") raise HTTPException(status_code=500, detail=f"Text import error: {str(e)}") @@ -3104,9 +3120,63 @@ async def add_knowledge(payload: AddKnowledgeSchema): # Batch import compatibility endpoint @app.post("/api/knowledge/import/batch") async def import_knowledge_batch(request: BatchImportSchema): + """Batch import knowledge from multiple sources.""" sources = request.sources - import_ids = [f"batch_{i}_{int(time.time()*1000)}" for i, _ in enumerate(sources)] - return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued"} + if not sources: + return {"import_ids": [], "batch_size": 0, "status": "completed"} + + import_ids = [] + results = [] + for i, source in enumerate(sources): + src_type = source.get("type", "text") + fallback_id = f"batch_{i}_{int(time.time()*1000)}" + try: + if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service: + from backend.knowledge_models import ( + TextImportRequest, URLImportRequest, ImportSource, + ) + if src_type == "url": + url = source.get("url", source.get("source", "")) + imp_source = ImportSource( + source_type="url", + source_identifier=url, + metadata={"url": url}, + ) + req = URLImportRequest( + url=url, + source=imp_source, + max_depth=source.get("max_depth", 1), + follow_links=source.get("follow_links", False), + content_selectors=source.get("content_selectors", []), + ) + iid = await knowledge_ingestion_service.import_from_url(req) + else: + # Default to text import + content = source.get("content", "") + title = source.get("title", f"Batch item {i}") + imp_source = ImportSource( + source_type="text", + source_identifier=title, + metadata={"manual_input": True}, + ) + req = TextImportRequest( + content=content or title, + title=title, + source=imp_source, + format_type=source.get("format_type", "plain"), + ) + iid = await knowledge_ingestion_service.import_from_text(req) + import_ids.append(iid) + results.append({"index": i, "import_id": iid, "status": "queued"}) + else: + import_ids.append(fallback_id) + results.append({"index": i, "import_id": fallback_id, "status": "queued"}) + except Exception as exc: + logger.warning(f"Batch item {i} failed: {exc}") + import_ids.append(fallback_id) + results.append({"index": i, "import_id": fallback_id, "status": "failed", "error": str(exc)}) + + return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued", "results": results} # Additional KG stats and analytics endpoints @app.get("/api/knowledge/graph/stats") @@ -3860,10 +3930,31 @@ async def create_category(payload: dict): return {"status": "success", "category_id": payload.get("category_id", "new")} +@app.delete("/api/knowledge/import/cancel/{job_id}") +async def cancel_import_by_job_id(job_id: str): + """Cancel an import job by job ID.""" + return await cancel_import(job_id) + + @app.delete("/api/knowledge/import/{import_id}") async def cancel_import(import_id: str): """Cancel a running import job.""" - return {"import_id": import_id, "status": "cancelled"} + cancelled = False + if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service: + try: + cancelled = await knowledge_ingestion_service.cancel_import(import_id) + except Exception as e: + logger.warning(f"Error cancelling import {import_id}: {e}") + + # Also remove from the short-lived server-side import_jobs map + if import_id in import_jobs: + import_jobs[import_id]["status"] = "cancelled" + cancelled = True + + return { + "import_id": import_id, + "status": "cancelled" if cancelled else "not_found", + } if __name__ == "__main__": diff --git a/tests/backend/test_api_endpoints.py b/tests/backend/test_api_endpoints.py index 3fcb0e2..474b948 100644 --- a/tests/backend/test_api_endpoints.py +++ b/tests/backend/test_api_endpoints.py @@ -483,7 +483,7 @@ def test_cancel_import(self): data = response.json() assert data["import_id"] == import_id - assert data["status"] == "cancelled" + assert data["status"] in ("cancelled", "not_found") class TestKnowledgeSearchEndpoints: diff --git a/tests/backend/test_knowledge_import_endpoints.py b/tests/backend/test_knowledge_import_endpoints.py new file mode 100644 index 0000000..a796e64 --- /dev/null +++ b/tests/backend/test_knowledge_import_endpoints.py @@ -0,0 +1,317 @@ +""" +Tests for all 6 knowledge import endpoints on the unified server. + +Exercises the routes via httpx.AsyncClient against the ASGI app so no live +server is required. The KnowledgeIngestionService is mocked at the module +level so that we test the HTTP layer in isolation. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +# We need to mock the ingestion service *before* importing the app because the +# module-level import in unified_server.py captures the global reference. The +# fixtures below take care of this via monkeypatching. + + +def _make_mock_service(): + """Create a fully mocked KnowledgeIngestionService.""" + svc = MagicMock() + svc.import_from_url = AsyncMock(return_value="url_import_001") + svc.import_from_text = AsyncMock(return_value="text_import_001") + svc.import_from_file = AsyncMock(return_value="file_import_001") + svc.cancel_import = AsyncMock(return_value=True) + svc.get_import_progress = AsyncMock(return_value=MagicMock( + import_id="progress_001", + status="processing", + progress_percentage=42.0, + started_at=1000000.0, + completed_at=None, + error_message=None, + error=None, + filename="test.txt", + )) + return svc + + +@pytest.fixture() +def patched_app(): + """ + Yield the unified_server FastAPI ``app`` with KnowledgeIngestionService + stubbed out so the endpoints never touch real I/O. + """ + mock_svc = _make_mock_service() + + with ( + patch("backend.unified_server.KNOWLEDGE_SERVICES_AVAILABLE", True), + patch("backend.unified_server.knowledge_ingestion_service", mock_svc), + ): + from backend.unified_server import app + yield app, mock_svc + + +@pytest.fixture() +def patched_app_no_service(): + """Yield the app with ingestion service unavailable (503 path).""" + with ( + patch("backend.unified_server.KNOWLEDGE_SERVICES_AVAILABLE", False), + patch("backend.unified_server.knowledge_ingestion_service", None), + ): + from backend.unified_server import app + yield app + + +# --------------------------------------------------------------------------- +# Tests — happy paths +# --------------------------------------------------------------------------- + + +class TestKnowledgeImportURL: + """POST /api/knowledge/import/url""" + + @pytest.mark.asyncio + async def test_url_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/url", json={ + "url": "https://example.com/article", + }) + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "url_import_001" + assert body["status"] == "queued" + mock_svc.import_from_url.assert_awaited_once() + + @pytest.mark.asyncio + async def test_url_import_missing_url(self, patched_app): + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/url", json={}) + # URLImportSchema requires 'url'; Pydantic rejects with 422 + assert resp.status_code == 422 + + @pytest.mark.asyncio + async def test_url_import_service_unavailable(self, patched_app_no_service): + app = patched_app_no_service + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/url", json={ + "url": "https://example.com", + }) + assert resp.status_code == 503 + + +class TestKnowledgeImportText: + """POST /api/knowledge/import/text""" + + @pytest.mark.asyncio + async def test_text_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/text", json={ + "content": "The sky is blue because of Rayleigh scattering.", + "title": "Sky Color", + }) + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "text_import_001" + assert body["status"] == "queued" + assert body["content_length"] == len("The sky is blue because of Rayleigh scattering.") + mock_svc.import_from_text.assert_awaited_once() + + @pytest.mark.asyncio + async def test_text_import_missing_content(self, patched_app): + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/text", json={ + "title": "No content", + }) + # TextImportSchema requires 'content'; Pydantic rejects with 422 + assert resp.status_code == 422 + + @pytest.mark.asyncio + async def test_text_import_service_unavailable(self, patched_app_no_service): + app = patched_app_no_service + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/text", json={ + "content": "test", + }) + assert resp.status_code == 503 + + +class TestKnowledgeImportFile: + """POST /api/knowledge/import/file""" + + @pytest.mark.asyncio + async def test_file_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/api/knowledge/import/file", + files={"file": ("notes.txt", b"Hello world content", "text/plain")}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "file_import_001" + assert body["status"] == "started" + assert body["filename"] == "notes.txt" + mock_svc.import_from_file.assert_awaited_once() + + @pytest.mark.asyncio + async def test_file_import_pdf(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/api/knowledge/import/file", + files={"file": ("doc.pdf", b"%PDF-fake", "application/pdf")}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "started" + + @pytest.mark.asyncio + async def test_file_import_service_unavailable(self, patched_app_no_service): + app = patched_app_no_service + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/api/knowledge/import/file", + files={"file": ("test.txt", b"x", "text/plain")}, + ) + assert resp.status_code == 503 + + +class TestKnowledgeImportStatus: + """GET /api/knowledge/import/status/{job_id}""" + + @pytest.mark.asyncio + async def test_status_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.get("/api/knowledge/import/status/progress_001") + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "progress_001" + assert "status" in body + + @pytest.mark.asyncio + async def test_status_not_found(self, patched_app): + """When the job doesn't exist we still get 200 with status=not_found.""" + app, mock_svc = patched_app + mock_svc.get_import_progress = AsyncMock(return_value=None) + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.get("/api/knowledge/import/status/nonexistent") + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "not_found" + + @pytest.mark.asyncio + async def test_progress_endpoint_also_works(self, patched_app): + """The original /progress/ path should still work.""" + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.get("/api/knowledge/import/progress/progress_001") + assert resp.status_code == 200 + + +class TestKnowledgeImportBatch: + """POST /api/knowledge/import/batch""" + + @pytest.mark.asyncio + async def test_batch_import_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/batch", json={ + "sources": [ + {"type": "text", "content": "Fact one", "title": "F1"}, + {"type": "text", "content": "Fact two", "title": "F2"}, + ], + }) + assert resp.status_code == 200 + body = resp.json() + assert body["batch_size"] == 2 + assert len(body["import_ids"]) == 2 + assert body["status"] == "queued" + # Each text source should call import_from_text once + assert mock_svc.import_from_text.await_count == 2 + + @pytest.mark.asyncio + async def test_batch_import_empty_sources(self, patched_app): + app, _ = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/batch", json={"sources": []}) + assert resp.status_code == 200 + body = resp.json() + assert body["batch_size"] == 0 + assert body["status"] == "completed" + + @pytest.mark.asyncio + async def test_batch_import_url_type(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post("/api/knowledge/import/batch", json={ + "sources": [ + {"type": "url", "url": "https://example.com/page"}, + ], + }) + assert resp.status_code == 200 + body = resp.json() + assert body["batch_size"] == 1 + mock_svc.import_from_url.assert_awaited_once() + + +class TestKnowledgeImportCancel: + """DELETE /api/knowledge/import/cancel/{job_id}""" + + @pytest.mark.asyncio + async def test_cancel_returns_200(self, patched_app): + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.delete("/api/knowledge/import/cancel/some-job-id") + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "some-job-id" + assert body["status"] == "cancelled" + mock_svc.cancel_import.assert_awaited_once_with("some-job-id") + + @pytest.mark.asyncio + async def test_cancel_not_found(self, patched_app): + app, mock_svc = patched_app + mock_svc.cancel_import = AsyncMock(return_value=False) + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.delete("/api/knowledge/import/cancel/nonexistent") + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "not_found" + + @pytest.mark.asyncio + async def test_delete_endpoint_also_works(self, patched_app): + """The original DELETE /api/knowledge/import/{id} path should still work.""" + app, mock_svc = patched_app + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.delete("/api/knowledge/import/job-42") + assert resp.status_code == 200 + body = resp.json() + assert body["import_id"] == "job-42"