Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 116 additions & 25 deletions backend/unified_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,31 @@ async def process_query(self, query):
LLM_COGNITIVE_DRIVER_AVAILABLE = False

# Import additional services with fallbacks
# Import each service independently so a failure in one (e.g. thinc/spaCy for
# knowledge_management) doesn't take down the ingestion service.
knowledge_ingestion_service = None
knowledge_management_service = None
knowledge_pipeline_service = None
KNOWLEDGE_SERVICES_AVAILABLE = False

try:
from backend.knowledge_ingestion import knowledge_ingestion_service
from backend.knowledge_management import knowledge_management_service
from backend.knowledge_pipeline_service import knowledge_pipeline_service
from backend.knowledge_ingestion import knowledge_ingestion_service as _kis
knowledge_ingestion_service = _kis
KNOWLEDGE_SERVICES_AVAILABLE = True
except ImportError as e:
logger.warning(f"Knowledge services not available: {e}")
knowledge_ingestion_service = None
knowledge_management_service = None
knowledge_pipeline_service = None
KNOWLEDGE_SERVICES_AVAILABLE = False
logger.warning(f"Knowledge ingestion service not available: {e}")

try:
from backend.knowledge_management import knowledge_management_service as _kms
knowledge_management_service = _kms
except ImportError as e:
logger.warning(f"Knowledge management service not available: {e}")

try:
from backend.knowledge_pipeline_service import knowledge_pipeline_service as _kps
knowledge_pipeline_service = _kps
except ImportError as e:
logger.warning(f"Knowledge pipeline service not available: {e}")

# Import production vector database
try:
Expand Down Expand Up @@ -2624,6 +2638,11 @@ async def get_import_progress(import_id: str):
"error": str(e)
}

@app.get("/api/knowledge/import/status/{job_id}")
async def get_import_status(job_id: str):
"""Get the status of an import job (alias for progress endpoint)."""
return await get_import_progress(job_id)

@app.post("/api/knowledge/import/file")
async def import_knowledge_from_file(file: UploadFile = File(...), filename: str = Form(None), file_type: str = Form(None)):
"""Import knowledge from uploaded file."""
Expand Down Expand Up @@ -2679,12 +2698,14 @@ async def import_knowledge_from_file(file: UploadFile = File(...), filename: str
"filename": file.filename,
"file_size": len(content),
"content_type": file.content_type,
"file_type": file_type
"file_type": determined_file_type
}

except HTTPException:
raise
except Exception as e:
logger.error(f"Error importing knowledge from file: {e}")
raise HTTPException(status_code=500, detail=f"File import error: {str(e)}")
logger.error(f"Error importing knowledge from file: {e}")
raise HTTPException(status_code=500, detail=f"File import error: {str(e)}")

@app.post("/api/knowledge/import/wikipedia")
async def import_knowledge_from_wikipedia(request: WikipediaImportSchema):
Expand Down Expand Up @@ -2725,6 +2746,8 @@ async def import_knowledge_from_wikipedia(request: WikipediaImportSchema):
"source": f"Wikipedia: {title}"
}

except HTTPException:
raise
except Exception as e:
logger.error(f"Error importing from Wikipedia: {e}")
raise HTTPException(status_code=500, detail=f"Wikipedia import error: {str(e)}")
Expand Down Expand Up @@ -2768,20 +2791,11 @@ async def import_knowledge_from_url(request: URLImportSchema):
"source": f"URL: {url}"
}

except HTTPException:
raise
except Exception as e:
logger.error(f"Error importing from URL: {e}")
raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}")

return extracted_knowledge

except Exception as e:
logger.error(f"Error importing from URL: {e}")
import_jobs[import_id].update({
"status": "error",
"completed_at": datetime.now().isoformat(),
"error": str(e)
})
raise HTTPException(status_code=500, detail=f"URL import error: {str(e)}")

