3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "graphrag/graphrag_kg_creation/backend/ai-toolkit"]
path = graphrag/graphrag_kg_creation/backend/ai-toolkit
url = https://github.com/memgraph/ai-toolkit
3 changes: 3 additions & 0 deletions README.md
@@ -38,6 +38,9 @@ Deploy Memgraph using methods that suit your environment, whether it's container
### Debugging
- [Generating a core dump with Memgraph in Docker Compose](./debugging/docker_compose_with_core_dump_generation/)

### GraphRAG
- [Knowledge graph creation with Unstructured2Graph and MCP Server](./graphrag/graphrag_kg_creation/)

### GraphQL
- [Simple application with Memgraph and Neo4j GraphQL](./graphql/simple_app/)

49 changes: 49 additions & 0 deletions graphrag/graphrag_kg_creation/README.md
@@ -0,0 +1,49 @@
# Memgraph GraphRAG Demo

A demo application for creating knowledge graphs using Memgraph and GraphRAG.

## Prerequisites

- Docker and Docker Compose
- Python 3.13 or newer with the `uv` package manager (the backend pins `3.13` in `.python-version`)
- Node.js and npm

## Setup
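
This repository vendors Memgraph's `ai-toolkit` as a git submodule (the backend adds its packages to `sys.path`), so fetch it before setting anything up:

```bash
git submodule update --init --recursive
```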

### Memgraph

Run Memgraph using Docker Compose:

```bash
docker compose up
```
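
The Compose file itself lives in this directory; as a rough sketch of what such a setup looks like (assuming the stock `memgraph/memgraph-mage` image; the actual file may differ):

```yaml
services:
  memgraph:
    image: memgraph/memgraph-mage:latest
    ports:
      - "7687:7687"  # Bolt protocol, used by the backend's graph clients
```

Memgraph then accepts Bolt connections on `localhost:7687`.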

### Backend

1. Navigate to the `backend` directory
2. Create a `.env` file with your OpenAI API key:
```
OPENAI_API_KEY=your_api_key_here
```
3. Set up the environment:
```bash
uv venv
uv add -r requirements.txt
uv sync
```
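Here `uv venv` creates the virtual environment, `uv add -r requirements.txt` folds the pinned requirements into `pyproject.toml`, and `uv sync` installs the resolved set.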
4. Run the backend server:
```bash
uv run uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
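Once the server is up, you can sanity-check it against the `/` and `/health` routes defined in `main.py`:
```bash
curl http://localhost:8000/health
# expected: {"status":"healthy"}
```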

### Frontend

1. Navigate to the `frontend` directory
2. Install dependencies:
```bash
npm install
```
3. Start the development server:
```bash
npm run dev
```
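The backend's CORS allowlist in `main.py` only admits `http://localhost:3000` and `http://127.0.0.1:3000`, so the frontend dev server is expected to be reachable on port 3000.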
13 changes: 13 additions & 0 deletions graphrag/graphrag_kg_creation/backend/.gitignore
@@ -0,0 +1,13 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
lightrag_storage.out/
.env
*.log
.venv/
venv/
ENV/
env/

1 change: 1 addition & 0 deletions graphrag/graphrag_kg_creation/backend/.python-version
@@ -0,0 +1 @@
3.13
Empty file.
1 change: 1 addition & 0 deletions graphrag/graphrag_kg_creation/backend/ai-toolkit
Submodule ai-toolkit added at a879b1
83 changes: 83 additions & 0 deletions graphrag/graphrag_kg_creation/backend/main.py
@@ -0,0 +1,83 @@
"""
Main FastAPI application entry point.
"""
import os
import logging
import sys

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv

# Add the ai-toolkit path to sys.path for imports
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
AI_TOOLKIT_DIR = os.path.join(SCRIPT_DIR, "ai-toolkit")
sys.path.insert(0, os.path.join(AI_TOOLKIT_DIR, "unstructured2graph", "src"))
sys.path.insert(0, os.path.join(AI_TOOLKIT_DIR, "integrations", "lightrag-memgraph", "src"))
sys.path.insert(0, os.path.join(AI_TOOLKIT_DIR, "memgraph-toolbox", "src"))

load_dotenv()

# Configure logging
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Create FastAPI app
app = FastAPI(title="GraphRAG KG Creation API", version="1.0.0")

# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "http://127.0.0.1:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

# Import and register routes
from routes import ingest, query, stats, mcp

app.include_router(ingest.router)
app.include_router(query.router)
app.include_router(stats.router)
app.include_router(mcp.router)

# Import OpenAI Agents router if available (requires openai-agents package)
try:
from routes import openai_agents
app.include_router(openai_agents.router)
except ImportError:
logger.warning("OpenAI Agents router not available. Install openai-agents package to enable.")

# Import OpenAI Agents With Planning router (multi-agent orchestration with planner-executor pattern)
try:
from routes import openai_agents_with_planning
app.include_router(openai_agents_with_planning.router)
except ImportError as e:
logger.warning(f"OpenAI Agents With Planning router not available: {e}")

# Import OpenAI Agents With Reasoning router (multi-agent orchestration with planning, execution, and reasoning)
try:
from routes import openai_agents_with_reasoning
app.include_router(openai_agents_with_reasoning.router)
except ImportError as e:
logger.warning(f"OpenAI Agents With Reasoning router not available: {e}")


@app.get("/")
async def root():
return {"message": "GraphRAG KG Creation API", "version": "1.0.0"}


@app.get("/health")
async def health():
return {"status": "healthy"}


if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

209 changes: 209 additions & 0 deletions graphrag/graphrag_kg_creation/backend/mcp_client.py
@@ -0,0 +1,209 @@
"""
MCP (Model Context Protocol) client for communicating with mcp-memgraph service.
Handles SSE parsing, session management, and initialization.
"""
import os
import json
import re
import logging
import httpx
from fastapi import HTTPException

logger = logging.getLogger(__name__)

# Module-level session ID storage for MCP streamable-http
_mcp_session_id = None
_mcp_initialized = False


def parse_sse_response(text: str) -> dict:
"""
Parse Server-Sent Events (SSE) format response from MCP streamable-http.
Format: event: <event-type>\ndata: <json-data>\n\n
"""
if not text or not text.strip():
raise ValueError("Empty SSE response")

# Check if it's SSE format
if "event:" in text or "data:" in text:
lines = text.split('\n')
json_data = None

for i, line in enumerate(lines):
if line.startswith('data:'):
# Extract JSON data after "data: "
data_str = line[5:].strip() # Remove "data: " prefix
if data_str:
try:
json_data = json.loads(data_str)
break
except json.JSONDecodeError:
# Try to find JSON in the line
continue

if json_data is None:
# Try to find JSON anywhere in the response
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if json_match:
try:
json_data = json.loads(json_match.group())
except json.JSONDecodeError:
pass

if json_data is None:
raise ValueError(f"Could not extract JSON from SSE response: {text[:200]}")

return json_data
else:
# Not SSE format, try to parse as plain JSON
return json.loads(text)
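# For reference, the streamable-http replies this function handles look roughly
# like the following (shape assumed from the MCP streamable-http transport):
#
#     event: message
#     data: {"jsonrpc": "2.0", "id": 1, "result": {"content": [...]}}
#
# e.g. parse_sse_response('event: message\ndata: {"jsonrpc": "2.0"}')
# returns {"jsonrpc": "2.0"}.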


async def initialize_mcp_session(mcp_url: str, timeout: float = 10.0) -> None:
    """
    Initialize the MCP session by sending an initialize request.
    This must be called before making any tool calls.
    """
    global _mcp_session_id, _mcp_initialized

    if _mcp_initialized and _mcp_session_id:
        return  # Already initialized

    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream"
        }

        # Send initialize request
        init_payload = {
            "jsonrpc": "2.0",
            "id": 0,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "graphrag-backend",
                    "version": "1.0.0"
                }
            }
        }

        response = await client.post(mcp_url, json=init_payload, headers=headers)

        # Extract the session ID from the response headers (some servers send it even on errors)
        if "mcp-session-id" in response.headers:
            _mcp_session_id = response.headers["mcp-session-id"]
            _mcp_initialized = True
            logger.info(f"MCP session initialized with ID: {_mcp_session_id}")
        else:
            # Check if initialization was successful despite the missing session header
            if response.status_code == 200:
                _mcp_initialized = True
            else:
                # Try to parse the error body (SSE or JSON)
                try:
                    error_data = parse_sse_response(response.text) if response.text else {}
                    error_msg = error_data.get("error", {}).get("message", "Unknown error")
                    logger.warning(f"MCP initialization returned status {response.status_code}: {error_msg}")
                except Exception:
                    pass


async def call_mcp_service(mcp_url: str, payload: dict, timeout: float = 60.0) -> dict:
    """
    Helper function to call the MCP service with session management.
    Handles session ID extraction and inclusion for the streamable-http transport.
    Parses SSE (Server-Sent Events) format responses.
    Automatically initializes the session if needed.
    """
    global _mcp_session_id, _mcp_initialized

    # Ensure the session is initialized
    if not _mcp_initialized or not _mcp_session_id:
        await initialize_mcp_session(mcp_url, timeout=timeout)

    # Create a timeout object with a longer read timeout for SSE streams
    timeout_config = httpx.Timeout(timeout, connect=10.0, read=timeout)

    async with httpx.AsyncClient(timeout=timeout_config, follow_redirects=True) as client:
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream"
        }

        # Always include the session ID
        if _mcp_session_id:
            headers["mcp-session-id"] = _mcp_session_id

        # Use a regular POST - httpx reads the full SSE stream automatically
        response = await client.post(mcp_url, json=payload, headers=headers)

        # Extract the session ID from the response headers (in case it changed)
        if "mcp-session-id" in response.headers:
            _mcp_session_id = response.headers["mcp-session-id"]

        # If we got a "Missing session ID" error, try to re-initialize
        if response.status_code == 400:
            try:
                error_data = parse_sse_response(response.text) if response.text else {}
                error_message = error_data.get("error", {}).get("message", "")
                if ("Missing session ID" in error_message
                        or "No valid session ID" in error_message
                        or "before initialization" in error_message.lower()):
                    # Reset and re-initialize
                    _mcp_initialized = False
                    _mcp_session_id = None
                    await initialize_mcp_session(mcp_url, timeout=timeout)

                    # Retry the request with the new session ID
                    if _mcp_session_id:
                        headers["mcp-session-id"] = _mcp_session_id
                        response = await client.post(mcp_url, json=payload, headers=headers)
                        if "mcp-session-id" in response.headers:
                            _mcp_session_id = response.headers["mcp-session-id"]
            except (json.JSONDecodeError, ValueError):
                pass  # Continue with the original error handling

        # Check the response status before parsing
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Try to parse the error from SSE or JSON format
            try:
                error_data = parse_sse_response(e.response.text) if e.response.text else {}
                error_message = error_data.get("error", {}).get("message", str(error_data))
                raise HTTPException(
                    status_code=e.response.status_code,
                    detail=f"Error from MCP service: {error_message}"
                )
            except (json.JSONDecodeError, ValueError):
                # If we can't parse it, fall back to the raw response text
                error_text = e.response.text[:500] if e.response.text else "Unknown error"
                raise HTTPException(
                    status_code=e.response.status_code,
                    detail=f"Error from MCP service (status {e.response.status_code}): {error_text}"
                )

        # Check if the response has content
        if not response.text or response.text.strip() == "":
            logger.warning(f"Empty response from MCP service. Status: {response.status_code}, Headers: {dict(response.headers)}")
            raise HTTPException(
                status_code=500,
                detail=f"Empty response from MCP service (status {response.status_code}). The service may be overloaded or the query timed out."
            )

        # Parse the response (SSE or JSON format)
        try:
            return parse_sse_response(response.text)
        except (json.JSONDecodeError, ValueError) as e:
            # Log the actual response for debugging
            logger.error(f"Failed to parse response from MCP service. Status: {response.status_code}, Content-Type: {response.headers.get('content-type')}, Body: {response.text[:500]}")
            raise HTTPException(
                status_code=500,
                detail=f"Invalid response from MCP service: {str(e)}. Response: {response.text[:200]}"
            )
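
# A minimal usage sketch (the endpoint URL below is a hypothetical placeholder;
# `tools/list` is a standard MCP JSON-RPC method, but the route the mcp-memgraph
# service exposes may differ):
#
#     import asyncio
#     from mcp_client import call_mcp_service
#
#     async def demo():
#         payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
#         print(await call_mcp_service("http://localhost:8001/mcp/", payload))
#
#     asyncio.run(demo())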

20 changes: 20 additions & 0 deletions graphrag/graphrag_kg_creation/backend/pyproject.toml
@@ -0,0 +1,20 @@
[project]
name = "backend"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"deepeval>=3.5.2",
"fastapi==0.109.0",
"httpx>=0.28.1",
"lightrag-hku[api]==1.4.8.2",
"litellm>=1.77.1",
"mcp>=1.14.1",
"neo4j>=5.28.1",
"numpy>=2.2.6",
"openai-agents==0.6.1",
"python-dotenv>=1.1.1",
"unstructured[all-docs]>=0.12.6",
"uvicorn[standard]>=0.31.1",
]