diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index 0719167..1482815 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -1,6 +1,7 @@ """Session spawning for agent delegation. Implements sub-session creation with configuration inheritance and overlays. +Uses spawn_bundle() as the core primitive for session lifecycle management. """ import logging @@ -392,270 +393,176 @@ async def spawn_sub_session( ) assert sub_session_id is not None # Always generated above if not provided - # Create child session with parent_id and inherited UX systems (kernel mechanism) - # NOTE: We intentionally do NOT share parent's loader here. - # The loader caches modules with their config, so sharing would cause child sessions - # to get the parent's cached orchestrator config instead of their own. - # Each session needs its own loader to respect session-specific config (e.g., rate limiting). - display_system = parent_session.coordinator.display_system - child_session = AmplifierSession( - config=merged_config, - loader=None, # Let child create its own loader to respect its config - session_id=sub_session_id, - parent_id=parent_session.session_id, # Links to parent - approval_system=parent_session.coordinator.approval_system, # Inherit from parent - display_system=display_system, # Inherit from parent + # ========================================================================= + # PHASE 2: Create Inline Bundle from Merged Config + # ========================================================================= + # spawn_bundle() requires a Bundle object. We create an inline bundle from + # the merged config dict. This is equivalent to calling bundle.prepare() + # with the config directly. + + from amplifier_foundation.bundle import Bundle + + inline_bundle = Bundle( + name=agent_name, + version="1.0.0", + mount_plan=merged_config, + agents={}, # Agents are resolved at CLI layer, not bundle layer + context_files=[], ) - # Notify display system we're entering a nested session (for indentation) + # ========================================================================= + # PHASE 3: Display System Nesting + # ========================================================================= + + display_system = parent_session.coordinator.display_system if hasattr(display_system, "push_nesting"): display_system.push_nesting() - # NOTE: Parent message injection moved to AFTER initialize() because - # the context module is only mounted during initialize(). + # ========================================================================= + # PHASE 4: Define CLI-Specific Setup Hook + # ========================================================================= + # This hook runs after session init but before execution, allowing us to + # register CLI-specific capabilities that spawn_bundle() doesn't know about. - # Register app-layer capabilities for child session BEFORE initialization - # These must be mounted before initialize() because module loading needs the resolver - from amplifier_foundation.mentions import ContentDeduplicator - - from amplifier_app_cli.lib.mention_loading.app_resolver import AppMentionResolver - from amplifier_app_cli.paths import create_foundation_resolver - - # Module source resolver - inherit from parent to preserve BundleModuleResolver in bundle mode - # CRITICAL: Must be mounted BEFORE initialize() so modules with source: directives can be resolved - parent_resolver = parent_session.coordinator.get("module-source-resolver") - if parent_resolver: - await child_session.coordinator.mount("module-source-resolver", parent_resolver) - else: - # Fallback to fresh resolver if parent doesn't have one - resolver = create_foundation_resolver() - await child_session.coordinator.mount("module-source-resolver", resolver) - - # Share sys.path additions from parent BEFORE initialize() - # This ensures bundle packages (like amplifier_bundle_python_dev) are importable - # when child session loads modules that depend on them. - # - # Two sources of paths need to be shared: - # 1. loader._added_paths - individual module paths added during loading - # 2. bundle_package_paths capability - bundle src/ directories (e.g., python-dev) - import sys - - paths_to_share: list[str] = [] - - # Source 1: Module paths from parent loader - if hasattr(parent_session, "loader") and parent_session.loader is not None: - parent_added_paths = getattr(parent_session.loader, "_added_paths", []) - paths_to_share.extend(parent_added_paths) - - # Source 2: Bundle package paths (src/ directories from bundles like python-dev) - # These are registered as a capability during bundle preparation - bundle_package_paths = parent_session.coordinator.get_capability( - "bundle_package_paths" - ) - if bundle_package_paths: - paths_to_share.extend(bundle_package_paths) - - # Add all paths to sys.path - if paths_to_share: - for path in paths_to_share: - if path not in sys.path: - sys.path.insert(0, path) - logger.debug( - f"Shared {len(paths_to_share)} sys.path entries from parent to child session" + async def cli_pre_execute_hook(child_session: AmplifierSession) -> None: + """Register CLI-specific capabilities on child session.""" + # Self-delegation depth tracking (for recursion limits) + child_session.coordinator.register_capability( + "self_delegation_depth", self_delegation_depth ) - # Initialize child session (mounts modules per merged config) - # Now the resolver is available for loading modules with source: directives - await child_session.initialize() - - # Note: Parent context inheritance is now handled by tool-task formatting - # the parent messages directly into the instruction text. This ensures the - # child agent sees the context regardless of session/orchestrator behavior. - # The parent_messages parameter is kept for potential future use. - - # Wire up cancellation propagation: parent cancellation should propagate to child - # This enables graceful Ctrl+C handling for nested agent sessions - parent_cancellation = parent_session.coordinator.cancellation - child_cancellation = child_session.coordinator.cancellation - parent_cancellation.register_child(child_cancellation) - logger.debug( - f"Registered child cancellation token for sub-session {sub_session_id}" - ) + # Override session.spawn with CLI's version (has more parameters) + # spawn_bundle() registers its own version, but CLI needs agent resolution + async def child_spawn_capability( + agent_name: str, + instruction: str, + parent_session: AmplifierSession, + agent_configs: dict[str, dict], + sub_session_id: str | None = None, + tool_inheritance: dict[str, list[str]] | None = None, + hook_inheritance: dict[str, list[str]] | None = None, + orchestrator_config: dict | None = None, + parent_messages: list[dict] | None = None, + provider_override: str | None = None, + model_override: str | None = None, + provider_preferences: list | None = None, + self_delegation_depth: int = 0, + ) -> dict: + return await spawn_sub_session( + agent_name=agent_name, + instruction=instruction, + parent_session=parent_session, + agent_configs=agent_configs, + sub_session_id=sub_session_id, + tool_inheritance=tool_inheritance, + hook_inheritance=hook_inheritance, + orchestrator_config=orchestrator_config, + parent_messages=parent_messages, + provider_override=provider_override, + model_override=model_override, + provider_preferences=provider_preferences, + self_delegation_depth=self_delegation_depth, + ) - # Mention resolver - inherit from parent to preserve bundle_override context - parent_mention_resolver = parent_session.coordinator.get_capability( - "mention_resolver" - ) - if parent_mention_resolver: - child_session.coordinator.register_capability( - "mention_resolver", parent_mention_resolver - ) - else: - # Fallback to fresh resolver if parent doesn't have one - child_session.coordinator.register_capability( - "mention_resolver", AppMentionResolver() - ) + async def child_resume_capability( + sub_session_id: str, instruction: str + ) -> dict: + return await resume_sub_session( + sub_session_id=sub_session_id, + instruction=instruction, + ) - # Mention deduplicator - inherit from parent to preserve session-wide deduplication state - parent_deduplicator = parent_session.coordinator.get_capability( - "mention_deduplicator" - ) - if parent_deduplicator: + # Override spawn_bundle's session.spawn with CLI's version child_session.coordinator.register_capability( - "mention_deduplicator", parent_deduplicator + "session.spawn", child_spawn_capability ) - else: - # Fallback to fresh deduplicator if parent doesn't have one child_session.coordinator.register_capability( - "mention_deduplicator", ContentDeduplicator() + "session.resume", child_resume_capability ) - # Working directory - inherit from parent for consistent path resolution - # This ensures child sessions use the same project directory as parent - # (critical for server/web deployments where process cwd differs from user's project) - parent_working_dir = parent_session.coordinator.get_capability( - "session.working_dir" - ) - if parent_working_dir: - child_session.coordinator.register_capability( - "session.working_dir", parent_working_dir + # Approval provider (for hooks-approval module, if active) + register_provider_fn = child_session.coordinator.get_capability( + "approval.register_provider" ) + if register_provider_fn: + from rich.console import Console - # Self-delegation depth tracking (for recursion limits) - # This is a simple value capability, not a function - child_session.coordinator.register_capability( - "self_delegation_depth", self_delegation_depth - ) - - # Register session spawning capabilities on child session - # This enables nested agent delegation (child can spawn grandchildren) - # The capabilities are closures that reference the spawn/resume functions - async def child_spawn_capability( - agent_name: str, - instruction: str, - parent_session: AmplifierSession, - agent_configs: dict[str, dict], - sub_session_id: str | None = None, - tool_inheritance: dict[str, list[str]] | None = None, - hook_inheritance: dict[str, list[str]] | None = None, - orchestrator_config: dict | None = None, - parent_messages: list[dict] | None = None, - provider_override: str | None = None, - model_override: str | None = None, - provider_preferences: list | None = None, - self_delegation_depth: int = 0, - ) -> dict: - return await spawn_sub_session( - agent_name=agent_name, - instruction=instruction, - parent_session=parent_session, - agent_configs=agent_configs, - sub_session_id=sub_session_id, - tool_inheritance=tool_inheritance, - hook_inheritance=hook_inheritance, - orchestrator_config=orchestrator_config, - parent_messages=parent_messages, - provider_override=provider_override, - model_override=model_override, - provider_preferences=provider_preferences, - self_delegation_depth=self_delegation_depth, - ) - - async def child_resume_capability(sub_session_id: str, instruction: str) -> dict: - return await resume_sub_session( - sub_session_id=sub_session_id, - instruction=instruction, - ) - - child_session.coordinator.register_capability( - "session.spawn", child_spawn_capability - ) - child_session.coordinator.register_capability( - "session.resume", child_resume_capability - ) - - # Approval provider (for hooks-approval module, if active) - register_provider_fn = child_session.coordinator.get_capability( - "approval.register_provider" - ) - if register_provider_fn: - from rich.console import Console - - from amplifier_app_cli.approval_provider import CLIApprovalProvider - - console = Console() - approval_provider = CLIApprovalProvider(console) - register_provider_fn(approval_provider) - logger.debug(f"Registered approval provider for child session {sub_session_id}") - - # Inject agent's system instruction - # Check top-level instruction first (from agent .md file body), then nested system.instruction - system_instruction = agent_config.get("instruction") or agent_config.get( - "system", {} - ).get("instruction") - if system_instruction: - context = child_session.coordinator.get("context") - if context and hasattr(context, "add_message"): - await context.add_message({"role": "system", "content": system_instruction}) + from amplifier_app_cli.approval_provider import CLIApprovalProvider - # Execute instruction in child session - response = await child_session.execute(instruction) - - # Persist state for multi-turn resumption - from datetime import UTC - from datetime import datetime - - from .session_store import SessionStore + console = Console() + approval_provider = CLIApprovalProvider(console) + register_provider_fn(approval_provider) + logger.debug( + f"Registered approval provider for child session {sub_session_id}" + ) - context = child_session.coordinator.get("context") - transcript = await context.get_messages() if context else [] + # ========================================================================= + # PHASE 5: Prepare CLI-Specific Metadata + # ========================================================================= + # spawn_bundle() handles basic persistence, but CLI needs additional metadata + # for session resumption, trace context, and working directory. - # Extract or generate trace_id for W3C Trace Context pattern - # Root session ID is the trace_id, propagate it to all children parent_trace_id = getattr(parent_session, "trace_id", parent_session.session_id) # Extract child_span from sub_session_id for short_id resolution - # Format: {parent_id}-{child_span}_{agent_name} child_span: str | None = None if sub_session_id and "_" in sub_session_id and "-" in sub_session_id: base = sub_session_id.rsplit("_", 1)[0] # Remove agent name child_span = base.rsplit("-", 1)[-1] # Get child_span (16 hex chars) - metadata = { - "session_id": sub_session_id, - "parent_id": parent_session.session_id, - "trace_id": parent_trace_id, # W3C Trace Context: trace entire conversation + metadata_extra = { + "trace_id": parent_trace_id, # W3C Trace Context "agent_name": agent_name, - "child_span": child_span, # For short_id resolution (first 8 chars = short_id) - "created": datetime.now(UTC).isoformat(), + "child_span": child_span, "config": merged_config, "agent_overlay": agent_config, - "turn_count": 1, "bundle_context": _extract_bundle_context(parent_session), - "self_delegation_depth": self_delegation_depth, # For recursion limit tracking - # Store working_dir for session sync between CLI and web + "self_delegation_depth": self_delegation_depth, "working_dir": str(Path.cwd().resolve()), } - store = SessionStore() - store.save(sub_session_id, transcript, metadata) - logger.debug(f"Sub-session {sub_session_id} state persisted") + # ========================================================================= + # PHASE 6: Extract System Instruction + # ========================================================================= - # Unregister child cancellation token before cleanup - parent_cancellation.unregister_child(child_cancellation) - logger.debug( - f"Unregistered child cancellation token for sub-session {sub_session_id}" - ) + system_instruction = agent_config.get("instruction") or agent_config.get( + "system", {} + ).get("instruction") - # Notify display system we're exiting the nested session (for indentation) - if hasattr(display_system, "pop_nesting"): - display_system.pop_nesting() + # ========================================================================= + # PHASE 7: Spawn Using Foundation Primitive + # ========================================================================= - # Cleanup child session - await child_session.cleanup() + from amplifier_foundation.spawn import spawn_bundle + + from .session_store import SessionStore + + try: + result = await spawn_bundle( + bundle=inline_bundle, + instruction=instruction, + parent_session=parent_session, + # Inheritance controls - already filtered in merged_config + inherit_providers=False, + inherit_tools=False, + inherit_hooks=False, + # Session identity + session_id=sub_session_id, + session_name=agent_name, + # Persistence + session_storage=SessionStore(), + # Additional setup + system_instruction=system_instruction, + pre_execute_hook=cli_pre_execute_hook, + metadata_extra=metadata_extra, + ) + finally: + # Always restore display nesting, even on error + if hasattr(display_system, "pop_nesting"): + display_system.pop_nesting() # Return response and session ID for potential multi-turn - return {"output": response, "session_id": sub_session_id} + return {"output": result.output, "session_id": result.session_id} async def resume_sub_session(sub_session_id: str, instruction: str) -> dict: @@ -812,6 +719,59 @@ async def resume_sub_session(sub_session_id: str, instruction: str) -> dict: "self_delegation_depth", self_delegation_depth ) + # Working directory - restore from metadata for consistent path resolution + working_dir = metadata.get("working_dir") + if working_dir: + child_session.coordinator.register_capability( + "session.working_dir", working_dir + ) + + # Register session spawning capabilities on resumed session + # This enables nested agent delegation from resumed sessions + async def child_spawn_capability( + agent_name: str, + instruction: str, + parent_session: AmplifierSession, + agent_configs: dict[str, dict], + sub_session_id: str | None = None, + tool_inheritance: dict[str, list[str]] | None = None, + hook_inheritance: dict[str, list[str]] | None = None, + orchestrator_config: dict | None = None, + parent_messages: list[dict] | None = None, + provider_override: str | None = None, + model_override: str | None = None, + provider_preferences: list | None = None, + self_delegation_depth: int = 0, + ) -> dict: + return await spawn_sub_session( + agent_name=agent_name, + instruction=instruction, + parent_session=parent_session, + agent_configs=agent_configs, + sub_session_id=sub_session_id, + tool_inheritance=tool_inheritance, + hook_inheritance=hook_inheritance, + orchestrator_config=orchestrator_config, + parent_messages=parent_messages, + provider_override=provider_override, + model_override=model_override, + provider_preferences=provider_preferences, + self_delegation_depth=self_delegation_depth, + ) + + async def child_resume_capability(sub_session_id: str, instruction: str) -> dict: + return await resume_sub_session( + sub_session_id=sub_session_id, + instruction=instruction, + ) + + child_session.coordinator.register_capability( + "session.spawn", child_spawn_capability + ) + child_session.coordinator.register_capability( + "session.resume", child_resume_capability + ) + # Approval provider (for hooks-approval module, if active) register_provider_fn = child_session.coordinator.get_capability( "approval.register_provider"