@app.post("/api/knowledge/import/text")
async def import_knowledge_from_text(request: TextImportSchema):
Expand Down Expand Up @@ -2824,6 +2838,8 @@ async def import_knowledge_from_text(request: TextImportSchema):
"content_length": len(content)
}

except HTTPException:
raise
except Exception as e:
logger.error(f"Error importing from text: {e}")
raise HTTPException(status_code=500, detail=f"Text import error: {str(e)}")
Expand Down Expand Up @@ -3104,9 +3120,63 @@ async def add_knowledge(payload: AddKnowledgeSchema):
# Batch import compatibility endpoint
@app.post("/api/knowledge/import/batch")
async def import_knowledge_batch(request: BatchImportSchema):
"""Batch import knowledge from multiple sources."""
sources = request.sources
import_ids = [f"batch_{i}_{int(time.time()*1000)}" for i, _ in enumerate(sources)]
return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued"}
if not sources:
return {"import_ids": [], "batch_size": 0, "status": "completed"}

import_ids = []
results = []
for i, source in enumerate(sources):
src_type = source.get("type", "text")
fallback_id = f"batch_{i}_{int(time.time()*1000)}"
try:
if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service:
from backend.knowledge_models import (
TextImportRequest, URLImportRequest, ImportSource,
)
if src_type == "url":
url = source.get("url", source.get("source", ""))
imp_source = ImportSource(
source_type="url",
source_identifier=url,
metadata={"url": url},
)
req = URLImportRequest(
url=url,
source=imp_source,
max_depth=source.get("max_depth", 1),
follow_links=source.get("follow_links", False),
content_selectors=source.get("content_selectors", []),
)
iid = await knowledge_ingestion_service.import_from_url(req)
else:
# Default to text import
content = source.get("content", "")
title = source.get("title", f"Batch item {i}")
imp_source = ImportSource(
source_type="text",
source_identifier=title,
metadata={"manual_input": True},
)
req = TextImportRequest(
content=content or title,
title=title,
source=imp_source,
format_type=source.get("format_type", "plain"),
)
iid = await knowledge_ingestion_service.import_from_text(req)
import_ids.append(iid)
results.append({"index": i, "import_id": iid, "status": "queued"})
else:
import_ids.append(fallback_id)
results.append({"index": i, "import_id": fallback_id, "status": "queued"})
except Exception as exc:
logger.warning(f"Batch item {i} failed: {exc}")
import_ids.append(fallback_id)
results.append({"index": i, "import_id": fallback_id, "status": "failed", "error": str(exc)})

return {"import_ids": import_ids, "batch_size": len(import_ids), "status": "queued", "results": results}

# Additional KG stats and analytics endpoints
@app.get("/api/knowledge/graph/stats")
Expand Down Expand Up @@ -3860,10 +3930,31 @@ async def create_category(payload: dict):
return {"status": "success", "category_id": payload.get("category_id", "new")}


@app.delete("/api/knowledge/import/cancel/{job_id}")
async def cancel_import_by_job_id(job_id: str):
"""Cancel an import job by job ID."""
return await cancel_import(job_id)


@app.delete("/api/knowledge/import/{import_id}")
async def cancel_import(import_id: str):
"""Cancel a running import job."""
return {"import_id": import_id, "status": "cancelled"}
cancelled = False
if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service:
try:
cancelled = await knowledge_ingestion_service.cancel_import(import_id)
except Exception as e:
logger.warning(f"Error cancelling import {import_id}: {e}")

# Also remove from the short-lived server-side import_jobs map
if import_id in import_jobs:
import_jobs[import_id]["status"] = "cancelled"
cancelled = True

return {
"import_id": import_id,
"status": "cancelled" if cancelled else "not_found",
}


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion tests/backend/test_api_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def test_cancel_import(self):

data = response.json()
assert data["import_id"] == import_id
assert data["status"] == "cancelled"
assert data["status"] in ("cancelled", "not_found")


class TestKnowledgeSearchEndpoints:
Expand Down
Loading