-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathunified_server.py
More file actions
4345 lines (3804 loc) · 186 KB
/
unified_server.py
File metadata and controls
4345 lines (3804 loc) · 186 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
GödelOS Unified Backend Server
A consolidated server that combines the stability of the minimal server
with the advanced cognitive capabilities of the main server.
This server provides complete functionality with reliable dependencies.
"""
import asyncio
import json
import logging
import os
import sys
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Dict, List, Optional, Any, Union
import uvicorn
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, File, UploadFile, Form, Query, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, HTMLResponse, Response
from pydantic import BaseModel
from dotenv import load_dotenv
# Ensure repository root is on sys.path before importing backend.* packages
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from backend.core.errors import CognitiveError, from_exception
from backend.schemas import (
WikipediaImportSchema,
URLImportSchema,
TextImportSchema,
BatchImportSchema,
AddKnowledgeSchema,
EnhancedCognitiveQuerySchema,
)
from backend.core.structured_logging import (
setup_structured_logging, correlation_context, CorrelationTracker,
api_logger, performance_logger, track_operation
)
from backend.core.enhanced_metrics import metrics_collector, operation_timer, collect_metrics
# Load environment variables from .env file
load_dotenv()
# Setup enhanced logging
setup_structured_logging(
log_level=os.getenv("LOG_LEVEL", "INFO"),
log_file=os.getenv("LOG_FILE"),
enable_json=os.getenv("ENABLE_JSON_LOGGING", "true").lower() == "true",
enable_console=True
)
logger = logging.getLogger(__name__)
# (PYTHONPATH insertion is done above, before importing backend.*)
def _structured_http_error(status: int, *, code: str, message: str, recoverable: bool = False, service: Optional[str] = None, **details) -> HTTPException:
"""Create a standardized HTTPException detail using CognitiveError."""
err = CognitiveError(code=code, message=message, recoverable=recoverable, details={**({"service": service} if service else {}), **details})
return HTTPException(status_code=status, detail=err.to_dict())
# Core model definitions
class QueryRequest(BaseModel):
query: str
context: Optional[Dict[str, Any]] = None
stream: Optional[bool] = False
class QueryResponse(BaseModel):
response: str
confidence: Optional[float] = None
reasoning_trace: Optional[List[str]] = None
sources: Optional[List[str]] = None
inference_time_ms: Optional[float] = None
knowledge_used: Optional[List[str]] = None
class KnowledgeRequest(BaseModel):
content: str
source: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class CognitiveStreamConfig(BaseModel):
enable_reasoning_trace: bool = True
enable_transparency: bool = True
stream_interval: int = 1000
class ChatMessage(BaseModel):
message: str
context: Optional[Dict[str, Any]] = None
class ChatResponse(BaseModel):
response: str
tool_calls: Optional[List[Dict[str, Any]]] = None
reasoning: Optional[List[str]] = None
# Import GödelOS components - with fallback handling for reliability
try:
from backend.godelos_integration import GödelOSIntegration
GODELOS_AVAILABLE = True
except ImportError as e:
logger.warning(f"GödelOS integration not available: {e}")
GödelOSIntegration = None
GODELOS_AVAILABLE = False
# Use unified WebSocket manager (no external dependency)
class WebSocketManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: Union[str, dict]):
if isinstance(message, dict):
message = json.dumps(message)
for connection in self.active_connections:
try:
await connection.send_text(message)
except:
pass # Connection closed
async def broadcast_cognitive_update(self, event: dict):
"""Broadcast cognitive update event to all connected clients"""
# Allow callers to send either a raw event dict or an already-wrapped
# { type: 'cognitive_event', data: {...} } message. Normalize to raw event.
try:
inner_event = event
if isinstance(event, dict) and event.get("type") == "cognitive_event" and isinstance(event.get("data"), dict):
inner_event = event.get("data")
message = {
"type": "cognitive_event",
"timestamp": inner_event.get("timestamp", ""),
"data": inner_event
}
except Exception:
# Fallback if anything unexpected
message = {
"type": "cognitive_event",
"timestamp": event.get("timestamp", ""),
"data": event
}
await self.broadcast(message)
async def broadcast_consciousness_update(self, consciousness_data: dict):
"""Broadcast consciousness update to all connected clients"""
try:
message = {
"type": "consciousness_update",
"timestamp": consciousness_data.get("timestamp", time.time()),
"data": consciousness_data
}
await self.broadcast(message)
except Exception as e:
logger.error(f"Error broadcasting consciousness update: {e}")
def has_connections(self) -> bool:
return len(self.active_connections) > 0
WEBSOCKET_MANAGER_AVAILABLE = True
# Import LLM tool integration
try:
from backend.llm_tool_integration import ToolBasedLLMIntegration
LLM_INTEGRATION_AVAILABLE = True
except ImportError as e:
logger.warning(f"LLM integration not available: {e}")
# Create a mock LLM integration for basic functionality
class MockToolBasedLLMIntegration:
def __init__(self, godelos_integration):
self.godelos_integration = godelos_integration
self.tools = []
async def test_integration(self):
return {"test_successful": True, "tool_calls": 0}
async def process_query(self, query):
return {
"response": f"Processing query: '{query}' - Basic cognitive processing active (mock LLM mode)",
"confidence": 0.8,
"reasoning_trace": ["Query received", "Basic processing applied", "Response generated"],
"sources": ["internal_reasoning"]
}
ToolBasedLLMIntegration = MockToolBasedLLMIntegration
LLM_INTEGRATION_AVAILABLE = True
# Import LLM cognitive driver for consciousness assessment
try:
from backend.llm_cognitive_driver import LLMCognitiveDriver
LLM_COGNITIVE_DRIVER_AVAILABLE = True
except ImportError as e:
logger.warning(f"LLM cognitive driver not available: {e}")
LLM_COGNITIVE_DRIVER_AVAILABLE = False
# Import additional services with fallbacks
# Import each service independently so a failure in one (e.g. thinc/spaCy for
# knowledge_management) doesn't take down the ingestion service.
knowledge_ingestion_service = None
knowledge_management_service = None
knowledge_pipeline_service = None
KNOWLEDGE_SERVICES_AVAILABLE = False
try:
from backend.knowledge_ingestion import knowledge_ingestion_service as _kis
knowledge_ingestion_service = _kis
KNOWLEDGE_SERVICES_AVAILABLE = True
except ImportError as e:
logger.warning(f"Knowledge ingestion service not available: {e}")
try:
from backend.knowledge_management import knowledge_management_service as _kms
knowledge_management_service = _kms
except ImportError as e:
logger.warning(f"Knowledge management service not available: {e}")
try:
from backend.knowledge_pipeline_service import knowledge_pipeline_service as _kps
knowledge_pipeline_service = _kps
except ImportError as e:
logger.warning(f"Knowledge pipeline service not available: {e}")
# Import production vector database
try:
from backend.core.vector_service import get_vector_database, init_vector_database
from backend.core.vector_endpoints import router as vector_db_router
VECTOR_DATABASE_AVAILABLE = True
logger.info("Production vector database available")
except ImportError as e:
logger.warning(f"Production vector database not available, using fallback: {e}")
get_vector_database = None
init_vector_database = None
vector_db_router = None
VECTOR_DATABASE_AVAILABLE = False
# Import distributed vector database
try:
from backend.api.distributed_vector_router import router as distributed_vector_router
DISTRIBUTED_VECTOR_AVAILABLE = True
logger.info("Distributed vector database available")
except ImportError as e:
logger.warning(f"Distributed vector database not available: {e}")
distributed_vector_router = None
DISTRIBUTED_VECTOR_AVAILABLE = False
try:
from backend.enhanced_cognitive_api import router as enhanced_cognitive_router
from backend.transparency_endpoints import router as transparency_router, initialize_transparency_system
ENHANCED_APIS_AVAILABLE = True
except ImportError as e:
logger.warning(f"Enhanced APIs not available: {e}")
enhanced_cognitive_router = None
transparency_router = None
ENHANCED_APIS_AVAILABLE = False
# Import consciousness engine and cognitive manager
try:
from backend.core.consciousness_engine import ConsciousnessEngine
from backend.core.cognitive_manager import CognitiveManager
from backend.core.cognitive_transparency import transparency_engine, initialize_transparency_engine
CONSCIOUSNESS_AVAILABLE = True
except ImportError as e:
logger.warning(f"Consciousness engine not available: {e}")
ConsciousnessEngine = None
CognitiveManager = None
CONSCIOUSNESS_AVAILABLE = False
# Import unified consciousness engine
try:
from backend.core.unified_consciousness_engine import UnifiedConsciousnessEngine
from backend.core.enhanced_websocket_manager import EnhancedWebSocketManager
UNIFIED_CONSCIOUSNESS_AVAILABLE = True
logger.info("✅ Unified consciousness engine available")
except ImportError as e:
logger.warning(f"Unified consciousness engine not available: {e}")
UnifiedConsciousnessEngine = None
EnhancedWebSocketManager = None
UNIFIED_CONSCIOUSNESS_AVAILABLE = False
# Import dormant module manager
try:
from backend.core.dormant_module_manager import DormantModuleManager
DORMANT_MODULE_MANAGER_AVAILABLE = True
except ImportError as e:
logger.warning(f"DormantModuleManager not available: {e}")
DormantModuleManager = None
DORMANT_MODULE_MANAGER_AVAILABLE = False
# Global service instances - using Any to avoid type annotation issues
godelos_integration = None
websocket_manager = None
enhanced_websocket_manager = None
unified_consciousness_engine = None
tool_based_llm = None
cognitive_manager = None
dormant_module_manager = None
self_modification_engine = None
# Removed cognitive_streaming_task - no longer using synthetic streaming
# Observability instances
correlation_tracker = CorrelationTracker()
# Health normalization (single source of truth)
def score_to_label(score: Optional[float]) -> str:
"""Convert numeric health score (0.0-1.0) to categorical label."""
if score is None:
return "unknown"
# Handle NaN values
if isinstance(score, float) and (score != score): # NaN check
return "unknown"
if score >= 0.8:
return "healthy"
if score >= 0.4:
return "degraded"
return "down"
def get_system_health_with_labels() -> Dict[str, Any]:
"""Get system health with both numeric values and derived labels."""
# Get actual health scores from components
health_scores = {
"websocketConnection": 1.0 if websocket_manager and len(websocket_manager.active_connections) > 0 else 0.0,
"pipeline": 0.85, # Mock value, should come from pipeline service
"knowledgeStore": 0.92, # Mock value, should come from knowledge store
"vectorIndex": 0.88, # Mock value, should come from vector index
}
# Compute labels from scores
labels = {key: score_to_label(value) for key, value in health_scores.items()}
return {
**health_scores,
"_labels": labels
}
def get_manifest_consciousness_canonical() -> Dict[str, Any]:
"""Get manifest consciousness in canonical camelCase format."""
return {
"attention": {
"intensity": 0.7,
"focus": ["System monitoring"],
"coverage": 0.85
},
"awareness": {
"level": 0.8,
"breadth": 0.75
},
"metaReflection": {
"depth": 0.6,
"coherence": 0.85
},
"processMonitoring": {
"latency": 150.0, # ms
"throughput": 0.9
}
}
def get_knowledge_stats() -> Dict[str, Any]:
"""Get knowledge statistics."""
return {
"totalConcepts": 0,
"totalConnections": 0,
"totalDocuments": 0
}
# Simulated cognitive state for fallback (legacy format)
cognitive_state = {
"processing_load": 0.65,
"active_queries": 0,
"attention_focus": {
"primary": "System monitoring",
"secondary": ["Background processing", "Memory consolidation"],
"intensity": 0.7
},
"working_memory": {
"capacity": 7,
"current_items": 3,
"items": ["Query processing", "Knowledge retrieval", "Response generation"]
},
"metacognitive_status": {
"self_awareness": 0.8,
"confidence": 0.75,
"uncertainty": 0.25,
"learning_rate": 0.6
}
}
async def initialize_core_services():
"""Initialize core services with proper error handling."""
global godelos_integration, websocket_manager, enhanced_websocket_manager, unified_consciousness_engine, tool_based_llm, cognitive_manager, transparency_engine
# Initialize WebSocket manager
websocket_manager = WebSocketManager()
logger.info("✅ WebSocket manager initialized")
# Initialize enhanced WebSocket manager for consciousness streaming
if UNIFIED_CONSCIOUSNESS_AVAILABLE:
try:
enhanced_websocket_manager = EnhancedWebSocketManager()
logger.info("✅ Enhanced WebSocket manager initialized for consciousness streaming")
except Exception as e:
logger.error(f"❌ Failed to initialize enhanced WebSocket manager: {e}")
enhanced_websocket_manager = websocket_manager # Fallback to basic manager
else:
enhanced_websocket_manager = websocket_manager
# Initialize transparency engine with websocket manager
transparency_engine = initialize_transparency_engine(enhanced_websocket_manager)
logger.info("✅ Cognitive transparency engine initialized with WebSocket integration")
# Initialize GödelOS integration if available
if GODELOS_AVAILABLE:
try:
godelos_integration = GödelOSIntegration()
await godelos_integration.initialize()
logger.info("✅ GödelOS integration initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize GödelOS integration: {e}")
godelos_integration = None
# Initialize LLM tool integration if available
if LLM_INTEGRATION_AVAILABLE:
try:
tool_based_llm = ToolBasedLLMIntegration(godelos_integration)
test_result = await tool_based_llm.test_integration()
if test_result.get("test_successful", False):
logger.info(f"✅ Tool-based LLM integration initialized - {test_result.get('tool_calls', 0)} tools available")
else:
logger.warning("⚠️ Tool-based LLM integration test failed, but system is operational")
except Exception as e:
logger.error(f"❌ Failed to initialize LLM integration: {e}")
tool_based_llm = None
# Initialize LLM cognitive driver for consciousness assessment
llm_cognitive_driver = None
if LLM_COGNITIVE_DRIVER_AVAILABLE:
try:
llm_cognitive_driver = LLMCognitiveDriver()
logger.info("✅ LLM cognitive driver initialized for consciousness assessment")
except Exception as e:
logger.error(f"❌ Failed to initialize LLM cognitive driver: {e}")
llm_cognitive_driver = None
# Initialize cognitive manager with consciousness engine if available
if CONSCIOUSNESS_AVAILABLE and (llm_cognitive_driver or tool_based_llm):
try:
# Use LLM cognitive driver for consciousness if available, otherwise fall back to tool-based LLM
llm_driver_for_consciousness = llm_cognitive_driver if llm_cognitive_driver else tool_based_llm
# Correct argument order: (godelos_integration, llm_driver, knowledge_pipeline, websocket_manager)
cognitive_manager = CognitiveManager(
godelos_integration=godelos_integration,
llm_driver=llm_driver_for_consciousness,
knowledge_pipeline=None,
websocket_manager=enhanced_websocket_manager,
)
await cognitive_manager.initialize()
driver_type = "LLM cognitive driver" if llm_cognitive_driver else "tool-based LLM"
logger.info(f"✅ Cognitive manager with consciousness engine initialized successfully using {driver_type}")
# Bootstrap consciousness after initialization
if hasattr(cognitive_manager, 'consciousness_engine') and cognitive_manager.consciousness_engine:
try:
ce = cognitive_manager.consciousness_engine
if not ce.is_bootstrap_complete():
logger.info("🌅 Bootstrapping consciousness in cognitive manager...")
await ce.bootstrap_consciousness()
logger.info("✅ Consciousness engine bootstrapped successfully")
else:
logger.info("🟡 Consciousness engine bootstrap already completed; skipping duplicate call.")
except Exception as bootstrap_error:
logger.warning(f"⚠️ Consciousness bootstrap warning (non-fatal): {bootstrap_error}")
# Update replay endpoints with cognitive manager
try:
from backend.api.replay_endpoints import setup_replay_endpoints
setup_replay_endpoints(app, cognitive_manager)
logger.info("✅ Replay endpoints updated with cognitive manager")
except Exception as e:
logger.warning(f"Failed to update replay endpoints: {e}")
except Exception as e:
logger.error(f"❌ Failed to initialize cognitive manager: {e}")
cognitive_manager = None
# Initialize unified consciousness engine if available
if UNIFIED_CONSCIOUSNESS_AVAILABLE:
try:
# Use the enhanced websocket manager and LLM driver
llm_driver_for_consciousness = llm_cognitive_driver if llm_cognitive_driver else tool_based_llm
unified_consciousness_engine = UnifiedConsciousnessEngine(
websocket_manager=enhanced_websocket_manager,
llm_driver=llm_driver_for_consciousness
)
await unified_consciousness_engine.initialize_components()
logger.info("✅ Unified consciousness engine initialized successfully")
# Set the consciousness engine reference in the enhanced websocket manager for real-time data
if hasattr(enhanced_websocket_manager, 'set_consciousness_engine'):
enhanced_websocket_manager.set_consciousness_engine(unified_consciousness_engine)
# Start the consciousness loop
await unified_consciousness_engine.start_consciousness_loop()
logger.info("🧠 Unified consciousness loop started")
# Note: Consciousness bootstrap is handled in cognitive_manager initialization above
# to avoid duplicate bootstrap calls
except Exception as e:
logger.error(f"❌ Failed to initialize unified consciousness engine: {e}")
unified_consciousness_engine = None
async def initialize_optional_services():
"""Initialize optional advanced services."""
global godelos_integration
# Initialize knowledge services if available
if KNOWLEDGE_SERVICES_AVAILABLE and knowledge_ingestion_service and knowledge_management_service:
try:
# Initialize knowledge ingestion service with websocket manager
logger.info(f"🔍 UNIFIED_SERVER: Initializing knowledge_ingestion_service with websocket_manager: {websocket_manager is not None}")
await knowledge_ingestion_service.initialize(websocket_manager)
await knowledge_management_service.initialize()
if knowledge_pipeline_service and websocket_manager:
await knowledge_pipeline_service.initialize(websocket_manager)
# Wire into cognitive manager if available
if cognitive_manager is not None:
cognitive_manager.knowledge_pipeline = knowledge_pipeline_service
logger.info("✅ Knowledge services initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize knowledge services: {e}")
# Initialize production vector database (synchronous initialization)
if VECTOR_DATABASE_AVAILABLE:
try:
# Use ThreadPoolExecutor with timeout for resilient model initialization
import asyncio
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def _init_vector_db():
"""Initialize vector database in thread."""
if init_vector_database:
init_vector_database()
elif get_vector_database:
get_vector_database()
# Initialize with timeout to avoid hanging on model downloads
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=1) as executor:
try:
await asyncio.wait_for(
loop.run_in_executor(executor, _init_vector_db),
timeout=30.0 # 30 second timeout
)
logger.info("✅ Production vector database initialized successfully!")
except asyncio.TimeoutError:
logger.warning("⚠️ Vector database initialization timed out - will retry on demand")
except Exception as e:
logger.warning(f"⚠️ Vector database initialization failed: {e} - will retry on demand")
# Wire telemetry notifier for vector DB recoverable errors
try:
from backend.core.vector_service import set_telemetry_notifier
if websocket_manager is not None:
def _notify(event: dict):
# Schedule async broadcast without blocking
try:
if websocket_manager:
asyncio.create_task(websocket_manager.broadcast_cognitive_update(event))
except Exception:
pass
set_telemetry_notifier(_notify)
logger.info("✅ Vector DB telemetry notifier wired to WebSocket manager")
except Exception as e:
logger.warning(f"Could not wire Vector DB telemetry notifier: {e}")
except Exception as e:
logger.error(f"❌ Failed to initialize vector database: {e}")
import traceback
logger.error(f"❌ Detailed error: {traceback.format_exc()}")
# Initialize cognitive transparency API - CRITICAL FOR UNIFIED KG!
if ENHANCED_APIS_AVAILABLE:
try:
from backend.cognitive_transparency_integration import cognitive_transparency_api
# Initialize the cognitive transparency API with GödelOS integration
logger.info("🔍 UNIFIED_SERVER: Initializing cognitive transparency API for unified KG...")
await cognitive_transparency_api.initialize(godelos_integration)
logger.info("✅ Cognitive transparency API initialized successfully - unified KG is ready!")
# Also initialize the transparency system
if initialize_transparency_system:
await initialize_transparency_system()
logger.info("✅ Transparency system initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize cognitive transparency system: {e}")
# Log more details about the failure
import traceback
logger.error(f"❌ Detailed error: {traceback.format_exc()}")
# Initialize dormant module manager
global dormant_module_manager
if DORMANT_MODULE_MANAGER_AVAILABLE and DormantModuleManager is not None and godelos_integration is not None:
try:
dormant_module_manager = DormantModuleManager()
dormant_module_manager.initialize(godelos_integration, enhanced_websocket_manager or websocket_manager)
logger.info("✅ Dormant module manager initialized — 8 cognitive subsystems activated")
except Exception as e:
logger.error(f"❌ Failed to initialize dormant module manager: {e}")
dormant_module_manager = None
# REMOVED: continuous_cognitive_streaming() function
# This function was generating synthetic cognitive events every 4 seconds with hardcoded values.
# Real cognitive events should be generated by actual system state changes, not periodic broadcasting.
# Hot-reloader for ontology files (Issue #97)
_hot_reloader = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager."""
global startup_time, self_modification_engine, _hot_reloader
# Startup
startup_time = time.time()
logger.info("🚀 Starting GödelOS Unified Server...")
# Initialize SelfModificationEngine
try:
from backend.core.self_modification_engine import SelfModificationEngine
self_modification_engine = SelfModificationEngine()
logger.info("✅ SelfModificationEngine initialized")
except Exception as e:
logger.exception("Failed to initialize SelfModificationEngine: %s", e)
self_modification_engine = None
# Initialize core services first
await initialize_core_services()
# Initialize optional services
await initialize_optional_services()
# Set up consciousness engine in endpoints after initialization
if UNIFIED_CONSCIOUSNESS_AVAILABLE and unified_consciousness_engine and enhanced_websocket_manager:
try:
from backend.api.consciousness_endpoints import set_consciousness_engine
set_consciousness_engine(unified_consciousness_engine, enhanced_websocket_manager)
logger.info("✅ Consciousness engine connected to API endpoints")
except Exception as e:
logger.error(f"Failed to connect consciousness engine to endpoints: {e}")
# Initialize consciousness emergence detector
try:
from backend.core.consciousness_emergence_detector import (
ConsciousnessEmergenceDetector,
UnifiedConsciousnessObservatory,
)
from backend.api.consciousness_endpoints import set_emergence_detector, set_observatory
_detector = ConsciousnessEmergenceDetector(websocket_manager=websocket_manager)
set_emergence_detector(_detector)
_observatory = UnifiedConsciousnessObservatory(_detector)
await _observatory.start()
set_observatory(_observatory)
logger.info("✅ Consciousness emergence detector and observatory initialized")
except Exception as e:
logger.error(f"Failed to initialize consciousness emergence detector: {e}")
# Initialize autonomous goal generator and creative synthesis engine
try:
from backend.core.autonomous_goal_engine import AutonomousGoalGenerator, CreativeSynthesisEngine
from backend.api.consciousness_endpoints import set_goal_engine
_goal_generator = AutonomousGoalGenerator()
_creative_engine = CreativeSynthesisEngine()
set_goal_engine(_goal_generator, _creative_engine)
# Seed with a baseline generation so goals exist immediately
await _goal_generator.generate({})
logger.info("✅ Autonomous goal generator and creative synthesis engine initialized")
except Exception as e:
logger.error(f"Failed to initialize autonomous goal engine: {e}")
# Initialize ontology hot-reloader for knowledge graph persistence (Issue #97)
try:
import os as _os
ontology_dir = _os.environ.get("GODELOS_ONTOLOGY_DIR", "")
if ontology_dir:
from godelOS.core_kr.knowledge_store.hot_reloader import OntologyHotReloader
def _on_triple_add(subject, predicate, obj):
logger.debug("Hot-reload: +triple (%s, %s, %s)", subject, predicate, obj)
def _on_triple_remove(subject, predicate, obj):
logger.debug("Hot-reload: -triple (%s, %s, %s)", subject, predicate, obj)
_hot_reloader = OntologyHotReloader(
watch_dir=ontology_dir,
on_add=_on_triple_add,
on_remove=_on_triple_remove,
)
_hot_reloader.start()
logger.info("✅ Ontology hot-reloader watching %s", ontology_dir)
else:
logger.info("ℹ Ontology hot-reloader inactive (set GODELOS_ONTOLOGY_DIR to enable)")
except Exception as e:
logger.error(f"Failed to initialize ontology hot-reloader: {e}")
# Eagerly initialize the agentic daemon system so the singleton is created
# with all available dependencies (especially consciousness_engine).
try:
from backend.core.agentic_daemon_system import get_agentic_daemon_system
await get_agentic_daemon_system(
cognitive_manager=cognitive_manager,
knowledge_pipeline=knowledge_pipeline_service if KNOWLEDGE_SERVICES_AVAILABLE else None,
websocket_manager=websocket_manager,
consciousness_engine=unified_consciousness_engine,
)
logger.info("✅ Agentic daemon system initialized with consciousness engine")
except Exception as e:
logger.error(f"Failed to initialize agentic daemon system: {e}")
# REMOVED: Synthetic cognitive streaming - replaced with real event-driven updates
# cognitive_streaming_task = asyncio.create_task(continuous_cognitive_streaming())
logger.info("✅ Synthetic cognitive streaming disabled - using event-driven updates only")
# Start dormant-module ticker background task
# Initialise to None before the conditional block so the shutdown section
# can always safely check it regardless of whether startup succeeded.
_dormant_ticker_task = None
if dormant_module_manager is not None:
async def _dormant_modules_ticker():
"""Tick all active dormant modules every 2 seconds (same cadence as consciousness loop)."""
while True:
try:
await dormant_module_manager.tick()
except Exception as exc:
logger.debug("Dormant module ticker error: %s", exc)
await asyncio.sleep(2.0)
_dormant_ticker_task = asyncio.create_task(_dormant_modules_ticker())
logger.info("🔄 Dormant module ticker started")
logger.info("🎉 GödelOS Unified Server fully initialized!")
yield
# Shutdown
logger.info("🛑 Shutting down GödelOS Unified Server...")
if _dormant_ticker_task is not None:
_dormant_ticker_task.cancel()
try:
await _dormant_ticker_task
except asyncio.CancelledError:
logger.debug("✅ Dormant module ticker stopped cleanly")
# Stop the consciousness observatory if it was started
try:
from backend.api.consciousness_endpoints import get_observatory
_obs = get_observatory()
if _obs is not None:
await _obs.stop()
except Exception:
pass
# Stop the ontology hot-reloader if active
try:
if _hot_reloader is not None:
_hot_reloader.stop()
except Exception:
pass
# No synthetic streaming task to cancel
logger.info("✅ Shutdown complete")
# Server start time for metrics
server_start_time = time.time()
# Create FastAPI app
app = FastAPI(
title="GödelOS Unified Cognitive API",
description="Consolidated cognitive architecture API with full functionality",
version="2.0.0",
lifespan=lifespan
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, replace with specific origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include enhanced routers if available
# NOTE: Disabling external enhanced_cognitive_router as we have local implementations
if ENHANCED_APIS_AVAILABLE:
# Skip enhanced_cognitive_router to avoid conflicts with local endpoints
# if enhanced_cognitive_router:
# app.include_router(enhanced_cognitive_router, prefix="/api/enhanced-cognitive", tags=["Enhanced Cognitive API"])
if transparency_router:
app.include_router(transparency_router)
# Include vector database router
if VECTOR_DATABASE_AVAILABLE and vector_db_router:
app.include_router(vector_db_router, tags=["Vector Database Management"])
# Include distributed vector database router
if DISTRIBUTED_VECTOR_AVAILABLE and distributed_vector_router:
app.include_router(distributed_vector_router, prefix="/api/distributed-vector", tags=["Distributed Vector Search"])
# Include adaptive ingestion router
try:
from backend.api.adaptive_ingestion_endpoints import router as adaptive_ingestion_router
app.include_router(adaptive_ingestion_router, tags=["Adaptive Ingestion"])
logger.info("Adaptive ingestion endpoints included")
except ImportError as e:
logger.warning(f"Adaptive ingestion endpoints not available: {e}")
except Exception as e:
logger.error(f"Failed to setup adaptive ingestion endpoints: {e}")
# Include agentic daemon management router
try:
from backend.api.agentic_daemon_endpoints import router as agentic_daemon_router
app.include_router(agentic_daemon_router, tags=["Agentic Daemon System"])
AGENTIC_DAEMON_AVAILABLE = True
logger.info("Agentic daemon management endpoints available")
except ImportError as e:
logger.warning(f"Agentic daemon endpoints not available: {e}")
AGENTIC_DAEMON_AVAILABLE = False
except Exception as e:
logger.error(f"Failed to setup agentic daemon endpoints: {e}")
AGENTIC_DAEMON_AVAILABLE = False
# Include enhanced knowledge management router
try:
from backend.api.knowledge_management_endpoints import router as knowledge_management_router
app.include_router(knowledge_management_router, tags=["Knowledge Management"])
KNOWLEDGE_MANAGEMENT_AVAILABLE = True
logger.info("Enhanced knowledge management endpoints available")
except ImportError as e:
logger.warning(f"Knowledge management endpoints not available: {e}")
KNOWLEDGE_MANAGEMENT_AVAILABLE = False
except Exception as e:
logger.error(f"Failed to setup knowledge management endpoints: {e}")
KNOWLEDGE_MANAGEMENT_AVAILABLE = False
# Include unified consciousness endpoints
try:
from backend.api.consciousness_endpoints import router as consciousness_router, set_consciousness_engine
app.include_router(consciousness_router, tags=["Unified Consciousness"])
# Set consciousness engine reference after initialization
if UNIFIED_CONSCIOUSNESS_AVAILABLE and unified_consciousness_engine and enhanced_websocket_manager:
set_consciousness_engine(unified_consciousness_engine, enhanced_websocket_manager)
logger.info("✅ Unified consciousness endpoints available with engine integration")
else:
logger.info("✅ Unified consciousness endpoints available (engine will be set later)")
CONSCIOUSNESS_ENDPOINTS_AVAILABLE = True
except ImportError as e:
logger.warning(f"Consciousness endpoints not available: {e}")
CONSCIOUSNESS_ENDPOINTS_AVAILABLE = False
except Exception as e:
logger.error(f"Failed to setup consciousness endpoints: {e}")
CONSCIOUSNESS_ENDPOINTS_AVAILABLE = False
# Setup replay harness endpoints
try:
from backend.api.replay_endpoints import setup_replay_endpoints
setup_replay_endpoints(app, None) # Will be updated with cognitive_manager once available
logger.info("Replay harness endpoints initialized")
except ImportError as e:
logger.warning(f"Replay endpoints not available: {e}")
except Exception as e:
logger.error(f"Failed to setup replay endpoints: {e}")
# Include external API router (REST + WebSocket surface for external consumers)
try:
from backend.api.external_api import router as external_api_router, configure as configure_external_api
app.include_router(external_api_router, tags=["External API"])
# Wire runtime dependencies into the external API module.
configure_external_api(
godelos_integration=godelos_integration,
websocket_manager=websocket_manager,
cognitive_state=cognitive_state,
startup_time=server_start_time,
tool_based_llm=tool_based_llm,
)
EXTERNAL_API_AVAILABLE = True
logger.info("External API endpoints available at /api/v1/external/*")
except ImportError as e:
logger.warning(f"External API endpoints not available: {e}")
EXTERNAL_API_AVAILABLE = False
except Exception as e:
logger.error(f"Failed to setup external API endpoints: {e}")
EXTERNAL_API_AVAILABLE = False
# Consciousness bootstrap endpoint (manual trigger for 6-phase bootstrap)
@app.post("/api/consciousness/bootstrap")
async def api_consciousness_bootstrap(force: bool = Query(default=False)):
"""Trigger the consciousness bootstrap sequence and stream progress.
Uses CognitiveManager.consciousness_engine if available.
Returns skipped if already completed and force is False.
"""
try:
if not cognitive_manager or not hasattr(cognitive_manager, 'consciousness_engine') or cognitive_manager.consciousness_engine is None:
raise HTTPException(status_code=503, detail="Consciousness engine not available")
ce = cognitive_manager.consciousness_engine
# Skip duplicate unless force
if hasattr(ce, 'is_bootstrap_complete') and ce.is_bootstrap_complete() and not force:
# Notify clients that bootstrap is already complete
try:
if enhanced_websocket_manager and hasattr(enhanced_websocket_manager, 'broadcast_consciousness_update'):
await enhanced_websocket_manager.broadcast_consciousness_update({
'type': 'bootstrap_progress',
'phase': 'Already Completed',
'awareness_level': getattr(ce.current_state, 'awareness_level', 1.0) or 1.0,
'timestamp': time.time(),
'message': 'Bootstrap already completed'
})
except Exception:
pass
return {"status": "skipped", "message": "Bootstrap already completed"}
# Announce initiation for immediate UI feedback
try:
if enhanced_websocket_manager and hasattr(enhanced_websocket_manager, 'broadcast_consciousness_update'):
await enhanced_websocket_manager.broadcast_consciousness_update({
'type': 'bootstrap_progress',
'phase': 'Bootstrap Initiated',
'awareness_level': getattr(ce.current_state, 'awareness_level', 0.0) or 0.0,
'timestamp': time.time(),
'message': 'Starting 6-phase consciousness bootstrap'
})
except Exception:
pass
await ce.bootstrap_consciousness()
return {"status": "started", "message": "Bootstrap sequence executed"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error triggering consciousness bootstrap: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Root and health endpoints
@app.get("/")
async def root():
"""Root endpoint providing comprehensive API information."""
return {
"name": "GödelOS Unified Cognitive API",
"version": "2.0.0",
"status": "operational",
"services": {
"godelos_integration": GODELOS_AVAILABLE and godelos_integration is not None,
"llm_integration": LLM_INTEGRATION_AVAILABLE and tool_based_llm is not None,
"knowledge_services": KNOWLEDGE_SERVICES_AVAILABLE,
"enhanced_apis": ENHANCED_APIS_AVAILABLE,
"websocket_streaming": websocket_manager is not None
},
"endpoints": {
"core": ["/", "/health", "/api/health"],
"cognitive": ["/cognitive/state", "/api/cognitive/state"],
"llm": ["/api/llm-chat/message", "/api/llm-tools/test", "/api/llm-tools/available"],
"streaming": ["/ws/cognitive-stream"],
"enhanced": ["/api/enhanced-cognitive/*", "/api/transparency/*"] if ENHANCED_APIS_AVAILABLE else []
},
"features": [
"Unified server architecture",
"Tool-based LLM integration",
"Real-time cognitive streaming",
"Advanced knowledge processing",
"Cognitive transparency",
"WebSocket live updates"
]
}
@app.get("/health")
async def health_check():
"""Comprehensive health check endpoint with subsystem probes."""
# Base service status
services = {
"godelos": "active" if godelos_integration else "inactive",
"llm_tools": "active" if tool_based_llm else "inactive",