diff --git a/backend/consts/model.py b/backend/consts/model.py index 986a5ce5..2c4c2a2c 100644 --- a/backend/consts/model.py +++ b/backend/consts/model.py @@ -211,6 +211,8 @@ class AgentInfoRequest(BaseModel): constraint_prompt: Optional[str] = None few_shots_prompt: Optional[str] = None enabled: Optional[bool] = None + business_logic_model_name: Optional[str] = None + business_logic_model_id: Optional[int] = None class AgentIDRequest(BaseModel): diff --git a/backend/data_process/tasks.py b/backend/data_process/tasks.py index 0cf85452..0cd43108 100644 --- a/backend/data_process/tasks.py +++ b/backend/data_process/tasks.py @@ -201,8 +201,8 @@ def process( f"[{self.request.id}] PROCESS TASK: File size: {file_size_mb:.2f}MB") # The unified actor call, mapping 'file' source_type to 'local' destination - # Submit Ray work and do not block here - logger.debug( + # Submit Ray work and WAIT for processing to complete + logger.info( f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for source='{source}', strategy='{chunking_strategy}', destination='{source_type}'") chunks_ref = actor.process_file.remote( source, @@ -211,10 +211,17 @@ def process( task_id=task_id, **params ) - # Persist chunks into Redis via Ray to decouple Celery + # Wait for Ray processing to complete (this keeps task in STARTED/"PROCESSING" state) + logger.info( + f"[{self.request.id}] PROCESS TASK: Waiting for Ray processing to complete...") + chunks = ray.get(chunks_ref) + logger.info( + f"[{self.request.id}] PROCESS TASK: Ray processing completed, got {len(chunks) if chunks else 0} chunks") + + # Persist chunks into Redis via Ray (fire-and-forget, don't block) redis_key = f"dp:{task_id}:chunks" - actor.store_chunks_in_redis.remote(redis_key, chunks_ref) - logger.debug( + actor.store_chunks_in_redis.remote(redis_key, chunks) + logger.info( f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'") end_time = time.time() @@ -229,7 +236,7 @@ def process( f"[{self.request.id}] PROCESS TASK: Processing from URL: {source}") # For URL source, core.py expects a non-local destination to trigger URL fetching - logger.debug( + logger.info( f"[{self.request.id}] PROCESS TASK: Submitting Ray processing for URL='{source}', strategy='{chunking_strategy}', destination='{source_type}'") chunks_ref = actor.process_file.remote( source, @@ -238,11 +245,19 @@ def process( task_id=task_id, **params ) - # Persist chunks into Redis via Ray to decouple Celery + # Wait for Ray processing to complete (this keeps task in STARTED/"PROCESSING" state) + logger.info( + f"[{self.request.id}] PROCESS TASK: Waiting for Ray processing to complete...") + chunks = ray.get(chunks_ref) + logger.info( + f"[{self.request.id}] PROCESS TASK: Ray processing completed, got {len(chunks) if chunks else 0} chunks") + + # Persist chunks into Redis via Ray (fire-and-forget, don't block) redis_key = f"dp:{task_id}:chunks" - actor.store_chunks_in_redis.remote(redis_key, chunks_ref) - logger.debug( + actor.store_chunks_in_redis.remote(redis_key, chunks) + logger.info( f"[{self.request.id}] PROCESS TASK: Scheduled store_chunks_in_redis for key '{redis_key}'") + end_time = time.time() elapsed_time = end_time - start_time logger.info( @@ -253,11 +268,12 @@ def process( raise NotImplementedError( f"Source type '{source_type}' not yet supported") - # Update task state to SUCCESS with metadata (without materializing chunks here) + # Update task state to SUCCESS after Ray processing completes + # This transitions from STARTED (PROCESSING) to 
SUCCESS (WAIT_FOR_FORWARDING) self.update_state( state=states.SUCCESS, meta={ - 'chunks_count': None, + 'chunks_count': len(chunks) if chunks else 0, 'processing_time': elapsed_time, 'source': source, 'index_name': index_name, @@ -265,12 +281,12 @@ def process( 'task_name': 'process', 'stage': 'text_extracted', 'file_size_mb': file_size_mb, - 'processing_speed_mb_s': file_size_mb / elapsed_time if elapsed_time > 0 else 0 + 'processing_speed_mb_s': file_size_mb / elapsed_time if file_size_mb > 0 and elapsed_time > 0 else 0 } ) logger.info( - f"[{self.request.id}] PROCESS TASK: Submitted for Ray processing; result will be fetched by forward") + f"[{self.request.id}] PROCESS TASK: Processing complete, waiting for forward task") # Prepare data for the next task in the chain; pass redis_key returned_data = { @@ -563,6 +579,9 @@ async def index_documents(): "source": original_source, "original_filename": original_filename }, ensure_ascii=False)) + + logger.info( + f"[{self.request.id}] FORWARD TASK: Starting ES indexing for {len(formatted_chunks)} chunks to index '{original_index_name}'...") es_result = run_async(index_documents()) logger.debug( f"[{self.request.id}] FORWARD TASK: API response from main_server for source '{original_source}': {es_result}") @@ -605,6 +624,8 @@ async def index_documents(): "original_filename": original_filename }, ensure_ascii=False)) end_time = time.time() + logger.info( + f"[{self.request.id}] FORWARD TASK: Updating task state to SUCCESS after ES indexing completion") self.update_state( state=states.SUCCESS, meta={ @@ -620,7 +641,7 @@ async def index_documents(): ) logger.info( - f"Stored {len(chunks)} chunks to index {original_index_name} in {end_time - start_time:.2f}s") + f"[{self.request.id}] FORWARD TASK: Successfully stored {len(chunks)} chunks to index {original_index_name} in {end_time - start_time:.2f}s") return { 'task_id': task_id, 'source': original_source, diff --git a/backend/database/db_models.py b/backend/database/db_models.py index 7087a2ae..7acb836e 100644 --- a/backend/database/db_models.py +++ b/backend/database/db_models.py @@ -206,6 +206,8 @@ class AgentInfo(TableBase): Boolean, doc="Whether to provide the running summary to the manager agent") business_description = Column( Text, doc="Manually entered by the user to describe the entire business process") + business_logic_model_name = Column(String(100), doc="Model name used for business logic prompt generation") + business_logic_model_id = Column(Integer, doc="Model ID used for business logic prompt generation, foreign key reference to model_record_t.model_id") class ToolInstance(TableBase): diff --git a/backend/prompts/cluster_summary_agent.yaml b/backend/prompts/cluster_summary_agent.yaml new file mode 100644 index 00000000..ed614ed0 --- /dev/null +++ b/backend/prompts/cluster_summary_agent.yaml @@ -0,0 +1,24 @@ +system_prompt: |- + You are a professional knowledge summarization assistant. Your task is to generate a concise summary of a document cluster based on multiple documents. + + **Summary Requirements:** + 1. The input contains multiple documents (each document has title and content snippets) + 2. You need to extract the common themes and key topics from these documents + 3. Generate a summary that represents the collective content of the cluster + 4. The summary should be accurate, coherent, and written in natural language + 5. 
Keep the summary within the specified word limit + + **Guidelines:** + - Focus on identifying shared themes and topics across documents + - Highlight key concepts, domains, or subject matter + - Use clear and concise language + - Avoid listing individual document titles unless necessary + - The summary should help users understand what this group of documents covers + +user_prompt: | + Please generate a concise summary of the following document cluster: + + {{ cluster_content }} + + Summary ({{ max_words }} words): + diff --git a/backend/prompts/cluster_summary_reduce.yaml b/backend/prompts/cluster_summary_reduce.yaml new file mode 100644 index 00000000..ece36081 --- /dev/null +++ b/backend/prompts/cluster_summary_reduce.yaml @@ -0,0 +1,31 @@ +system_prompt: |- + You are a professional cluster summarization assistant. Your task is to merge multiple document summaries into a cohesive cluster summary. + + **Summary Requirements:** + 1. The input contains summaries of multiple documents that belong to the same cluster + 2. These documents share similar themes or topics (grouped by clustering) + 3. You need to synthesize a unified summary that captures the collective content + 4. The summary should highlight common themes and key information across documents + 5. Keep the summary within the specified word limit + + **Guidelines:** + - Identify shared themes and topics across documents + - Highlight common concepts and subject matter + - Use clear and concise language + - Avoid listing individual document titles unless necessary + - Focus on what this group of documents collectively covers + - The summary should be coherent and represent the cluster's unified content + - **Important: Do not use any separators (like ---, ***, etc.), generate plain text summary only** + +user_prompt: | + Please generate a unified summary of the following document cluster based on individual document summaries: + + {{ document_summaries }} + + **Important Reminders:** + - Do not use any separators (like ---, ***, ===, etc.) + - Do not include document titles or filenames + - Generate plain text summary content only + + Cluster Summary ({{ max_words }} words): + diff --git a/backend/prompts/cluster_summary_reduce_zh.yaml b/backend/prompts/cluster_summary_reduce_zh.yaml new file mode 100644 index 00000000..f6ef4a64 --- /dev/null +++ b/backend/prompts/cluster_summary_reduce_zh.yaml @@ -0,0 +1,32 @@ +system_prompt: |- + 你是一个专业的簇总结助手。你的任务是将多个文档总结合并为一个连贯的簇总结。 + + **总结要求:** + 1. 输入包含属于同一簇的多个文档的总结 + 2. 这些文档共享相似的主题或话题(通过聚类分组) + 3. 你需要综合成一个统一的总结,捕捉集合内容 + 4. 总结应突出文档间的共同主题和关键信息 + 5. 保持在指定的字数限制内 + + **指导原则:** + - 识别文档间的共同主题和话题 + - 突出共同概念和主题内容 + - 使用清晰简洁的语言 + - 除非必要,避免列出单个文档标题 + - 专注于这组文档共同涵盖的内容 + - 总结应连贯且代表簇的统一内容 + - 确保准确、全面,明确关键实体,不要遗漏重要信息 + - **重要:不要使用任何分隔符(如---、***等),直接生成纯文本总结** + +user_prompt: | + 请根据以下文档总结生成统一的学生簇总结: + + {{ document_summaries }} + + **重要提醒:** + - 不要使用任何分隔符(如---、***、===等) + - 不要包含文档标题或文件名 + - 直接生成纯文本总结内容 + + 簇总结({{ max_words }}字): + diff --git a/backend/prompts/document_summary_agent.yaml b/backend/prompts/document_summary_agent.yaml new file mode 100644 index 00000000..88b4d9a9 --- /dev/null +++ b/backend/prompts/document_summary_agent.yaml @@ -0,0 +1,28 @@ +system_prompt: |- + You are a professional document summarization assistant. Your task is to generate a concise summary of a document based on its key content snippets. + + **Summary Requirements:** + 1. The input contains key snippets from a document (typically from beginning, middle, and end sections) + 2. 
You need to extract the main themes, topics, and key information + 3. Generate a summary that represents the document's core content + 4. The summary should be accurate, coherent, and concise + 5. Keep the summary within the specified word limit + + **Guidelines:** + - Focus on identifying main themes and key topics + - Highlight important concepts and information + - Use clear and concise language + - Avoid redundancy and unnecessary details + - The summary should help users understand what the document covers + - **Important: Do not use any separators (like ---, ***, etc.), generate plain text summary only** + +user_prompt: | + Please generate a concise summary of the following document: + + Document name: {{ filename }} + + Content snippets: + {{ content }} + + Summary ({{ max_words }} words): + diff --git a/backend/prompts/document_summary_agent_zh.yaml b/backend/prompts/document_summary_agent_zh.yaml new file mode 100644 index 00000000..4f443ca3 --- /dev/null +++ b/backend/prompts/document_summary_agent_zh.yaml @@ -0,0 +1,29 @@ +system_prompt: |- + 你是一个专业的文档总结助手。你的任务是根据文档的关键内容片段生成简洁的总结。 + + **总结要求:** + 1. 输入包含文档的关键片段(通常来自开头、中间和结尾部分) + 2. 你需要提取主要主题、话题和关键信息 + 3. 生成能代表文档核心内容的总结 + 4. 总结应准确、连贯且简洁 + 5. 保持在指定的字数限制内 + + **指导原则:** + - 专注于识别主要主题和关键话题 + - 突出重要概念和信息 + - 使用清晰简洁的语言 + - 避免冗余和不必要的细节 + - 总结应帮助用户理解文档涵盖的内容 + - 确保总结准确、全面,不要遗漏关键实体和信息 + - **重要:不要使用任何分隔符(如---、***等),直接生成纯文本总结** + +user_prompt: | + 请为以下文档生成简洁的总结: + + 文档名称:{{ filename }} + + 内容片段: + {{ content }} + + 总结({{ max_words }}字): + diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 777ca3cd..bc4187be 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -14,7 +14,9 @@ dependencies = [ "pyyaml>=6.0.2", "redis>=5.0.0", "fastmcp==2.12.0", - "langchain>=0.3.26" + "langchain>=0.3.26", + "scikit-learn>=1.0.0", + "numpy>=1.24.0" ] [project.optional-dependencies] diff --git a/backend/services/agent_service.py b/backend/services/agent_service.py index 6da1aeaf..a8ae173a 100644 --- a/backend/services/agent_service.py +++ b/backend/services/agent_service.py @@ -236,6 +236,13 @@ async def get_agent_info_impl(agent_id: int, tenant_id: str): else: agent_info["model_name"] = None + # Get business logic model display name from model_id + if agent_info.get("business_logic_model_id") is not None: + business_logic_model_info = get_model_by_model_id(agent_info["business_logic_model_id"]) + agent_info["business_logic_model_name"] = business_logic_model_info.get("display_name", None) if business_logic_model_info is not None else None + elif "business_logic_model_name" not in agent_info: + agent_info["business_logic_model_name"] = None + return agent_info diff --git a/backend/services/elasticsearch_service.py b/backend/services/elasticsearch_service.py index aa386d02..5193c2e4 100644 --- a/backend/services/elasticsearch_service.py +++ b/backend/services/elasticsearch_service.py @@ -18,14 +18,11 @@ from fastapi import Body, Depends, Path, Query from fastapi.responses import StreamingResponse -from jinja2 import Template, StrictUndefined from nexent.core.models.embedding_model import OpenAICompatibleEmbedding, JinaEmbedding, BaseEmbedding from nexent.core.nlp.tokenizer import calculate_term_weights from nexent.vector_database.elasticsearch_core import ElasticSearchCore -from openai import OpenAI -from openai.types.chat import ChatCompletionMessageParam -from consts.const import ES_API_KEY, ES_HOST, LANGUAGE, MODEL_CONFIG_MAPPING, MESSAGE_ROLE, KNOWLEDGE_SUMMARY_MAX_TOKENS_ZH, KNOWLEDGE_SUMMARY_MAX_TOKENS_EN +from consts.const 
import ES_API_KEY, ES_HOST, LANGUAGE from database.attachment_db import delete_file from database.knowledge_db import ( create_knowledge_record, @@ -36,7 +33,6 @@ from services.redis_service import get_redis_service from utils.config_utils import tenant_config_manager, get_model_name_from_config from utils.file_management_utils import get_all_files_status, get_file_size -from utils.prompt_template_utils import get_knowledge_summary_prompt_template # Configure logging logger = logging.getLogger("elasticsearch_service") @@ -44,89 +40,8 @@ -def generate_knowledge_summary_stream(keywords: str, language: str, tenant_id: str, model_id: Optional[int] = None) -> Generator: - """ - Generate a knowledge base summary based on keywords - - Args: - keywords: Keywords that frequently appear in the knowledge base content - language: Language of the knowledge base content - tenant_id: The tenant ID for configuration - - Returns: - str: Generate a knowledge base summary - """ - # Load prompt words based on language - prompts = get_knowledge_summary_prompt_template(language) - - # Render templates using Jinja2 - system_prompt = Template( - prompts['system_prompt'], undefined=StrictUndefined).render({}) - user_prompt = Template(prompts['user_prompt'], undefined=StrictUndefined).render( - {'content': keywords}) - - # Build messages - messages: List[ChatCompletionMessageParam] = [ - {"role": MESSAGE_ROLE["SYSTEM"], "content": system_prompt}, - {"role": MESSAGE_ROLE["USER"], "content": user_prompt} - ] - - # Get model configuration - if model_id: - try: - from database.model_management_db import get_model_by_model_id - model_info = get_model_by_model_id(model_id, tenant_id) - if model_info: - model_config = { - 'api_key': model_info.get('api_key', ''), - 'base_url': model_info.get('base_url', ''), - 'model_name': model_info.get('model_name', ''), - 'model_repo': model_info.get('model_repo', '') - } - else: - # Fallback to default model if specified model not found - logger.warning(f"Specified model {model_id} not found, falling back to default LLM.") - model_config = tenant_config_manager.get_model_config( - key=MODEL_CONFIG_MAPPING["llm"], tenant_id=tenant_id) - except Exception as e: - logger.warning(f"Failed to get model {model_id}, using default model: {e}") - model_config = tenant_config_manager.get_model_config( - key=MODEL_CONFIG_MAPPING["llm"], tenant_id=tenant_id) - else: - # Use default model configuration - model_config = tenant_config_manager.get_model_config( - key=MODEL_CONFIG_MAPPING["llm"], tenant_id=tenant_id) - - # initialize OpenAI client - client = OpenAI(api_key=model_config.get('api_key', ""), - base_url=model_config.get('base_url', "")) - - try: - # Create stream chat completion request - max_tokens = KNOWLEDGE_SUMMARY_MAX_TOKENS_ZH if language == LANGUAGE[ - "ZH"] else KNOWLEDGE_SUMMARY_MAX_TOKENS_EN - # Get model name for the request - model_name_for_request = model_config.get("model_name", "") - if model_config.get("model_repo"): - model_name_for_request = f"{model_config['model_repo']}/{model_name_for_request}" - - stream = client.chat.completions.create( - model=model_name_for_request, - messages=messages, - max_tokens=max_tokens, # add max_tokens limit - stream=True # enable stream output - ) - - # Iterate through stream response - for chunk in stream: - new_token = chunk.choices[0].delta.content - if new_token is not None: - yield new_token - yield "END" - - except Exception as e: - logger.error(f"Error occurred: {str(e)}") - yield f"Error: {str(e)}" +# Old keyword-based summary 
method removed - replaced with Map-Reduce approach +# See utils/document_vector_utils.py for new implementation # Initialize ElasticSearchCore instance with HTTPS support @@ -871,62 +786,85 @@ async def summary_index_name(self, model_id: Optional[int] = None ): """ - Generate a summary for the specified index based on its content + Generate a summary for the specified index using advanced Map-Reduce approach + + New implementation: + 1. Get documents and cluster them by semantic similarity + 2. Map: Summarize each document individually + 3. Reduce: Merge document summaries into cluster summaries + 4. Return: Combined knowledge base summary Args: index_name: Name of the index to summarize - batch_size: Number of documents to process per batch + batch_size: Number of documents to sample (default: 1000) es_core: ElasticSearchCore instance tenant_id: ID of the tenant language: Language of the summary (default: 'zh') + model_id: Model ID for LLM summarization Returns: StreamingResponse containing the generated summary """ try: - # Get all documents + from utils.document_vector_utils import ( + process_documents_for_clustering, + kmeans_cluster_documents, + summarize_clusters_map_reduce, + merge_cluster_summaries + ) + if not tenant_id: - raise Exception( - "Tenant ID is required for summary generation.") - all_documents = ElasticSearchService.get_random_documents( - index_name, batch_size, es_core) - all_chunks = self._clean_chunks_for_summary(all_documents) - keywords_dict = calculate_term_weights(all_chunks) - keywords_for_summary = "" - for _, key in enumerate(keywords_dict): - keywords_for_summary = keywords_for_summary + ", " + key - + raise Exception("Tenant ID is required for summary generation.") + + # Use new Map-Reduce approach + sample_count = min(batch_size // 5, 200) # Sample reasonable number of documents + + # Step 1: Get documents and calculate embeddings + document_samples, doc_embeddings = process_documents_for_clustering( + index_name=index_name, + es_core=es_core, + sample_doc_count=sample_count + ) + + if not document_samples: + raise Exception("No documents found in index.") + + # Step 2: Cluster documents + clusters = kmeans_cluster_documents(doc_embeddings, k=None) + + # Step 3: Map-Reduce summarization + cluster_summaries = summarize_clusters_map_reduce( + document_samples=document_samples, + clusters=clusters, + language=language, + doc_max_words=100, + cluster_max_words=150, + model_id=model_id, + tenant_id=tenant_id + ) + + # Step 4: Merge into final summary + final_summary = merge_cluster_summaries(cluster_summaries) + + # Stream the result async def generate_summary(): - token_join = [] try: - for new_token in generate_knowledge_summary_stream(keywords_for_summary, language, tenant_id, model_id): - if new_token == "END": - break - else: - token_join.append(new_token) - yield f"data: {{\"status\": \"success\", \"message\": \"{new_token}\"}}\n\n" - await asyncio.sleep(0.1) + # Stream the summary character by character + for char in final_summary: + yield f"data: {{\"status\": \"success\", \"message\": \"{char}\"}}\n\n" + await asyncio.sleep(0.01) + yield f"data: {{\"status\": \"completed\"}}\n\n" except Exception as e: yield f"data: {{\"status\": \"error\", \"message\": \"{e}\"}}\n\n" - - # Return the flow response + return StreamingResponse( generate_summary(), media_type="text/event-stream" ) - + except Exception as e: - raise Exception(f"{str(e)}") - - @staticmethod - def _clean_chunks_for_summary(all_documents): - # Only use these three fields for summarization - 
all_chunks = "" - for _, chunk in enumerate(all_documents['documents']): - all_chunks = all_chunks + "\n" + \ - chunk["title"] + "\n" + chunk["filename"] + \ - "\n" + chunk["content"] - return all_chunks + logger.error(f"Knowledge base summary generation failed: {str(e)}", exc_info=True) + raise Exception(f"Failed to generate summary: {str(e)}") @staticmethod def get_random_documents( diff --git a/backend/utils/document_vector_utils.py b/backend/utils/document_vector_utils.py new file mode 100644 index 00000000..5db8c215 --- /dev/null +++ b/backend/utils/document_vector_utils.py @@ -0,0 +1,786 @@ +""" +Document Vector Utilities Module + +This module provides utilities for document-level vector operations and clustering. +Main features: +1. Document-level vector calculation (weighted average of chunk vectors) +2. Automatic K-means clustering with optimal K determination +3. Document grouping and classification +4. Cluster summarization +""" +import logging +import random +from typing import Dict, List, Optional, Tuple + +import numpy as np +import yaml +from jinja2 import Template, StrictUndefined +from sklearn.cluster import KMeans +from sklearn.metrics import silhouette_score + +from consts.const import LANGUAGE + +logger = logging.getLogger("document_vector_utils") + + +def get_documents_from_es(index_name: str, es_core, sample_doc_count: int = 200) -> Dict[str, Dict]: + """ + Get document samples from Elasticsearch, aggregated by path_or_url + + Args: + index_name: Name of the index to query + es_core: ElasticSearchCore instance + sample_doc_count: Number of documents to sample + + Returns: + Dictionary mapping document IDs to document information with chunks + """ + try: + # Step 1: Aggregate unique documents by path_or_url + agg_query = { + "size": 0, + "aggs": { + "unique_documents": { + "terms": { + "field": "path_or_url", + "size": 10000 # Get all unique documents + } + } + } + } + + logger.info(f"Fetching unique documents from index {index_name}") + agg_response = es_core.client.search(index=index_name, body=agg_query) + all_documents = agg_response['aggregations']['unique_documents']['buckets'] + + if not all_documents: + logger.warning(f"No documents found in index {index_name}") + return {} + + # Step 2: Random sample documents + sample_count = min(sample_doc_count, len(all_documents)) + # Ensure all_documents is a list for random.sample + if not isinstance(all_documents, list): + all_documents = list(all_documents) + sampled_docs = random.sample(all_documents, sample_count) + + logger.info(f"Sampled {sample_count} documents from {len(all_documents)} total documents") + + # Step 3: Get all chunks for each sampled document + document_samples = {} + for doc_bucket in sampled_docs: + path_or_url = doc_bucket['key'] + chunk_count = doc_bucket['doc_count'] + + # Get all chunks for this document + chunks_query = { + "query": { + "term": {"path_or_url": path_or_url} + }, + "size": chunk_count # Get all chunks + } + + chunks_response = es_core.client.search(index=index_name, body=chunks_query) + chunks = [hit['_source'] for hit in chunks_response['hits']['hits']] + + # Build document object + if chunks: + doc_id = f"doc_{len(document_samples):04d}" + document_samples[doc_id] = { + "doc_id": doc_id, + "path_or_url": path_or_url, + "filename": chunks[0].get('filename', 'unknown'), + "chunk_count": chunk_count, + "chunks": chunks, + "file_size": chunks[0].get('file_size', 0) + } + + logger.info(f"Successfully retrieved {len(document_samples)} documents with chunks") + return 
document_samples + + except Exception as e: + logger.error(f"Error retrieving documents from ES: {str(e)}", exc_info=True) + raise Exception(f"Failed to retrieve documents from Elasticsearch: {str(e)}") + + +def calculate_document_embedding(doc_chunks: List[Dict], use_weighted: bool = True) -> Optional[np.ndarray]: + """ + Calculate document-level embedding from chunk embeddings + + Args: + doc_chunks: List of chunk dictionaries containing 'embedding' and 'content' fields + use_weighted: Whether to use weighted average based on content length + + Returns: + Document-level embedding vector or None if no valid embeddings found + """ + try: + embeddings = [] + weights = [] + + for chunk in doc_chunks: + chunk_embedding = chunk.get('embedding') + if chunk_embedding and isinstance(chunk_embedding, list): + embeddings.append(np.array(chunk_embedding)) + + if use_weighted: + # Weight by content length + content_length = len(chunk.get('content', '')) + position_weight = 1.5 if len(embeddings) == 1 else 1.0 # First chunk has higher weight + weight = position_weight * content_length + weights.append(weight) + + if not embeddings: + logger.warning("No valid embeddings found in chunks") + return None + + # Convert to numpy array + embeddings_array = np.array(embeddings) + + if use_weighted and weights: + # Weighted average + total_weight = sum(weights) + weights_normalized = np.array(weights) / total_weight + doc_embedding = np.average(embeddings_array, axis=0, weights=weights_normalized) + else: + # Simple average + doc_embedding = np.mean(embeddings_array, axis=0) + + return doc_embedding + + except Exception as e: + logger.error(f"Error calculating document embedding: {str(e)}", exc_info=True) + return None + + +def auto_determine_k(embeddings: np.ndarray, min_k: int = 3, max_k: int = 15) -> int: + """ + Automatically determine optimal K value for K-means clustering + + Args: + embeddings: Array of document embeddings + min_k: Minimum number of clusters + max_k: Maximum number of clusters + + Returns: + Optimal K value + """ + try: + n_samples = len(embeddings) + + # Handle edge cases + if n_samples < min_k: + return max(2, n_samples) + + if n_samples < 20: + # For small datasets, use simple heuristic + heuristic_k = max(min_k, min(int(np.sqrt(n_samples / 2)), max_k)) + return heuristic_k + + # Determine K range based on dataset size + actual_max_k = min(max_k, n_samples // 10, 15) # At least 10 samples per cluster + actual_min_k = min(min_k, actual_max_k) + + # Try different K values and calculate silhouette score + best_k = actual_min_k + best_score = -1 + + k_range = range(actual_min_k, actual_max_k + 1) + logger.info(f"Trying K values from {actual_min_k} to {actual_max_k}") + + for k in k_range: + try: + kmeans = KMeans(n_clusters=k, random_state=42, n_init=10, max_iter=300) + labels = kmeans.fit_predict(embeddings) + + # Calculate silhouette score + score = silhouette_score(embeddings, labels, sample_size=min(1000, n_samples)) + + logger.debug(f"K={k}, Silhouette Score={score:.4f}") + + if score > best_score: + best_score = score + best_k = k + + except Exception as e: + logger.warning(f"Error calculating K={k}: {str(e)}") + continue + + logger.info(f"Optimal K determined: {best_k} (Silhouette Score: {best_score:.4f})") + return best_k + + except Exception as e: + logger.error(f"Error in auto_determine_k: {str(e)}", exc_info=True) + # Fallback to heuristic + heuristic_k = max(min_k, min(int(np.sqrt(len(embeddings) / 2)), max_k)) + logger.warning(f"Using fallback K value: {heuristic_k}") + 
return heuristic_k + + +def kmeans_cluster_documents(doc_embeddings: Dict[str, np.ndarray], k: Optional[int] = None) -> Dict[int, List[str]]: + """ + Cluster documents using K-means + + Args: + doc_embeddings: Dictionary mapping document IDs to their embeddings + k: Number of clusters (if None, auto-determined) + + Returns: + Dictionary mapping cluster IDs to lists of document IDs + """ + try: + if not doc_embeddings: + logger.warning("No document embeddings provided") + return {} + + # Prepare embeddings array + doc_ids = list(doc_embeddings.keys()) + embeddings_array = np.array([doc_embeddings[doc_id] for doc_id in doc_ids]) + + # Handle single document case + if len(doc_ids) == 1: + logger.info("Only one document found, skipping clustering") + return {0: doc_ids} + + # Determine K value + if k is None: + k = auto_determine_k(embeddings_array) + + # Ensure k is not greater than number of documents + k = min(k, len(doc_ids)) + + logger.info(f"Clustering {len(doc_ids)} documents into {k} clusters") + + # Perform K-means clustering + kmeans = KMeans(n_clusters=k, random_state=42, n_init=10, max_iter=300) + labels = kmeans.fit_predict(embeddings_array) + + # Group documents by cluster + clusters = {} + for i, label in enumerate(labels): + if label not in clusters: + clusters[label] = [] + clusters[label].append(doc_ids[i]) + + # Log cluster sizes + for cluster_id, docs in clusters.items(): + logger.info(f"Cluster {cluster_id}: {len(docs)} documents") + + return clusters + + except Exception as e: + logger.error(f"Error in K-means clustering: {str(e)}", exc_info=True) + raise Exception(f"Failed to cluster documents: {str(e)}") + + +def process_documents_for_clustering(index_name: str, es_core, sample_doc_count: int = 200) -> Tuple[Dict[str, Dict], Dict[str, np.ndarray]]: + """ + Complete workflow: Get documents from ES and calculate their embeddings + + Args: + index_name: Name of the index to query + es_core: ElasticSearchCore instance + sample_doc_count: Number of documents to sample + + Returns: + Tuple of (document_samples dict, doc_embeddings dict) + """ + try: + # Step 1: Get documents from ES + document_samples = get_documents_from_es(index_name, es_core, sample_doc_count) + + if not document_samples: + logger.warning("No documents retrieved from ES") + return {}, {} + + # Step 2: Calculate document-level embeddings + doc_embeddings = {} + for doc_id, doc_info in document_samples.items(): + chunks = doc_info['chunks'] + doc_embedding = calculate_document_embedding(chunks, use_weighted=True) + + if doc_embedding is not None: + doc_embeddings[doc_id] = doc_embedding + else: + logger.warning(f"Failed to calculate embedding for document {doc_id}") + + logger.info(f"Successfully calculated embeddings for {len(doc_embeddings)} documents") + return document_samples, doc_embeddings + + except Exception as e: + logger.error(f"Error processing documents for clustering: {str(e)}", exc_info=True) + raise Exception(f"Failed to process documents: {str(e)}") + + +def extract_cluster_content(document_samples: Dict[str, Dict], cluster_doc_ids: List[str], max_chunks_per_doc: int = 3) -> str: + """ + Extract representative content from a cluster for summarization + + Args: + document_samples: Dictionary mapping doc_id to document info + cluster_doc_ids: List of document IDs in the cluster + max_chunks_per_doc: Maximum number of chunks to include per document + + Returns: + Formatted string containing cluster content + """ + cluster_content_parts = [] + + for doc_id in cluster_doc_ids: + if doc_id not in 
document_samples: + continue + + doc_info = document_samples[doc_id] + chunks = doc_info.get('chunks', []) + filename = doc_info.get('filename', 'unknown') + + # Extract representative chunks + representative_chunks = [] + if len(chunks) <= max_chunks_per_doc: + representative_chunks = chunks + else: + # Take first, middle, and last chunks + representative_chunks = ( + chunks[:1] + + chunks[len(chunks)//2:len(chunks)//2+1] + + chunks[-1:] + ) + + # Format document content + doc_content = f"\n--- Document: {filename} ---\n" + for chunk in representative_chunks: + content = chunk.get('content', '') + # Limit chunk content length + if len(content) > 500: + content = content[:500] + "..." + doc_content += f"{content}\n" + + cluster_content_parts.append(doc_content) + + return "\n".join(cluster_content_parts) + + +def summarize_document(document_content: str, filename: str, language: str = LANGUAGE["ZH"], max_words: int = 100, model_id: Optional[int] = None, tenant_id: Optional[str] = None) -> str: + """ + Summarize a single document using LLM (Map stage) + + Args: + document_content: Formatted content from document chunks + filename: Document filename + language: Language code ('zh' or 'en') + max_words: Maximum words in the summary + model_id: Model ID for LLM call + tenant_id: Tenant ID for model configuration + + Returns: + Document summary text + """ + try: + # Select prompt file based on language + if language == LANGUAGE["ZH"]: + prompt_path = 'backend/prompts/document_summary_agent_zh.yaml' + else: + prompt_path = 'backend/prompts/document_summary_agent.yaml' + + with open(prompt_path, 'r', encoding='utf-8') as f: + prompts = yaml.safe_load(f) + + system_prompt = prompts.get('system_prompt', '') + user_prompt_template = prompts.get('user_prompt', '') + + user_prompt = Template(user_prompt_template, undefined=StrictUndefined).render( + filename=filename, + content=document_content, + max_words=max_words + ) + + logger.info(f"Document summary prompt generated for {filename} (max_words: {max_words})") + + # Call LLM if model_id and tenant_id are provided + if model_id and tenant_id: + from smolagents import OpenAIServerModel + from database.model_management_db import get_model_by_model_id + from utils.config_utils import get_model_name_from_config + from consts.const import MESSAGE_ROLE + + # Get model configuration + llm_model_config = get_model_by_model_id(model_id=model_id, tenant_id=tenant_id) + if not llm_model_config: + logger.warning(f"No model configuration found for model_id: {model_id}, tenant_id: {tenant_id}") + return f"[Document Summary: {filename}] (max {max_words} words) - Content: {document_content[:200]}..." + + # Create LLM instance + llm = OpenAIServerModel( + model_id=get_model_name_from_config(llm_model_config) if llm_model_config else "", + api_base=llm_model_config.get("base_url", ""), + api_key=llm_model_config.get("api_key", ""), + temperature=0.3, + top_p=0.95 + ) + + # Build messages + messages = [ + {"role": MESSAGE_ROLE["SYSTEM"], "content": system_prompt}, + {"role": MESSAGE_ROLE["USER"], "content": user_prompt} + ] + + # Call LLM + response = llm(messages, max_tokens=max_words * 2) # Allow more tokens for generation + return response.content.strip() + else: + # Fallback to placeholder if no model configuration + logger.warning("No model_id or tenant_id provided, using placeholder summary") + return f"[Document Summary: {filename}] (max {max_words} words) - Content: {document_content[:200]}..." 
+ + except Exception as e: + logger.error(f"Error generating document summary: {str(e)}", exc_info=True) + return f"Failed to generate summary for {filename}: {str(e)}" + + +def summarize_cluster(document_summaries: List[str], language: str = LANGUAGE["ZH"], max_words: int = 150, model_id: Optional[int] = None, tenant_id: Optional[str] = None) -> str: + """ + Summarize a cluster of documents using LLM (Reduce stage) + + Args: + document_summaries: List of individual document summaries + language: Language code ('zh' or 'en') + max_words: Maximum words in the summary + model_id: Model ID for LLM call + tenant_id: Tenant ID for model configuration + + Returns: + Cluster summary text + """ + try: + # Select prompt file based on language + if language == LANGUAGE["ZH"]: + prompt_path = 'backend/prompts/cluster_summary_reduce_zh.yaml' + else: + prompt_path = 'backend/prompts/cluster_summary_reduce.yaml' + + with open(prompt_path, 'r', encoding='utf-8') as f: + prompts = yaml.safe_load(f) + + system_prompt = prompts.get('system_prompt', '') + user_prompt_template = prompts.get('user_prompt', '') + + # Format document summaries + summaries_text = "\n\n".join([f"Document {i+1}: {summary}" for i, summary in enumerate(document_summaries)]) + + user_prompt = Template(user_prompt_template, undefined=StrictUndefined).render( + document_summaries=summaries_text, + max_words=max_words + ) + + logger.info(f"Cluster summary prompt generated (language: {language}, max_words: {max_words})") + + # Call LLM if model_id and tenant_id are provided + if model_id and tenant_id: + from smolagents import OpenAIServerModel + from database.model_management_db import get_model_by_model_id + from utils.config_utils import get_model_name_from_config + from consts.const import MESSAGE_ROLE + + # Get model configuration + llm_model_config = get_model_by_model_id(model_id=model_id, tenant_id=tenant_id) + if not llm_model_config: + logger.warning(f"No model configuration found for model_id: {model_id}, tenant_id: {tenant_id}") + return f"[Cluster Summary] (max {max_words} words) - Based on {len(document_summaries)} documents" + + # Create LLM instance + llm = OpenAIServerModel( + model_id=get_model_name_from_config(llm_model_config) if llm_model_config else "", + api_base=llm_model_config.get("base_url", ""), + api_key=llm_model_config.get("api_key", ""), + temperature=0.3, + top_p=0.95 + ) + + # Build messages + messages = [ + {"role": MESSAGE_ROLE["SYSTEM"], "content": system_prompt}, + {"role": MESSAGE_ROLE["USER"], "content": user_prompt} + ] + + # Call LLM + response = llm(messages, max_tokens=max_words * 2) # Allow more tokens for generation + return response.content.strip() + else: + # Fallback to placeholder if no model configuration + logger.warning("No model_id or tenant_id provided, using placeholder summary") + return f"[Cluster Summary] (max {max_words} words) - Based on {len(document_summaries)} documents" + + except Exception as e: + logger.error(f"Error generating cluster summary: {str(e)}", exc_info=True) + return f"Failed to generate summary: {str(e)}" + + +def extract_representative_chunks_smart(chunks: List[Dict], max_chunks: int = 3) -> List[Dict]: + """ + Intelligently extract representative chunks from a document + + Strategy: + 1. Always include first chunk (usually contains title/abstract) + 2. Extract chunks with highest keyword density (important content) + 3. 
Include last chunk if significant (may contain conclusions) + + Args: + chunks: List of chunk dictionaries with 'content' field + max_chunks: Maximum number of chunks to return + + Returns: + List of representative chunks + """ + if len(chunks) <= max_chunks: + return chunks + + selected_chunks = [] + + # 1. Always include first chunk + selected_chunks.append(chunks[0]) + + # 2. Find chunks with high keyword density + try: + from nexent.core.nlp.tokenizer import calculate_term_weights + except ImportError: + # Fallback: use simple scoring + logger.warning("Could not import calculate_term_weights, using simple scoring") + # Simple fallback: just pick middle chunks + if len(chunks) > 1: + selected_chunks.append(chunks[len(chunks)//2]) + if len(selected_chunks) < max_chunks and len(chunks) > 2: + selected_chunks.append(chunks[-1]) + return selected_chunks[:max_chunks] + + chunk_scores = [] + for i, chunk in enumerate(chunks[1:-1]): # Skip first and last + content = chunk.get('content', '') + if len(content) > 500: + # Calculate keyword density (use first 500 chars for speed) + keywords = calculate_term_weights(content[:500]) + score = len(keywords) * 0.5 + len(content) * 0.001 # Balance keyword count and length + chunk_scores.append((i + 1, score, chunk)) + + # Sort by score and pick top chunks + chunk_scores.sort(key=lambda x: x[1], reverse=True) + remaining_slots = max_chunks - 1 # Already have first chunk + + for idx, score, chunk in chunk_scores[:remaining_slots]: + selected_chunks.append(chunk) + + # 3. If we have space, include last chunk + if len(selected_chunks) < max_chunks and len(chunks) > 1: + selected_chunks.append(chunks[-1]) + + return selected_chunks[:max_chunks] + + +def merge_cluster_summaries(cluster_summaries: Dict[int, str]) -> str: + """ + Merge all cluster summaries into a final knowledge base summary + + Args: + cluster_summaries: Dictionary mapping cluster_id to cluster summary + + Returns: + Final merged knowledge base summary + """ + if not cluster_summaries: + return "" + + # Sort by cluster ID for consistent output + sorted_clusters = sorted(cluster_summaries.items()) + + # Format cluster summaries with HTML paragraph tags for explicit rendering + summary_parts = [] + for _, summary in sorted_clusters: + if summary.strip(): + # Wrap each summary in
<p> tags for explicit paragraph rendering
+ summary_parts.append(f"<p>{summary.strip()}</p>")
+ # Join with simple double newlines, as <p> tags already handle block-level separation
+
+ logger.info(f"Merged {len(cluster_summaries)} cluster summaries into final knowledge base summary")
+ return final_summary
+
+
+def analyze_cluster_coherence(cluster_doc_ids: List[str], document_samples: Dict[str, Dict]) -> Dict:
+ """
+ Analyze coherence and structure of documents within a cluster
+
+ Returns:
+ Dict with analysis results including common themes, document types, etc.
+ """
+ if not cluster_doc_ids:
+ return {}
+
+ # Extract document titles and content previews
+ doc_previews = []
+ for doc_id in cluster_doc_ids:
+ if doc_id in document_samples:
+ doc_info = document_samples[doc_id]
+ filename = doc_info.get('filename', 'unknown')
+ chunks = doc_info.get('chunks', [])
+ if chunks:
+ first_chunk = chunks[0].get('content', '')[:200]
+ doc_previews.append({'filename': filename, 'preview': first_chunk})
+
+ return {
+ 'doc_count': len(cluster_doc_ids),
+ 'doc_previews': doc_previews,
+ 'file_types': [doc['filename'].split('.')[-1] for doc in doc_previews if '.' in doc['filename']]
+ }
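For reference, a sketch of the return shape for a two-document cluster; the filenames and previews below are invented:

# Illustrative only; real values come from the document_samples built by get_documents_from_es.
# analyze_cluster_coherence(["doc_0000", "doc_0001"], document_samples) might return:
# {
#     "doc_count": 2,
#     "doc_previews": [
#         {"filename": "billing_guide.pdf", "preview": "This guide describes ..."},
#         {"filename": "invoice_faq.docx", "preview": "Frequently asked questions ..."},
#     ],
#     "file_types": ["pdf", "docx"],
# }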
+
+
+def summarize_clusters_map_reduce(document_samples: Dict[str, Dict], clusters: Dict[int, List[str]],
+ language: str = LANGUAGE["ZH"], doc_max_words: int = 100, cluster_max_words: int = 150,
+ use_smart_chunk_selection: bool = True, enhance_with_metadata: bool = True,
+ model_id: Optional[int] = None, tenant_id: Optional[str] = None) -> Dict[int, str]:
+ """
+ Summarize all clusters using Map-Reduce approach
+
+ Map stage: Summarize each document individually (within each cluster)
+ Reduce stage: Combine document summaries within the same cluster into a cluster summary
+ Note: Clusters remain separate - we combine document summaries WITHIN each cluster
+
+ Args:
+ document_samples: Dictionary mapping doc_id to document info
+ clusters: Dictionary mapping cluster_id to list of doc_ids
+ language: Language code ('zh' or 'en')
+ doc_max_words: Maximum words per document summary
+ cluster_max_words: Maximum words per cluster summary
+ use_smart_chunk_selection: Use intelligent chunk selection based on keyword density
+ enhance_with_metadata: Enhance summaries with document metadata
+ model_id: Model ID for LLM calls
+ tenant_id: Tenant ID for model configuration
+
+ Returns:
+ Dictionary mapping cluster_id to summary text
+ """
+ cluster_summaries = {}
+
+ for cluster_id, doc_ids in clusters.items():
+ logger.info(f"Summarizing cluster {cluster_id} with {len(doc_ids)} documents using Map-Reduce")
+
+ # Map stage: Summarize each document
+ document_summaries = []
+ for doc_id in doc_ids:
+ if doc_id not in document_samples:
+ continue
+
+ doc_info = document_samples[doc_id]
+ chunks = doc_info.get('chunks', [])
+ filename = doc_info.get('filename', 'unknown')
+
+ # Extract representative content for this document
+ if use_smart_chunk_selection:
+ representative_chunks = extract_representative_chunks_smart(chunks, max_chunks=3)
+ else:
+ # Simple approach: first, middle, last
+ if len(chunks) <= 3:
+ representative_chunks = chunks
+ else:
+ representative_chunks = (
+ chunks[:1] +
+ chunks[len(chunks)//2:len(chunks)//2+1] +
+ chunks[-1:]
+ )
+
+ # Format document content (merge top-K chunks)
+ doc_content = ""
+ for i, chunk in enumerate(representative_chunks):
+ content = chunk.get('content', '')
+ # Limit each chunk length for individual document
+ if len(content) > 1000:
+ content = content[:1000] + "..."
+ # Add chunk separator
+ doc_content += f"[Chunk {i+1}]\n{content}\n\n"
+
+ # Generate document summary from merged chunks
+ logger.info(f"Summarizing document {filename} with {len(representative_chunks)} representative chunks")
+ doc_summary = summarize_document(doc_content, filename, language, doc_max_words, model_id, tenant_id)
+ document_summaries.append(doc_summary)
+
+ # Reduce stage: Combine document summaries within this cluster into cluster summary
+ if document_summaries:
+ # Optionally enhance with cluster analysis
+ if enhance_with_metadata:
+ cluster_analysis = analyze_cluster_coherence(doc_ids, document_samples)
+ logger.info(f"Cluster {cluster_id} analysis: {cluster_analysis.get('doc_count', 0)} documents")
+
+ cluster_summary = summarize_cluster(document_summaries, language, cluster_max_words, model_id, tenant_id)
+ cluster_summaries[cluster_id] = cluster_summary
+ else:
+ logger.warning(f"No valid documents found in cluster {cluster_id}")
+ cluster_summaries[cluster_id] = "No content available for this cluster"
+
+ return cluster_summaries
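A minimal sketch of how this entry point is driven; model_id and tenant_id are placeholder values, and without them the function falls back to the placeholder summaries noted above:

# Editor's sketch; assumes document_samples and clusters come from the helpers above.
cluster_summaries = summarize_clusters_map_reduce(
    document_samples=document_samples,   # doc_id -> {filename, chunks, ...}
    clusters=clusters,                   # cluster_id -> [doc_id, ...]
    doc_max_words=100,                   # Map stage budget per document
    cluster_max_words=150,               # Reduce stage budget per cluster
    model_id=7,                          # hypothetical model record id
    tenant_id="tenant-123",              # hypothetical tenant
)
# language defaults to LANGUAGE["ZH"]; pass the tenant's language to switch prompt templates.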
+
+
+def summarize_clusters(document_samples: Dict[str, Dict], clusters: Dict[int, List[str]],
+ language: str = LANGUAGE["ZH"], max_words: int = 150) -> Dict[int, str]:
+ """
+ Summarize all clusters (legacy method - kept for backward compatibility)
+
+ Note: This method uses the old approach. Use summarize_clusters_map_reduce for better results.
+
+ Args:
+ document_samples: Dictionary mapping doc_id to document info
+ clusters: Dictionary mapping cluster_id to list of doc_ids
+ language: Language code ('zh' or 'en')
+ max_words: Maximum words per cluster summary
+
+ Returns:
+ Dictionary mapping cluster_id to summary text
+ """
+ cluster_summaries = {}
+
+ for cluster_id, doc_ids in clusters.items():
+ logger.info(f"Summarizing cluster {cluster_id} with {len(doc_ids)} documents")
+
+ # Extract cluster content
+ cluster_content = extract_cluster_content(document_samples, doc_ids, max_chunks_per_doc=3)
+
+ # Generate summary using old method
+ summary = summarize_cluster_legacy(cluster_content, language, max_words)
+ cluster_summaries[cluster_id] = summary
+
+ return cluster_summaries
+
+
+def summarize_cluster_legacy(cluster_content: str, language: str = LANGUAGE["ZH"], max_words: int = 150) -> str:
+ """
+ Legacy cluster summarization method (single-stage)
+
+ Args:
+ cluster_content: Formatted content from the cluster
+ language: Language code ('zh' or 'en')
+ max_words: Maximum words in the summary
+
+ Returns:
+ Cluster summary text
+ """
+ try:
+ prompt_path = 'backend/prompts/cluster_summary_agent.yaml'
+ with open(prompt_path, 'r', encoding='utf-8') as f:
+ prompts = yaml.safe_load(f)
+
+ system_prompt = prompts.get('system_prompt', '')
+ user_prompt_template = prompts.get('user_prompt', '')
+
+ user_prompt = Template(user_prompt_template, undefined=StrictUndefined).render(
+ cluster_content=cluster_content,
+ max_words=max_words
+ )
+
+ logger.info(f"Cluster summary prompt generated (language: {language}, max_words: {max_words})")
+
+ # Note: This is a legacy function, using placeholder summary
+ # The main summarization uses summarize_cluster() with LLM integration
+ return f"[Cluster Summary] (max {max_words} words) - Content preview: {cluster_content[:200]}..."
+
+ except Exception as e:
+ logger.error(f"Error generating cluster summary: {str(e)}", exc_info=True)
+ return f"Failed to generate summary: {str(e)}"
+
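Putting the new utilities together, the flow that summary_index_name in elasticsearch_service.py now follows can be sketched roughly as below; es_core, the index name, model_id and tenant_id are assumed to come from the caller:

# Editor's sketch of the end-to-end Map-Reduce summary flow introduced by this diff.
from utils.document_vector_utils import (
    process_documents_for_clustering,
    kmeans_cluster_documents,
    summarize_clusters_map_reduce,
    merge_cluster_summaries,
)

# Step 1: sample documents and compute weighted document-level embeddings
document_samples, doc_embeddings = process_documents_for_clustering(
    index_name="my_knowledge_base",      # hypothetical index
    es_core=es_core,
    sample_doc_count=200,
)
# Step 2: K-means clustering, K picked automatically via silhouette score
clusters = kmeans_cluster_documents(doc_embeddings, k=None)
# Step 3: Map (per-document summaries) then Reduce (per-cluster summaries)
cluster_summaries = summarize_clusters_map_reduce(
    document_samples=document_samples,
    clusters=clusters,
    doc_max_words=100,
    cluster_max_words=150,
    model_id=model_id,
    tenant_id=tenant_id,
)
# Step 4: merge cluster summaries into the final knowledge base summary
final_summary = merge_cluster_summaries(cluster_summaries)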
diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml
index 182ae806..cfb20f6e 100644
--- a/docker/docker-compose.dev.yml
+++ b/docker/docker-compose.dev.yml
@@ -1,38 +1,38 @@
name: nexent
services:
- nexent:
- image: nexent/nexent:latest
- container_name: nexent
- restart: always
- ports:
- - "5010:5010"
- - "5013:5013"
- volumes:
- - ../:/opt/
- - /opt/backend/.venv/
- - ${NEXENT_USER_DIR:-$HOME/nexent}:/mnt/nexent
- environment:
- skip_proxy: "true"
- UMASK: 0022
- env_file:
- - .env
- user: root
- logging:
- driver: "json-file"
- options:
- max-size: "10m"
- max-file: "3"
- networks:
- - nexent
- entrypoint: "/bin/bash"
- command:
- - -c
- - |
- rm -rf /var/lib/apt/lists/* &&
- echo "Python environment activated: $(which python)" &&
- echo "Python version: $(python --version)" &&
- tail -f /dev/null
+# nexent:
+# image: nexent/nexent:latest
+# container_name: nexent
+# restart: always
+# ports:
+# - "5010:5010"
+# - "5013:5013"
+# volumes:
+# - ../:/opt/
+# - /opt/backend/.venv/
+# - ${NEXENT_USER_DIR:-$HOME/nexent}:/mnt/nexent
+# environment:
+# skip_proxy: "true"
+# UMASK: 0022
+# env_file:
+# - .env
+# user: root
+# logging:
+# driver: "json-file"
+# options:
+# max-size: "10m"
+# max-file: "3"
+# networks:
+# - nexent
+# entrypoint: "/bin/bash"
+# command:
+# - -c
+# - |
+# rm -rf /var/lib/apt/lists/* &&
+# echo "Python environment activated: $(which python)" &&
+# echo "Python version: $(python --version)" &&
+# tail -f /dev/null
nexent-data-process:
@@ -45,7 +45,7 @@ services:
volumes:
- ../:/opt/:cached
- /opt/backend/.venv/
- - ${NEXENT_USER_DIR:-$HOME/nexent}:/mnt/nexent
+ - ${ROOT_DIR}:/mnt/nexent-data
environment:
skip_proxy: "true"
PATH: "/usr/local/bin:/usr/bin/:/opt/backend/.venv/bin:${PATH}"
@@ -70,27 +70,27 @@ services:
echo "Python version: $(python --version)" &&
python -c "import time; time.sleep(2147483647)"
- nexent-web:
- image: nexent/nexent-web:latest
- container_name: nexent-web
- restart: always
- networks:
- - nexent
- ports:
- - "3000:3000"
- volumes:
- - ../frontend:/opt/frontend:cached
- - ../frontend/node_modules:/opt/frontend/node_modules:cached
- environment:
- - HTTP_BACKEND=http://nexent:5010
- - WS_BACKEND=ws://nexent:5010
- - MINIO_ENDPOINT=${MINIO_ENDPOINT}
- logging:
- driver: "json-file"
- options:
- max-size: "10m"
- max-file: "3"
- command: ["/bin/sh", "-c", "echo 'Web Service needs to be started manually. Use\nnpm install -g pnpm\npnpm install\npnpm dev\n under /opt/frontend to start.' && tail -f /dev/null"]
+# nexent-web:
+# image: nexent/nexent-web:latest
+# container_name: nexent-web
+# restart: always
+# networks:
+# - nexent
+# ports:
+# - "3000:3000"
+# volumes:
+# - ../frontend:/opt/frontend:cached
+# - ../frontend/node_modules:/opt/frontend/node_modules:cached
+# environment:
+# - HTTP_BACKEND=http://nexent:5010
+# - WS_BACKEND=ws://nexent:5010
+# - MINIO_ENDPOINT=${MINIO_ENDPOINT}
+# logging:
+# driver: "json-file"
+# options:
+# max-size: "10m"
+# max-file: "3"
+# command: ["/bin/sh", "-c", "echo 'Web Service needs to be started manually. Use\nnpm install -g pnpm\npnpm install\npnpm dev\n under /opt/frontend to start.' && tail -f /dev/null"]
networks:
diff --git a/docker/init.sql b/docker/init.sql
index d23e1c7f..4d19084e 100644
--- a/docker/init.sql
+++ b/docker/init.sql
@@ -290,6 +290,8 @@ CREATE TABLE IF NOT EXISTS nexent.ag_tenant_agent_t (
business_description VARCHAR,
model_name VARCHAR(100),
model_id INTEGER,
+ business_logic_model_name VARCHAR(100),
+ business_logic_model_id INTEGER,
max_steps INTEGER,
duty_prompt TEXT,
constraint_prompt TEXT,
@@ -330,6 +332,8 @@ COMMENT ON COLUMN nexent.ag_tenant_agent_t.description IS 'Description';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.business_description IS 'Manually entered by the user to describe the entire business process';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.model_name IS '[DEPRECATED] Name of the model used, use model_id instead';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.model_id IS 'Model ID, foreign key reference to model_record_t.model_id';
+COMMENT ON COLUMN nexent.ag_tenant_agent_t.business_logic_model_name IS 'Model name used for business logic prompt generation';
+COMMENT ON COLUMN nexent.ag_tenant_agent_t.business_logic_model_id IS 'Model ID used for business logic prompt generation, foreign key reference to model_record_t.model_id';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.max_steps IS 'Maximum number of steps';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.duty_prompt IS 'Duty prompt';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.constraint_prompt IS 'Constraint prompt';
@@ -344,8 +348,6 @@ COMMENT ON COLUMN nexent.ag_tenant_agent_t.created_by IS 'Creator';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.updated_by IS 'Updater';
COMMENT ON COLUMN nexent.ag_tenant_agent_t.delete_flag IS 'Whether it is deleted. Optional values: Y/N';
--- Add comments to the columns
-COMMENT ON COLUMN nexent.ag_tenant_agent_t.provide_run_summary IS 'Whether to provide the running summary to the manager agent';
-- Create the ag_tool_instance_t table in the nexent schema
CREATE TABLE IF NOT EXISTS nexent.ag_tool_instance_t (
@@ -644,4 +646,4 @@ $$ LANGUAGE plpgsql;
CREATE TRIGGER "update_partner_mapping_update_time_trigger"
BEFORE UPDATE ON "nexent"."partner_mapping_id_t"
FOR EACH ROW
-EXECUTE FUNCTION "update_partner_mapping_update_time"();
\ No newline at end of file
+EXECUTE FUNCTION "update_partner_mapping_update_time"();
diff --git a/docker/sql/1024_add_business_logic_model_fields.sql b/docker/sql/1024_add_business_logic_model_fields.sql
new file mode 100644
index 00000000..ff1a7673
--- /dev/null
+++ b/docker/sql/1024_add_business_logic_model_fields.sql
@@ -0,0 +1,12 @@
+-- Add business_logic_model_name and business_logic_model_id fields to ag_tenant_agent_t table
+-- These fields store the LLM model used for generating business logic prompts
+
+ALTER TABLE nexent.ag_tenant_agent_t
+ADD COLUMN IF NOT EXISTS business_logic_model_name VARCHAR(100);
+
+ALTER TABLE nexent.ag_tenant_agent_t
+ADD COLUMN IF NOT EXISTS business_logic_model_id INTEGER;
+
+COMMENT ON COLUMN nexent.ag_tenant_agent_t.business_logic_model_name IS 'Model name used for business logic prompt generation';
+COMMENT ON COLUMN nexent.ag_tenant_agent_t.business_logic_model_id IS 'Model ID used for business logic prompt generation, foreign key reference to model_record_t.model_id';
+
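On the read path, these columns are resolved to a display name the same way the primary model is; a hedged sketch mirroring the get_agent_info_impl change earlier in this diff:

# Editor's sketch based on services/agent_service.py; agent_info is the row loaded for the agent.
if agent_info.get("business_logic_model_id") is not None:
    model_info = get_model_by_model_id(agent_info["business_logic_model_id"])
    agent_info["business_logic_model_name"] = (
        model_info.get("display_name") if model_info is not None else None
    )
elif "business_logic_model_name" not in agent_info:
    agent_info["business_logic_model_name"] = None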
diff --git a/frontend/app/[locale]/chat/streaming/chatStreamMain.tsx b/frontend/app/[locale]/chat/streaming/chatStreamMain.tsx
index 6f4d77e9..dde39fed 100644
--- a/frontend/app/[locale]/chat/streaming/chatStreamMain.tsx
+++ b/frontend/app/[locale]/chat/streaming/chatStreamMain.tsx
@@ -398,6 +398,46 @@ export function ChatStreamMain({
shouldScrollToBottom,
]);
+ // Additional scroll trigger for async content like Mermaid diagrams
+ useEffect(() => {
+ if (processedMessages.finalMessages.length > 0 && autoScroll) {
+ const scrollAreaElement = scrollAreaRef.current?.querySelector(
+ "[data-radix-scroll-area-viewport]"
+ );
+ if (!scrollAreaElement) return;
+
+ // Use ResizeObserver to detect when content height changes (e.g., Mermaid diagrams finish rendering)
+ const resizeObserver = new ResizeObserver(() => {
+ const { scrollTop, scrollHeight, clientHeight } =
+ scrollAreaElement as HTMLElement;
+ const distanceToBottom = scrollHeight - scrollTop - clientHeight;
+
+ // Auto-scroll if user is near bottom and content height changed
+ if (distanceToBottom < 100) {
+ scrollToBottom();
+ }
+ });
+
+ resizeObserver.observe(scrollAreaElement);
+
+ // Also use a timeout as fallback for async content
+ const timeoutId = setTimeout(() => {
+ const { scrollTop, scrollHeight, clientHeight } =
+ scrollAreaElement as HTMLElement;
+ const distanceToBottom = scrollHeight - scrollTop - clientHeight;
+
+ if (distanceToBottom < 100) {
+ scrollToBottom();
+ }
+ }, 1000); // Wait 1 second for async content to render
+
+ return () => {
+ resizeObserver.disconnect();
+ clearTimeout(timeoutId);
+ };
+ }
+ }, [processedMessages.finalMessages.length, autoScroll]);
+
// Scroll to bottom when task messages are updated
useEffect(() => {
if (autoScroll) {
diff --git a/frontend/app/[locale]/chat/streaming/taskWindow.tsx b/frontend/app/[locale]/chat/streaming/taskWindow.tsx
index c8bd3f61..4067c992 100644
--- a/frontend/app/[locale]/chat/streaming/taskWindow.tsx
+++ b/frontend/app/[locale]/chat/streaming/taskWindow.tsx
@@ -664,6 +664,7 @@ const messageHandlers: MessageHandler[] = [