Conversation
- Add MetadataExtractor to collect user-related statements post-dedup and extract profile/behavioral metadata via an independent LLM call
- Add Celery task (extract_user_metadata) routed to memory_tasks queue
- Add metadata models (UserMetadata, UserMetadataProfile, etc.)
- Add metadata utility functions (clean, validate, merge with _op support)
- Add Jinja2 prompt template for metadata extraction (zh/en)
- Fix Lucene query parameter naming: rename `q` to `query` across all Cypher queries, graph_search functions, and callers
- Escape `/` in Lucene queries to prevent TokenMgrError
- Add `speaker` field to ChunkNode and persist it in Neo4j
- Remove unused imports (argparse, os, UUID) in search.py
- Fix unnecessary db context nesting in interest distribution task
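The Lucene-escaping fix mentioned above could look roughly like the following sketch. The helper name and exact character set are assumptions (the PR's actual escaper may differ); it conservatively backslash-escapes all Lucene query-syntax characters, including `/`, which otherwise triggers a TokenMgrError in Neo4j full-text search.

```python
# Hypothetical sketch; escapes '&' and '|' individually, which is more
# conservative than Lucene's '&&'/'||' pair rule but always safe.
LUCENE_SPECIAL_CHARS = set('+-&|!(){}[]^"~*?:\\/')

def escape_lucene_query(text: str) -> str:
    """Backslash-escape Lucene special characters, including '/'."""
    return "".join(
        "\\" + ch if ch in LUCENE_SPECIAL_CHARS else ch
        for ch in text
    )
```

For example, a path-like input such as `a/b:c` is escaped before being passed as the `query` parameter to the full-text index call.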
…sed merge

- Remove merge_metadata and its helper functions from metadata_utils.py
- Pass existing_metadata to MetadataExtractor.extract_metadata() as LLM context
- Add merge instructions to extract_user_metadata.jinja2 prompt (zh/en)
- Update Celery task to read existing metadata before extraction and overwrite
- Simplify field descriptions in UserMetadataProfile model
- Add _update_timestamps helper to track changed fields
…licate user entity nodes
- Merge alias add/remove into MetadataExtractionResponse and Celery metadata task,
removing the separate sync step from extraction_orchestrator
- Replace first-person pronouns ("我") with "用户" in statement extraction to
preserve identity semantics for downstream metadata/alias extraction
- Update extract_statement.jinja2 prompt to enforce "用户" as subject for user
statements instead of resolving to real names
- Add alias change instructions (aliases_to_add/aliases_to_remove) to
extract_user_metadata.jinja2 with incremental merge logic
- Deduplicate special entities ("用户", "AI助手") in graph_saver by reusing
existing Neo4j node IDs per end_user_id
- Sync final aliases from PgSQL to Neo4j user entity nodes after metadata write
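The final alias-sync step in the list above could be sketched as below. The node label, property names, and the `execute_query` signature are assumptions based on the sequence diagram's "MATCH ExtractedEntity user placeholders SET aliases" step, not the actual diff.

```python
# Hypothetical Cypher for syncing aliases from PgSQL to the per-user
# "用户" entity node in Neo4j; label/property names are assumptions.
SYNC_ALIASES_CYPHER = """
MATCH (e:ExtractedEntity {name: $user_placeholder, end_user_id: $end_user_id})
SET e.aliases = $aliases
"""

def sync_user_aliases(neo4j_connector, end_user_id: str, aliases: list) -> None:
    """Push the final alias list onto the user's entity node."""
    neo4j_connector.execute_query(
        SYNC_ALIASES_CYPHER,
        user_placeholder="用户",
        end_user_id=end_user_id,
        aliases=aliases,
    )
```

Matching on the fixed `"用户"` placeholder plus `end_user_id` relies on the dedup step above having collapsed special entities to one node per end user.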
…metadata utils

- Remove _replace_first_person_with_user from StatementExtractor to preserve original user text for downstream metadata/alias extraction
- Delete metadata_utils.py module, inline clean_metadata into Celery task
- Remove unused imports and commented-out collect_user_raw_messages method
- Apply formatting cleanup across metadata models and extraction orchestrator
Contributor
Reviewer's Guide

Adds an asynchronous user metadata extraction pipeline powered by a new LLM-based MetadataExtractor, wires it into the extraction orchestrator and Celery, enriches Neo4j graph modeling (speaker, user/assistant canonicalization, Lucene escaping), and refactors keyword search and alias handling so that user metadata and aliases are maintained consistently across Postgres and Neo4j.

Sequence diagram for async user metadata extraction pipeline

sequenceDiagram
actor User
participant Orchestrator as ExtractionOrchestrator
participant MetadataExtractor as MetadataExtractor
participant CeleryBroker as CeleryBroker
participant CeleryWorker as CeleryWorker
participant PG as PostgresDB
participant EndUserRepo as EndUserRepository
participant EndUserInfoRepo as EndUserInfoRepository
participant MemoryConfigService as MemoryConfigService
participant MemoryClientFactory as MemoryClientFactory
participant LLMClient as LLMClient
participant Neo4j as Neo4jConnector
User->>Orchestrator: run()
Orchestrator->>MetadataExtractor: collect_user_related_statements(entity_nodes, statement_nodes, statement_entity_edges)
MetadataExtractor-->>Orchestrator: user_statements
alt has_user_statements
Orchestrator->>CeleryBroker: enqueue extract_user_metadata_task(end_user_id, statements, config_id, language)
CeleryBroker-->>CeleryWorker: dispatch extract_user_metadata_task
CeleryWorker->>PG: get_db_context()
CeleryWorker->>EndUserRepo: get_by_id(end_user_id)
EndUserRepo-->>CeleryWorker: end_user
CeleryWorker->>MemoryConfigService: get_config_with_fallback(memory_config_id, workspace_id)
MemoryConfigService-->>CeleryWorker: memory_config
CeleryWorker->>MemoryClientFactory: get_llm_client(llm_id)
MemoryClientFactory-->>CeleryWorker: LLMClient
CeleryWorker->>EndUserInfoRepo: get_by_end_user_id(end_user_id)
EndUserInfoRepo-->>CeleryWorker: existing_info(meta_data, aliases)
CeleryWorker->>MetadataExtractor: extract_metadata(statements, existing_metadata, existing_aliases)
MetadataExtractor->>LLMClient: response_structured(prompt, MetadataExtractionResponse)
LLMClient-->>MetadataExtractor: MetadataExtractionResponse(user_metadata, aliases_to_add, aliases_to_remove)
MetadataExtractor-->>CeleryWorker: user_metadata, aliases_to_add, aliases_to_remove
CeleryWorker->>CeleryWorker: clean_metadata(user_metadata)
CeleryWorker->>CeleryWorker: _update_timestamps(existing_meta, cleaned_meta, updated_at, now)
CeleryWorker->>PG: update EndUserInfo.meta_data, EndUserInfo.aliases, EndUser.other_name
PG-->>CeleryWorker: commit
CeleryWorker->>Neo4j: execute_query(MATCH ExtractedEntity user placeholders SET aliases)
Neo4j-->>CeleryWorker: ok
CeleryWorker-->>CeleryBroker: task result(status=SUCCESS)
CeleryBroker-->>Orchestrator: async completion (logged only)
else no_user_statements
Orchestrator->>Orchestrator: skip metadata extraction
end
Class diagram for metadata extraction and related models

classDiagram
class MetadataExtractor {
- llm_client
- language : str
+ MetadataExtractor(llm_client, language : str)
+ detect_language(statements : List~str~) str
+ collect_user_related_statements(entity_nodes : List~ExtractedEntityNode~, statement_nodes : List~StatementNode~, statement_entity_edges : List~StatementEntityEdge~) List~str~
+ extract_metadata(statements : List~str~, existing_metadata : dict, existing_aliases : List~str~) tuple
}
class ExtractedEntityNode {
+ id : str
+ name : str
+ entity_type : str
+ end_user_id : str
}
class StatementNode {
+ id : str
+ statement : str
+ speaker : str
+ end_user_id : str
}
class StatementEntityEdge {
+ source : str
+ target : str
}
class UserMetadataProfile {
+ role : str
+ domain : str
+ expertise : List~str~
+ interests : List~str~
}
class UserMetadataBehavioralHints {
+ learning_stage : str
+ preferred_depth : str
+ tone_preference : str
}
class UserMetadata {
+ profile : UserMetadataProfile
+ behavioral_hints : UserMetadataBehavioralHints
+ knowledge_tags : List~str~
}
class MetadataExtractionResponse {
+ user_metadata : UserMetadata
+ aliases_to_add : List~str~
+ aliases_to_remove : List~str~
}
class ChunkNode {
+ id : str
+ dialog_id : str
+ content : str
+ speaker : str
+ chunk_embedding : List~float~
+ sequence_number : int
+ metadata : dict
}
MetadataExtractor --> UserMetadata : returns
MetadataExtractionResponse --> UserMetadata : contains
UserMetadata --> UserMetadataProfile : has
UserMetadata --> UserMetadataBehavioralHints : has
MetadataExtractor --> ExtractedEntityNode : reads
MetadataExtractor --> StatementNode : reads
MetadataExtractor --> StatementEntityEdge : reads
StatementNode --> ChunkNode : derived_from
class EndUserInfo {
+ end_user_id : str
+ other_name : str
+ aliases : List~str~
+ meta_data : dict
}
class EndUser {
+ id : str
+ workspace_id : str
+ other_name : str
}
EndUserInfo --> EndUser : references
MetadataExtractor --> MetadataExtractionResponse : uses
EndUserInfo --> UserMetadata : stores_in_meta_data
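The response models in the class diagram could be written along these lines. This sketch uses stdlib dataclasses for illustration; the actual code presumably uses Pydantic models (as implied by `response_structured(prompt, MetadataExtractionResponse)`), and the defaults shown are assumptions.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class UserMetadataProfile:
    role: Optional[str] = None
    domain: Optional[str] = None
    expertise: List[str] = field(default_factory=list)
    interests: List[str] = field(default_factory=list)

@dataclass
class UserMetadataBehavioralHints:
    learning_stage: Optional[str] = None
    preferred_depth: Optional[str] = None
    tone_preference: Optional[str] = None

@dataclass
class UserMetadata:
    profile: UserMetadataProfile = field(default_factory=UserMetadataProfile)
    behavioral_hints: UserMetadataBehavioralHints = field(
        default_factory=UserMetadataBehavioralHints)
    knowledge_tags: List[str] = field(default_factory=list)

@dataclass
class MetadataExtractionResponse:
    user_metadata: UserMetadata = field(default_factory=UserMetadata)
    aliases_to_add: List[str] = field(default_factory=list)
    aliases_to_remove: List[str] = field(default_factory=list)
```

Bundling the alias deltas into `MetadataExtractionResponse` is what lets a single structured LLM call replace the separate alias-sync step removed from the orchestrator.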
Contributor
Hey - I've found 1 issue, and left some high level feedback:
- In `extract_user_metadata_task.clean_metadata`, the dict comprehension calls `clean_metadata(v)` multiple times for the same value (both in the value expression and the filter), which is inefficient and can be error-prone; consider computing `cleaned = clean_metadata(v)` once per key and reusing it in both places.
- In `MetadataExtractor.extract_metadata`, the prompt language is determined solely by `detect_language(statements)` and ignores the `language` passed into the extractor; if the caller explicitly configures `language`, you may want to respect that (or at least make the precedence clear) to avoid surprising behavior.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `extract_user_metadata_task.clean_metadata`, the dict comprehension calls `clean_metadata(v)` multiple times for the same value (both in the value expression and the filter), which is inefficient and can be error‑prone; consider computing `cleaned = clean_metadata(v)` once per key and reusing it in both places.
- In `MetadataExtractor.extract_metadata`, the prompt language is determined solely by `detect_language(statements)` and ignores the `language` passed into the extractor; if the caller explicitly configures `language`, you may want to respect that (or at least make the precedence clear) to avoid surprising behavior.
## Individual Comments
### Comment 1
<location path="api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py" line_range="34-43" />
<code_context>
+ def __init__(self, llm_client, language: str = "zh"):
</code_context>
<issue_to_address>
**question:** Clarify or align the `language` constructor argument with the dynamic language detection logic.
`__init__` stores `language`, but `extract_metadata` always uses `detect_language(statements)` instead of `self.language`, so the constructor argument is effectively ignored. Consider either using `self.language` when set (and only auto-detecting when it is `None`) or removing the `language` parameter if behavior should always be auto-detected, so the API is less confusing for callers.
</issue_to_address>
…ogic

- Make MetadataExtractor language param optional (default None) to support auto-detection fallback when no language is explicitly set
- Refactor clean_metadata from walrus-operator dict comprehension to explicit loop for correctness and readability
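The two fixes in this commit could look roughly as follows. Names are taken from the review thread, but the surrounding signatures and the `detect_language` heuristic are illustrative assumptions, not the actual diff.

```python
from typing import Any, List, Optional

def clean_metadata(value: Any) -> Any:
    """Recursively drop None, empty strings, and empty containers.
    Explicit-loop variant: each value is cleaned exactly once, instead
    of calling clean_metadata(v) twice inside a comprehension."""
    if isinstance(value, dict):
        cleaned_dict = {}
        for k, v in value.items():
            cleaned = clean_metadata(v)  # computed once per key
            if cleaned not in (None, "", {}, []):
                cleaned_dict[k] = cleaned
        return cleaned_dict
    if isinstance(value, list):
        cleaned_list = []
        for v in value:
            cleaned = clean_metadata(v)
            if cleaned not in (None, "", {}, []):
                cleaned_list.append(cleaned)
        return cleaned_list
    return value

class MetadataExtractor:
    def __init__(self, llm_client, language: Optional[str] = None):
        self.llm_client = llm_client
        self.language = language  # None -> auto-detect per extract call

    def detect_language(self, statements: List[str]) -> str:
        # Naive placeholder heuristic: any CJK character means Chinese.
        return "zh" if any("\u4e00" <= ch <= "\u9fff"
                           for s in statements for ch in s) else "en"

    def _resolve_language(self, statements: List[str]) -> str:
        # Explicit configuration wins; fall back to detection otherwise.
        return self.language or self.detect_language(statements)
```

This resolves the reviewer's precedence concern: an explicitly configured `language` is respected, and detection only runs when the caller leaves it unset.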
keeees approved these changes on Apr 10, 2026
Summary by Sourcery
Add an asynchronous pipeline to extract and persist user metadata and aliases from user-related statements, and wire it into the post-dedup knowledge extraction and search stack.
New Features:

- Add an asynchronous, LLM-based user metadata extraction pipeline: a MetadataExtractor collects user-related statements post-dedup, and a Celery task extracts and persists profile/behavioral metadata and aliases.

Bug Fixes:

- Fix alias update logic during `end_user_info` creation by avoiding writes of empty metadata records and centralizing alias handling in the metadata task.

Enhancements:

- Rename the Lucene `q` parameter to `query` and escape Lucene special characters, including `/`, to reduce search errors.