def _simple_add(
    self,
    messages: Union[str, Dict[str, Any], List[Union[str, Dict[str, Any]]]],
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    filters: Optional[Dict[str, Any]] = None,
    scope: Optional[str] = None,
    memory_type: Optional[str] = None,
    prompt: Optional[str] = None,
) -> Dict[str, Any]:
    """Store each message as its own memory, without LLM-based inference.

    Every message is validated first; only when the whole batch is valid is
    anything embedded or written, so a bad message cannot leave a partially
    stored batch behind.

    Args:
        messages: A single string, a single message dict, or a list/tuple of
            either. Dict messages supply their text under ``"content"`` (or
            ``"text"``) and may carry ``"metadata"``, ``"user_id"`` and
            ``"agent_id"`` entries.
        user_id: Owner of the memories; takes precedence over a message's
            own ``"user_id"``.
        agent_id: Agent for the memories; falls back to the message's
            ``"agent_id"`` and then to ``self.agent_id``.
        run_id: Run identifier stored with every memory.
        metadata: Metadata applied to every message. Message-level metadata
            overrides it, and ``filters`` override both.
        filters: Extra key/values merged into each memory's metadata and
            forwarded to the graph store.
        scope: Stored verbatim under ``"scope"``.
        memory_type: Stored verbatim under ``"memory_type"``.
        prompt: Not used by this method.

    Returns:
        ``{"results": [...]}`` with one entry per stored message (``id``,
        ``memory``, ``event``, ``user_id``, ``agent_id``, ``run_id``,
        ``metadata``, ``created_at``), plus ``"relations"`` when the graph
        store returns a truthy value.

    Raises:
        ValueError: If ``messages`` has an unsupported type or is empty, if
            an item is neither ``str`` nor ``dict``, or if a message has
            non-string or blank content.
    """
    # A bare str/dict is ONE message. Iterating it directly would store a
    # string character by character, or a dict key by key.
    if isinstance(messages, (str, dict)):
        batch = [messages]
    elif isinstance(messages, (list, tuple)):
        batch = list(messages)
    else:
        raise ValueError("messages must be str, dict, or list[dict]")
    if not batch:
        raise ValueError("Cannot create memory with empty content")

    # Pass 1: normalize and validate everything before any side effect.
    prepared = []
    for message in batch:
        if isinstance(message, str):
            raw_content = message
            message_metadata = {}
            message_user_id = None
            message_agent_id = None
        elif isinstance(message, dict):
            raw_content = message.get("content") or message.get("text") or ""
            # An explicit ``"metadata": None`` must not reach dict.update().
            message_metadata = message.get("metadata") or {}
            message_user_id = message.get("user_id")
            message_agent_id = message.get("agent_id")
        else:
            raise ValueError("Each message must be a string or dict")

        if not isinstance(raw_content, str):
            raise ValueError("Message content must be a string")

        content = raw_content.strip()
        if not content:
            logger.error("Cannot store empty content. Message: %s", message)
            raise ValueError("Cannot create memory for empty content")

        # Precedence (lowest to highest): call-level metadata, message-level
        # metadata, filters.
        combined_metadata = dict(metadata or {})
        combined_metadata.update(message_metadata)
        if filters:
            combined_metadata.update(filters)

        prepared.append(
            (
                content,
                combined_metadata,
                user_id or message_user_id,
                agent_id or message_agent_id or self.agent_id,
            )
        )

    plugin = self._intelligence_plugin
    plugin_enabled = bool(plugin and plugin.enabled)

    # Pass 2: embed, store, audit.
    created_results = []
    for content, combined_metadata, item_user_id, item_agent_id in prepared:
        # The embedding service is chosen from the metadata (sub-store routing).
        embedding_service = self._get_embedding_service(combined_metadata)
        embedding = embedding_service.embed(content, memory_action="add")

        extra_fields = {}
        if plugin_enabled:
            extra_fields = plugin.on_add(content=content, metadata=combined_metadata)

        # MD5 is used only as a deduplication fingerprint, not for security;
        # the algorithm is kept so hashes stay comparable with stored rows.
        content_hash = hashlib.md5(content.encode("utf-8")).hexdigest()

        # "category" is stored in its own field, so drop it from the metadata.
        category = combined_metadata.get("category", "")
        stored_metadata = {
            key: value for key, value in combined_metadata.items() if key != "category"
        }

        # One timestamp so created_at and updated_at are identical. Kept naive
        # (utcnow) to match the values this method has always written.
        now = datetime.utcnow()
        memory_data = {
            "content": content,
            "embedding": embedding,
            "user_id": item_user_id,
            "agent_id": item_agent_id,
            "run_id": run_id,
            "hash": content_hash,
            "category": category,
            "metadata": stored_metadata,
            "scope": scope,
            "memory_type": memory_type,
            "created_at": now,
            "updated_at": now,
        }
        if extra_fields:
            memory_data.update(extra_fields)

        memory_id = self.storage.add_memory(memory_data)

        self.audit.log_event(
            "memory.add",
            {
                "memory_id": memory_id,
                "user_id": item_user_id,
                "agent_id": item_agent_id,
                "content_length": len(content),
            },
            user_id=item_user_id,
            agent_id=item_agent_id,
        )

        self.telemetry.capture_event(
            "memory.add",
            {
                "memory_id": memory_id,
                "user_id": item_user_id,
                "agent_id": item_agent_id,
            },
        )

        created_results.append(
            {
                "id": memory_id,
                "memory": content,
                "event": "ADD",
                "user_id": item_user_id,
                "agent_id": item_agent_id,
                "run_id": run_id,
                "metadata": combined_metadata,
                "created_at": now.isoformat(),
            }
        )

    # The graph store receives the caller's original messages, with the same
    # agent fallback that is applied to the stored memories.
    graph_result = self._add_to_graph(
        messages, filters, user_id, agent_id or self.agent_id, run_id
    )

    result: Dict[str, Any] = {"results": created_results}
    if graph_result:
        result["relations"] = graph_result

    return result
for k, v in enhanced_metadata.items() if k != "category"} - - # Use self.agent_id as fallback if agent_id is not provided - agent_id = agent_id or self.agent_id - - # Create memory data - memory_data = { - "content": content, - "embedding": embedding, - "user_id": user_id, - "agent_id": agent_id, - "run_id": run_id, - "hash": content_hash, - "category": category, - "metadata": enhanced_metadata or {}, - "filters": filters or {}, - "created_at": datetime.utcnow(), - "updated_at": datetime.utcnow(), - } - - memory_id = self.storage.add_memory(memory_data) - - return memory_id def _update_memory( self,