diff --git a/src/memos/mem_feedback/feedback.py b/src/memos/mem_feedback/feedback.py index b8019004d..135058a7d 100644 --- a/src/memos/mem_feedback/feedback.py +++ b/src/memos/mem_feedback/feedback.py @@ -243,7 +243,6 @@ def _single_add_operation( datetime.now().isoformat() ) to_add_memory.metadata.background = new_memory_item.metadata.background - to_add_memory.metadata.sources = [] added_ids = self._retry_db_operation( lambda: self.memory_manager.add([to_add_memory], user_name=user_name, use_batch=False) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 4c0d4dcd0..2745a1bee 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -982,6 +982,9 @@ def _process_multi_modal_data( # Use MultiModalParser to parse the scene data # If it's a list, parse each item; otherwise parse as single message if isinstance(scene_data_info, list): + # Pre-expand multimodal messages + expanded_messages = self._expand_multimodal_messages(scene_data_info) + # Parse each message in the list all_memory_items = [] # Use thread pool to parse each message in parallel, but keep the original order @@ -996,7 +999,7 @@ def _process_multi_modal_data( need_emb=False, **kwargs, ) - for msg in scene_data_info + for msg in expanded_messages ] # collect results in original order for future in futures: @@ -1014,20 +1017,23 @@ def _process_multi_modal_data( if mode == "fast": return fast_memory_items else: + non_file_url_fast_items = [ + item for item in fast_memory_items if not self._is_file_url_only_item(item) + ] + # Part A: call llm in parallel using thread pool fine_memory_items = [] with ContextThreadPoolExecutor(max_workers=4) as executor: future_string = executor.submit( - self._process_string_fine, fast_memory_items, info, custom_tags, **kwargs + self._process_string_fine, non_file_url_fast_items, info, custom_tags, **kwargs ) future_tool = executor.submit( - self._process_tool_trajectory_fine, fast_memory_items, info, **kwargs + self._process_tool_trajectory_fine, non_file_url_fast_items, info, **kwargs ) - # Use general_llm for skill memory extraction (not fine-tuned for this task) future_skill = executor.submit( process_skill_memory_fine, - fast_memory_items=fast_memory_items, + fast_memory_items=non_file_url_fast_items, info=info, searcher=self.searcher, graph_db=self.graph_db, @@ -1039,7 +1045,7 @@ def _process_multi_modal_data( ) future_pref = executor.submit( process_preference_fine, - fast_memory_items, + non_file_url_fast_items, info, self.llm, self.embedder, @@ -1094,19 +1100,21 @@ def _process_transfer_multi_modal_data( **(raw_nodes[0].metadata.info or {}), } + # Filter out file-URL-only items for Part A fine processing (same as _process_multi_modal_data) + non_file_url_nodes = [node for node in raw_nodes if not self._is_file_url_only_item(node)] + fine_memory_items = [] # Part A: call llm in parallel using thread pool with ContextThreadPoolExecutor(max_workers=4) as executor: future_string = executor.submit( - self._process_string_fine, raw_nodes, info, custom_tags, **kwargs + self._process_string_fine, non_file_url_nodes, info, custom_tags, **kwargs ) future_tool = executor.submit( - self._process_tool_trajectory_fine, raw_nodes, info, **kwargs + self._process_tool_trajectory_fine, non_file_url_nodes, info, **kwargs ) - # Use general_llm for skill memory extraction (not fine-tuned for this task) future_skill = executor.submit( process_skill_memory_fine, - raw_nodes, + non_file_url_nodes, info, searcher=self.searcher, llm=self.general_llm, @@ -1118,7 +1126,7 @@ def _process_transfer_multi_modal_data( ) # Add preference memory extraction future_pref = executor.submit( - process_preference_fine, raw_nodes, info, self.general_llm, self.embedder, **kwargs + process_preference_fine, non_file_url_nodes, info, self.llm, self.embedder, **kwargs ) # Collect results @@ -1148,6 +1156,90 @@ def _process_transfer_multi_modal_data( fine_memory_items.extend(items) return fine_memory_items + @staticmethod + def _expand_multimodal_messages(messages: list) -> list: + """ + Expand messages whose ``content`` is a list into individual + sub-messages so that each modality is routed to its specialised + parser during fast-mode parsing. + + For a message like:: + + { + "content": [ + {"type": "text", "text": "Analyze this file"}, + {"type": "file", "file": {"file_data": "https://...", ...}}, + {"type": "image_url", "image_url": {"url": "https://..."}}, + ], + "role": "user", + "chat_time": "03:14 PM on 13 March, 2026", + } + + The result will be:: + + [ + {"content": "Analyze this file", "role": "user", "chat_time": "..."}, + {"type": "file", "file": {"file_data": "https://...", ...}}, + {"type": "image_url", "image_url": {"url": "https://..."}}, + ] + + Messages whose ``content`` is already a plain string (or that are + not dicts) are passed through unchanged. + """ + expanded: list = [] + for msg in messages: + if not isinstance(msg, dict): + expanded.append(msg) + continue + + content = msg.get("content") + if not isinstance(content, list): + expanded.append(msg) + continue + + # ---- content is a list: split by modality ---- + text_parts: list[str] = [] + for part in content: + if not isinstance(part, dict): + text_parts.append(str(part)) + continue + + part_type = part.get("type", "") + if part_type == "text": + text_parts.append(part.get("text", "")) + elif part_type in ("file", "image", "image_url"): + # Extract as a standalone message for its specialised parser + expanded.append(part) + else: + text_parts.append(f"[{part_type}]") + + # Reconstruct a text-only version of the original message + # (preserving role, chat_time, message_id, etc.) + text_content = "\n".join(t for t in text_parts if t.strip()) + if text_content.strip(): + text_msg = {k: v for k, v in msg.items() if k != "content"} + text_msg["content"] = text_content + expanded.append(text_msg) + + return expanded + + @staticmethod + def _is_file_url_only_item(item: TextualMemoryItem) -> bool: + """ + Check if a fast memory item contains only file-URL sources. + Args: + item: TextualMemoryItem to check + + Returns: + True if all sources are file-type with URL info (metadata only) + """ + sources = item.metadata.sources or [] + if not sources: + return False + return all( + getattr(s, "type", None) == "file" and getattr(s, "file_info", None) for s in sources + ) + def get_scene_data_info(self, scene_data: list, type: str) -> list[list[Any]]: """ Convert normalized MessagesType scenes into scene data info.